This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b6d0abb  [BEAM-10863] Change encoding of Pubsub sink to global window.
     new 8bae9b3  Merge pull request #12791 from [BEAM-10863] Change encoding 
of Pubsub sink to global window.
b6d0abb is described below

commit b6d0abbe1fa2d7d19f62419a528feeee9558e9ff
Author: Boyuan Zhang <[email protected]>
AuthorDate: Tue Sep 8 17:51:56 2020 -0700

    [BEAM-10863] Change encoding of Pubsub sink to global window.
---
 .../main/java/org/apache/beam/runners/dataflow/DataflowRunner.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index e64e20f..4b1af69 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1460,9 +1460,10 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       stepContext.addInput(
           PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
           byteArrayToJsonString(serializeToByteArray(new 
IdentityMessageFn())));
-      // No coder is needed in this case since the collection being written is 
already of
-      // PubsubMessage, however the Dataflow backend require a coder to be set.
-      
stepContext.addEncodingInput(WindowedValue.getValueOnlyCoder(VoidCoder.of()));
+
+      // Using a GlobalWindowCoder as a place holder because GlobalWindowCoder 
is known coder.
+      stepContext.addEncodingInput(
+          WindowedValue.getFullCoder(VoidCoder.of(), 
GlobalWindow.Coder.INSTANCE));
       stepContext.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
     }
   }

Reply via email to