This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c1fee04 Fix PubsubReader to populate message id correctly
new c18e3cf Merge pull request #14503 from [BEAM-12143] Fix PubsubReader to populate message id correctly
c1fee04 is described below
commit c1fee0404a6c4fca4f86b0bedda00e5b4e2beeed
Author: Boyuan Zhang <[email protected]>
AuthorDate: Fri Apr 9 15:42:24 2021 -0700
Fix PubsubReader to populate message id correctly
---
.../beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 19 +++++++++++++++++--
1 file changed, 17 insertions(+), 2 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index bd5e868..c5b7f3f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -893,7 +893,9 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
throw new NoSuchElementException();
}
if (this.outer.outer.getNeedsMessageId() || this.outer.outer.getNeedsAttributes()) {
- return current.message().toByteArray();
+ com.google.pubsub.v1.PubsubMessage output =
+ current.message().toBuilder().setMessageId(current.recordId()).build();
+ return output.toByteArray();
}
return current.message().getData().toByteArray();
}
@@ -1340,6 +1342,18 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
} else {
function = new DeserializeBytesIntoPubsubMessagePayloadOnly();
}
+ Coder<PubsubMessage> messageCoder;
+ if (getNeedsMessageId()) {
+ messageCoder =
+ getNeedsAttributes()
+ ? PubsubMessageWithAttributesAndMessageIdCoder.of()
+ : PubsubMessageWithMessageIdCoder.of();
+ } else {
+ messageCoder =
+ getNeedsAttributes()
+ ? PubsubMessageWithAttributesCoder.of()
+ : PubsubMessagePayloadOnlyCoder.of();
+ }
PCollection<PubsubMessage> messages =
input
.getPipeline()
@@ -1347,7 +1361,8 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
.apply(Read.from(new PubsubSource(this)))
.apply(
"MapBytesToPubsubMessages",
- MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(function));
+ MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(function))
+ .setCoder(messageCoder);
if (usesStatsFn(input.getPipeline().getOptions())) {
messages =
messages.apply(