Repository: beam Updated Branches: refs/heads/master ce3dd4583 -> 32d55323c
Check for Deferral on Non-additional inputs Because Side Inputs are represented within the expanded inputs, the check that the transform is a Combine with Side Inputs would never be hit. This ensures that we do not consider additional inputs during the check to defer evaluation of the node. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ac18b2e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ac18b2e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ac18b2e Branch: refs/heads/master Commit: 1ac18b2eb1371422e60d50a8c3f37b3b24d59611 Parents: ce3dd45 Author: Thomas Groh <[email protected]> Authored: Mon Jun 12 16:55:59 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Mon Jun 12 16:55:59 2017 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/beam/runners/spark/SparkRunner.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1ac18b2e/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 9e2426e..d008718 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.metrics.AggregatorMetricSource; @@ -359,10 +360,12 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { protected boolean shouldDefer(TransformHierarchy.Node node) { // if the input is not a PCollection, or it is but with non merging windows, don't defer. - if (node.getInputs().size() != 1) { + Collection<PValue> nonAdditionalInputs = + TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(getPipeline())); + if (nonAdditionalInputs.size() != 1) { return false; } - PValue input = Iterables.getOnlyElement(node.getInputs().values()); + PValue input = Iterables.getOnlyElement(nonAdditionalInputs); if (!(input instanceof PCollection) || ((PCollection) input).getWindowingStrategy().getWindowFn().isNonMerging()) { return false;
