Repository: beam
Updated Branches:
  refs/heads/master a1d82c203 -> b4bafd092
Fixes PubsubIO.Write translation in Dataflow runner

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/415d4bb1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/415d4bb1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/415d4bb1

Branch: refs/heads/master
Commit: 415d4bb1b05040bc8aef45bcd217e670cc0c52b4
Parents: a1d82c2
Author: Eugene Kirpichov <[email protected]>
Authored: Wed May 3 16:40:18 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Wed May 3 16:42:59 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/415d4bb1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 7da1755..0a4a151 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1026,6 +1026,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         stepContext.addInput(
             PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute());
       }
+      stepContext.addInput(
+          PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
+          byteArrayToJsonString(serializeToByteArray(new IdentityMessageFn())));
       // No coder is needed in this case since the collection being written is already of
       // PubsubMessage, however the Dataflow backend require a coder to be set.
       stepContext.addEncodingInput(WindowedValue.getValueOnlyCoder(VoidCoder.of()));
