This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c1fee04  Fix PubsubReader to populate message id correctly
     new c18e3cf  Merge pull request #14503 from [BEAM-12143] Fix PubsubReader 
to populate message id correctly
c1fee04 is described below

commit c1fee0404a6c4fca4f86b0bedda00e5b4e2beeed
Author: Boyuan Zhang <[email protected]>
AuthorDate: Fri Apr 9 15:42:24 2021 -0700

    Fix PubsubReader to populate message id correctly
---
 .../beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index bd5e868..c5b7f3f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -893,7 +893,9 @@ public class PubsubUnboundedSource extends 
PTransform<PBegin, PCollection<Pubsub
         throw new NoSuchElementException();
       }
       if (this.outer.outer.getNeedsMessageId() || 
this.outer.outer.getNeedsAttributes()) {
-        return current.message().toByteArray();
+        com.google.pubsub.v1.PubsubMessage output =
+            
current.message().toBuilder().setMessageId(current.recordId()).build();
+        return output.toByteArray();
       }
       return current.message().getData().toByteArray();
     }
@@ -1340,6 +1342,18 @@ public class PubsubUnboundedSource extends 
PTransform<PBegin, PCollection<Pubsub
     } else {
       function = new DeserializeBytesIntoPubsubMessagePayloadOnly();
     }
+    Coder<PubsubMessage> messageCoder;
+    if (getNeedsMessageId()) {
+      messageCoder =
+          getNeedsAttributes()
+              ? PubsubMessageWithAttributesAndMessageIdCoder.of()
+              : PubsubMessageWithMessageIdCoder.of();
+    } else {
+      messageCoder =
+          getNeedsAttributes()
+              ? PubsubMessageWithAttributesCoder.of()
+              : PubsubMessagePayloadOnlyCoder.of();
+    }
     PCollection<PubsubMessage> messages =
         input
             .getPipeline()
@@ -1347,7 +1361,8 @@ public class PubsubUnboundedSource extends 
PTransform<PBegin, PCollection<Pubsub
             .apply(Read.from(new PubsubSource(this)))
             .apply(
                 "MapBytesToPubsubMessages",
-                
MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(function));
+                
MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(function))
+            .setCoder(messageCoder);
     if (usesStatsFn(input.getPipeline().getOptions())) {
       messages =
           messages.apply(

Reply via email to