Repository: incubator-beam Updated Branches: refs/heads/master 4c9058236 -> a9a41eb94
[BEAM-794] Defer combining in case of merging windows with sideInputs. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a7cc8206 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a7cc8206 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a7cc8206 Branch: refs/heads/master Commit: a7cc8206cbbc6ac10e71a0563da2fea4c708277b Parents: 4c90582 Author: Sela <[email protected]> Authored: Fri Oct 21 16:00:57 2016 +0300 Committer: Sela <[email protected]> Committed: Sat Oct 22 12:23:33 2016 +0300 ---------------------------------------------------------------------- .../apache/beam/runners/spark/SparkRunner.java | 33 +++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a7cc8206/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index cad53be..b17c38c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.spark; import java.util.Collection; +import java.util.List; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkContextFactory; @@ -34,11 +35,13 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.AppliedPTransform; 
+import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; @@ -206,7 +209,7 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> { @SuppressWarnings("unchecked") Class<PTransform<?, ?>> transformClass = (Class<PTransform<?, ?>>) node.getTransform().getClass(); - if (translator.hasTranslation(transformClass)) { + if (translator.hasTranslation(transformClass) && !shouldDefer(node)) { LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName()); LOG.debug("Composite transform class: '{}'", transformClass); doVisitTransform(node); @@ -216,6 +219,34 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> { return CompositeBehavior.ENTER_TRANSFORM; } + private boolean shouldDefer(TransformTreeNode node) { + PInput input = node.getInput(); + // if the input is not a PCollection, or it is but with non merging windows, don't defer. + if (!(input instanceof PCollection) + || ((PCollection) input).getWindowingStrategy().getWindowFn().isNonMerging()) { + return false; + } + // so far we know that the input is a PCollection with merging windows. + // check for sideInput in case of a Combine transform. 
+ PTransform<?, ?> transform = node.getTransform(); + boolean hasSideInput = false; + if (transform instanceof Combine.PerKey) { + List<PCollectionView<?>> sideInputs = ((Combine.PerKey<?, ?, ?>) transform).getSideInputs(); + hasSideInput = sideInputs != null && !sideInputs.isEmpty(); + } else if (transform instanceof Combine.Globally) { + List<PCollectionView<?>> sideInputs = ((Combine.Globally<?, ?>) transform).getSideInputs(); + hasSideInput = sideInputs != null && !sideInputs.isEmpty(); + } + // defer if sideInputs are defined. + if (hasSideInput) { + LOG.info("Deferring combine transformation {} for job {}", transform, + ctxt.getPipeline().getOptions().getJobName()); + return true; + } + // default. + return false; + } + @Override public void visitPrimitiveTransform(TransformTreeNode node) { doVisitTransform(node);
