chamikaramj commented on a change in pull request #14294:
URL: https://github.com/apache/beam/pull/14294#discussion_r598910775
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
##########
@@ -1093,14 +1090,7 @@ public void processElement(ProcessContext c) throws IOException, SizeLimitExceed
}
// NOTE: The record id is always null.
- output.add(
- OutgoingMessage.of(
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFrom(payload))
- .putAllAttributes(attributes)
- .build(),
- c.timestamp().getMillis(),
- null));
+ output.add(OutgoingMessage.of(message, c.timestamp().getMillis(), null));
Review comment:
Note that this only affects the bounded case (usually batch pipelines).
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java
##########
@@ -31,6 +31,9 @@
@Override
public List<CoderProvider> getCoderProviders() {
return ImmutableList.of(
+ CoderProviders.forCoder(
+ TypeDescriptor.of(PubsubMessage.class),
Review comment:
I think this change will unfortunately break the update feature on runners
that support it (for example, Dataflow).
cc: @reuvenlax
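To illustrate the concern, here is a minimal, self-contained sketch. It is not Beam's `CoderRegistry`. It assumes a toy registry in which the first matching provider wins, and an update check that accepts a pipeline only if the inferred coder for a type is unchanged. `Provider`, `resolve`, `updateCompatible`, `ExistingCoder`, and `NewCoder` are all hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class CoderUpdateSketch {

  /** Hypothetical stand-in for a coder provider: one type name mapped to one coder name. */
  static final class Provider {
    final String typeName;
    final String coderName;

    Provider(String typeName, String coderName) {
      this.typeName = typeName;
      this.coderName = coderName;
    }
  }

  /** Assumption for this sketch: the first provider that matches the type wins. */
  static Optional<String> resolve(List<Provider> providers, String typeName) {
    for (Provider p : providers) {
      if (p.typeName.equals(typeName)) {
        return Optional.of(p.coderName);
      }
    }
    return Optional.empty();
  }

  /** Assumption: an update is accepted only if the inferred coder is unchanged. */
  static boolean updateCompatible(
      List<Provider> before, List<Provider> after, String typeName) {
    return resolve(before, typeName).equals(resolve(after, typeName));
  }

  /** Provider list as the running pipeline saw it (placeholder coder name). */
  static List<Provider> providersBefore() {
    return Arrays.asList(new Provider("PubsubMessage", "ExistingCoder"));
  }

  /** Same list with a new provider for the same type placed in front. */
  static List<Provider> providersAfter() {
    return Arrays.asList(
        new Provider("PubsubMessage", "NewCoder"),
        new Provider("PubsubMessage", "ExistingCoder"));
  }

  public static void main(String[] args) {
    String type = "PubsubMessage";
    System.out.println("before: " + resolve(providersBefore(), type).orElse("none"));
    System.out.println("after:  " + resolve(providersAfter(), type).orElse("none"));
    System.out.println(
        "update compatible: " + updateCompatible(providersBefore(), providersAfter(), type));
  }
}
```

Under these assumptions, inserting a provider ahead of an existing one changes the coder inferred for the type, which is the kind of change an update check could reject. In a real pipeline, setting the coder explicitly on the affected `PCollection` with `setCoder` avoids depending on inference, though whether that is practical here is a separate question.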
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]