This closes #2286
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f7e73bb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f7e73bb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f7e73bb Branch: refs/heads/DSL_SQL Commit: 5f7e73bbacf7096eed44002a54910a560b195801 Parents: b2de3db 8f4fa43 Author: Aljoscha Krettek <[email protected]> Authored: Wed Jun 7 19:43:19 2017 +0200 Committer: Luke Cwik <[email protected]> Committed: Wed Jun 7 13:41:20 2017 -0700 ---------------------------------------------------------------------- .../translation/types/CoderTypeSerializer.java | 41 ++- .../streaming/io/UnboundedSourceWrapper.java | 2 + .../flink/streaming/TestCountingSource.java | 48 ++- .../streaming/UnboundedSourceWrapperTest.java | 309 +++++++++++-------- .../beam/runners/dataflow/DataflowRunner.java | 24 +- 5 files changed, 269 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5f7e73bb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --cc runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index cce6ce7,cce6ce7..ed29330 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@@ -428,12 -428,12 +428,15 @@@ public class DataflowRunner extends Pip public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform( AppliedPTransform<PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> transform) { PTransform<PInput, PCollection<T>> original = transform.getTransform(); ++ PCollection<T> output = ++ (PCollection) Iterables.getOnlyElement(transform.getOutputs().values()); return PTransformReplacement.of( transform.getPipeline().begin(), InstanceBuilder.ofType(replacement) .withArg(DataflowRunner.class, runner) .withArg( (Class<? super PTransform<PInput, PCollection<T>>>) original.getClass(), original) ++ .withArg((Class<? super PCollection<T>>) output.getClass(), output) .build()); } @@@ -809,11 -809,11 +812,12 @@@ extends PTransform<PBegin, PCollection<PubsubMessage>> { private final PubsubUnboundedSource transform; -- /** -- * Builds an instance of this class from the overridden transform. -- */ ++ /** Builds an instance of this class from the overridden transform. */ ++ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() public StreamingPubsubIORead( -- DataflowRunner runner, PubsubUnboundedSource transform) { ++ DataflowRunner runner, ++ PubsubUnboundedSource transform, ++ PCollection<PubsubMessage> originalOutput) { this.transform = transform; } @@@ -992,11 -992,11 +996,11 @@@ private static class StreamingUnboundedRead<T> extends PTransform<PBegin, PCollection<T>> { private final UnboundedSource<T, ?> source; -- /** -- * Builds an instance of this class from the overridden transform. -- */ ++ /** Builds an instance of this class from the overridden transform. */ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() -- public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded<T> transform) { ++ public StreamingUnboundedRead(DataflowRunner runner, ++ Read.Unbounded<T> transform, ++ PCollection<T> originalOutput) { this.source = transform.getSource(); } @@@ -1111,7 -1111,7 +1115,9 @@@ /** Builds an instance of this class from the overridden transform. */ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() -- public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) { ++ public StreamingBoundedRead(DataflowRunner runner, ++ Read.Bounded<T> transform, ++ PCollection<T> originalOutput) { this.source = transform.getSource(); }
