Repository: beam
Updated Branches:
  refs/heads/master ce3dd4583 -> 32d55323c


Check for Deferral on Non-additional inputs

Because side inputs are represented within a transform's expanded inputs, the
check for whether the transform is a Combine with side inputs could never
succeed. This change ensures that additional (side) inputs are excluded when
deciding whether to defer evaluation of the node.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ac18b2e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ac18b2e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ac18b2e

Branch: refs/heads/master
Commit: 1ac18b2eb1371422e60d50a8c3f37b3b24d59611
Parents: ce3dd45
Author: Thomas Groh <[email protected]>
Authored: Mon Jun 12 16:55:59 2017 -0700
Committer: Thomas Groh <[email protected]>
Committed: Mon Jun 12 16:55:59 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/spark/SparkRunner.java  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1ac18b2e/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 9e2426e..d008718 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.metrics.AggregatorMetricSource;
@@ -359,10 +360,12 @@ public final class SparkRunner extends 
PipelineRunner<SparkPipelineResult> {
 
     protected boolean shouldDefer(TransformHierarchy.Node node) {
       // if the input is not a PCollection, or it is but with non merging 
windows, don't defer.
-      if (node.getInputs().size() != 1) {
+      Collection<PValue> nonAdditionalInputs =
+          
TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(getPipeline()));
+      if (nonAdditionalInputs.size() != 1) {
         return false;
       }
-      PValue input = Iterables.getOnlyElement(node.getInputs().values());
+      PValue input = Iterables.getOnlyElement(nonAdditionalInputs);
       if (!(input instanceof PCollection)
           || ((PCollection) 
input).getWindowingStrategy().getWindowFn().isNonMerging()) {
         return false;

Reply via email to