Visit a Transform Hierarchy in Topological Order. Always ensure that the producer of a value is visited before that value is visited for the first time. Visit a composite before visiting any of its child nodes.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd1dfdf3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd1dfdf3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd1dfdf3 Branch: refs/heads/master Commit: bd1dfdf3c8e145a99bcacebd0c64dcf6580f3ffe Parents: 7568f02 Author: Thomas Groh <[email protected]> Authored: Tue May 23 13:29:51 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri May 26 07:50:37 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/spark/SparkRunner.java | 13 +++ .../spark/translation/BoundedDataset.java | 6 ++ .../spark/translation/TransformTranslator.java | 1 + .../spark/translation/StorageLevelTest.java | 4 +- .../beam/sdk/runners/TransformHierarchy.java | 46 ++++++++++- .../sdk/runners/TransformHierarchyTest.java | 86 ++++++++++++++++++++ 6 files changed, 150 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 9e2426e..1be5e13 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -21,8 +21,10 @@ package org.apache.beam.runners.spark; import com.google.common.collect.Iterables; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -49,6 +51,7 @@ 
import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; @@ -336,6 +339,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { protected final EvaluationContext ctxt; protected final SparkPipelineTranslator translator; + private final Set<Node> shouldIgnoreChildren = new HashSet<>(); + public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) { this.translator = translator; this.ctxt = ctxt; @@ -351,6 +356,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName()); LOG.debug("Composite transform class: '{}'", transformClass); doVisitTransform(node); + shouldIgnoreChildren.add(node); return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; } } @@ -392,6 +398,13 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { + Node parent = node.getEnclosingNode(); + while (!parent.isRootNode()) { + if (shouldIgnoreChildren.contains(parent)) { + return; + } + parent = parent.getEnclosingNode(); + } doVisitTransform(node); } http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java index 652c753..a746634 
100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.spark.translation; import com.google.common.base.Function; +import com.google.common.base.Joiner; import com.google.common.collect.Iterables; import java.util.List; import javax.annotation.Nullable; @@ -97,8 +98,13 @@ public class BoundedDataset<T> implements Dataset { return windowedValues; } + int timesCached = 0; @Override public void cache(String storageLevel) { + System.out.printf( + "Persisting Dataset %s for RDD %s (id %s) at level %s. %s times before%n", + this, getRDD(), getRDD().toDebugString(), storageLevel, timesCached++); + System.out.println(Joiner.on("\n\t").join(new Throwable().getStackTrace())); // populate the rdd if needed getRDD().persist(StorageLevel.fromString(storageLevel)); } http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 742ea83..6ca12c9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -407,6 +407,7 @@ public final class TransformTranslator { JavaRDD<WindowedValue<T>> input = new SourceRDD.Bounded<>( jsc.sc(), transform.getSource(), runtimeContext, stepName).toJavaRDD(); // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation. 
+ System.out.println("Evaluating Bounded Read " + transform); context.putDataset(transform, new BoundedDataset<>(input.cache())); } http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java index 8f2e681..8bd6dae 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java @@ -58,12 +58,12 @@ public class StorageLevelTest { @Test public void test() throws Exception { - PCollection<String> pCollection = pipeline.apply(Create.of("foo")); + PCollection<String> pCollection = pipeline.apply("CreateFoo", Create.of("foo")); // by default, the Spark runner doesn't cache the RDD if it accessed only one time. // So, to "force" the caching of the RDD, we have to call the RDD at least two time. // That's why we are using Count fn on the PCollection. 
- pCollection.apply(Count.<String>globally()); + pCollection.apply("CountAll", Count.<String>globally()); PCollection<String> output = pCollection.apply(new StorageLevelPTransform()); http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index 2f0e8ef..630d24c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -202,10 +202,12 @@ public class TransformHierarchy { return producers.get(produced); } + int traversed = 0; public Set<PValue> visit(PipelineVisitor visitor) { finishSpecifying(); Set<PValue> visitedValues = new HashSet<>(); - root.visit(visitor, visitedValues); + traversed++; + root.visit(visitor, visitedValues, new HashSet<Node>(), new HashSet<Node>()); return visitedValues; } @@ -462,7 +464,22 @@ public class TransformHierarchy { * <p>Provides an ordered visit of the input values, the primitive transform (or child nodes for * composite transforms), then the output values. 
*/ - private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) { + private void visit( + PipelineVisitor visitor, + Set<PValue> visitedValues, + Set<Node> visitedNodes, + Set<Node> passedComposites) { + if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) { + getEnclosingNode().visit(visitor, visitedValues, visitedNodes, passedComposites); + } + if (!visitedNodes.add(this)) { + LOG.debug("Not revisiting previously visited node {}", this); + return; + } else if (childNodeOf(passedComposites)) { + LOG.debug("Not revisiting Node {} which is a child of a previously passed composite", this); + return; + } + if (!finishedSpecifying) { finishSpecifying(); } @@ -470,22 +487,31 @@ public class TransformHierarchy { if (!isRootNode()) { // Visit inputs. for (PValue inputValue : inputs.values()) { + Node valueProducer = getProducer(inputValue); + if (!visitedNodes.contains(valueProducer)) { + valueProducer.visit(visitor, visitedValues, visitedNodes, passedComposites); + } if (visitedValues.add(inputValue)) { - visitor.visitValue(inputValue, getProducer(inputValue)); + LOG.debug("Visiting input value {}", inputValue); + visitor.visitValue(inputValue, valueProducer); } } } if (isCompositeNode()) { + LOG.debug("Visiting composite node {}", this); PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this); if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) { for (Node child : parts) { - child.visit(visitor, visitedValues); + child.visit(visitor, visitedValues, visitedNodes, passedComposites); } + } else { + passedComposites.add(this); } visitor.leaveCompositeTransform(this); } else { + LOG.debug("Visiting primitive node {}", this); visitor.visitPrimitiveTransform(this); } @@ -494,12 +520,24 @@ public class TransformHierarchy { // Visit outputs. 
for (PValue pValue : outputs.values()) { if (visitedValues.add(pValue)) { + LOG.debug("Visiting output value {}", pValue); visitor.visitValue(pValue, this); } } } } + private boolean childNodeOf(Set<Node> nodes) { + if (isRootNode()) { + return false; + } + Node parent = this.getEnclosingNode(); + while (!parent.isRootNode() && !nodes.contains(parent)) { + parent = parent.getEnclosingNode(); + } + return nodes.contains(parent); + } + /** * Finish specifying a transform. * http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index 1197d1b..2fe2817 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.runners; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; @@ -32,6 +33,8 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Read; @@ -492,4 +495,87 @@ public class TransformHierarchyTest implements Serializable { assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, replacementParNode)); 
assertThat(visitedValues, Matchers.<PValue>containsInAnyOrder(upstream, output)); } + + @Test + public void visitIsTopologicallyOrdered() { + PCollection<String> one = + PCollection.<String>createPrimitiveOutputInternal( + pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED) + .setCoder(StringUtf8Coder.of()); + final PCollection<Integer> two = + PCollection.<Integer>createPrimitiveOutputInternal( + pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) + .setCoder(VarIntCoder.of()); + final PDone done = PDone.in(pipeline); + final TupleTag<String> oneTag = new TupleTag<String>() {}; + final TupleTag<Integer> twoTag = new TupleTag<Integer>() {}; + final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two); + + hierarchy.pushNode("consumes_both", one, new PTransform<PCollection<String>, PDone>() { + @Override + public PDone expand(PCollection<String> input) { + return done; + } + + @Override + public Map<TupleTag<?>, PValue> getAdditionalInputs() { + return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two); + } + }); + hierarchy.setOutput(done); + hierarchy.popNode(); + + final PTransform<PBegin, PCollectionTuple> producer = + new PTransform<PBegin, PCollectionTuple>() { + @Override + public PCollectionTuple expand(PBegin input) { + return oneAndTwo; + } + }; + hierarchy.pushNode( + "encloses_producer", + PBegin.in(pipeline), + new PTransform<PBegin, PCollectionTuple>() { + @Override + public PCollectionTuple expand(PBegin input) { + return input.apply(producer); + } + }); + hierarchy.pushNode( + "creates_one_and_two", + PBegin.in(pipeline), producer); + hierarchy.setOutput(oneAndTwo); + hierarchy.popNode(); + hierarchy.setOutput(oneAndTwo); + hierarchy.popNode(); + + hierarchy.visit(new PipelineVisitor.Defaults() { + private final Set<Node> visitedNodes = new HashSet<>(); + private final Set<PValue> visitedValues = new HashSet<>(); + @Override + public CompositeBehavior enterCompositeTransform(Node node) { + 
for (PValue input : node.getInputs().values()) { + assertThat(visitedValues, hasItem(input)); + } + visitedNodes.add(node); + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void visitPrimitiveTransform(Node node) { + assertThat(visitedNodes, hasItem(node.getEnclosingNode())); + for (PValue input : node.getInputs().values()) { + assertThat(visitedValues, hasItem(input)); + } + visitedNodes.add(node); + } + + @Override + public void visitValue(PValue value, Node producer) { + assertThat(visitedNodes, hasItem(producer)); + assertThat(visitedValues, not(hasItem(value))); + visitedValues.add(value); + } + }); + } }
