Repository: beam
Updated Branches:
  refs/heads/master a1d82c203 -> b4bafd092


Fixes PubsubIO.Write translation in Dataflow runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/415d4bb1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/415d4bb1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/415d4bb1

Branch: refs/heads/master
Commit: 415d4bb1b05040bc8aef45bcd217e670cc0c52b4
Parents: a1d82c2
Author: Eugene Kirpichov <[email protected]>
Authored: Wed May 3 16:40:18 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Wed May 3 16:42:59 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/dataflow/DataflowRunner.java     | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/415d4bb1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 7da1755..0a4a151 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1026,6 +1026,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         stepContext.addInput(
             PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute());
       }
+      stepContext.addInput(
+          PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
+          byteArrayToJsonString(serializeToByteArray(new IdentityMessageFn())));
       // No coder is needed in this case since the collection being written is already of
       // PubsubMessage, however the Dataflow backend require a coder to be set.
       stepContext.addEncodingInput(WindowedValue.getValueOnlyCoder(VoidCoder.of()));

Reply via email to