Repository: incubator-beam Updated Branches: refs/heads/master baf5e416d -> 676843e04
DataflowRunner: get PBegin from PInput Fixes an invalid cast that breaks some PCollectionList-related tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ee716889 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ee716889 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ee716889 Branch: refs/heads/master Commit: ee71688924a15bcd211e950d5822b6750b20eeb8 Parents: baf5e41 Author: Daniel Halperin <[email protected]> Authored: Sat Aug 27 09:42:08 2016 -0700 Committer: GitHub <[email protected]> Committed: Sat Aug 27 09:42:08 2016 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/beam/runners/dataflow/DataflowRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee716889/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 0ce4b58..e5b6614 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -400,7 +400,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { return windowed; } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass()) && ((PCollectionList<?>) input).size() == 0) { - return (OutputT) Pipeline.applyTransform((PBegin) input, Create.of()); + return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of()); } else if (overrides.containsKey(transform.getClass())) { // It is the responsibility of whoever constructs overrides to ensure this is type safe. @SuppressWarnings("unchecked")
