Revert "Visit a Transform Hierarchy in Topological Order"

This reverts commit bd1dfdf3c8e145a99bcacebd0c64dcf6580f3ffe.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ad6433e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ad6433e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ad6433e Branch: refs/heads/master Commit: 6ad6433ec0c02aec8656e9e3b27f6e0f974f8ece Parents: 247f9bc Author: Thomas Groh <[email protected]> Authored: Fri May 26 11:04:05 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri May 26 13:18:55 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/spark/SparkRunner.java | 13 --- .../spark/translation/BoundedDataset.java | 6 -- .../spark/translation/TransformTranslator.java | 1 - .../spark/translation/StorageLevelTest.java | 4 +- .../beam/sdk/runners/TransformHierarchy.java | 46 +---------- .../sdk/runners/TransformHierarchyTest.java | 86 -------------------- 6 files changed, 6 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6ad6433e/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 1be5e13..9e2426e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -21,10 +21,8 @@ package org.apache.beam.runners.spark; import com.google.common.collect.Iterables; import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -51,7 +49,6 @@ 
import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; @@ -339,8 +336,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { protected final EvaluationContext ctxt; protected final SparkPipelineTranslator translator; - private final Set<Node> shouldIgnoreChildren = new HashSet<>(); - public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) { this.translator = translator; this.ctxt = ctxt; @@ -356,7 +351,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName()); LOG.debug("Composite transform class: '{}'", transformClass); doVisitTransform(node); - shouldIgnoreChildren.add(node); return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; } } @@ -398,13 +392,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { - Node parent = node.getEnclosingNode(); - while (!parent.isRootNode()) { - if (shouldIgnoreChildren.contains(parent)) { - return; - } - parent = parent.getEnclosingNode(); - } doVisitTransform(node); } http://git-wip-us.apache.org/repos/asf/beam/blob/6ad6433e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java index a746634..652c753 
100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.spark.translation; import com.google.common.base.Function; -import com.google.common.base.Joiner; import com.google.common.collect.Iterables; import java.util.List; import javax.annotation.Nullable; @@ -98,13 +97,8 @@ public class BoundedDataset<T> implements Dataset { return windowedValues; } - int timesCached = 0; @Override public void cache(String storageLevel) { - System.out.printf( - "Persisting Dataset %s for RDD %s (id %s) at level %s. %s times before%n", - this, getRDD(), getRDD().toDebugString(), storageLevel, timesCached++); - System.out.println(Joiner.on("\n\t").join(new Throwable().getStackTrace())); // populate the rdd if needed getRDD().persist(StorageLevel.fromString(storageLevel)); } http://git-wip-us.apache.org/repos/asf/beam/blob/6ad6433e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 6ca12c9..742ea83 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -407,7 +407,6 @@ public final class TransformTranslator { JavaRDD<WindowedValue<T>> input = new SourceRDD.Bounded<>( jsc.sc(), transform.getSource(), runtimeContext, stepName).toJavaRDD(); // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation. 
- System.out.println("Evaluating Bounded Read " + transform); context.putDataset(transform, new BoundedDataset<>(input.cache())); } http://git-wip-us.apache.org/repos/asf/beam/blob/6ad6433e/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java index 8bd6dae..8f2e681 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java @@ -58,12 +58,12 @@ public class StorageLevelTest { @Test public void test() throws Exception { - PCollection<String> pCollection = pipeline.apply("CreateFoo", Create.of("foo")); + PCollection<String> pCollection = pipeline.apply(Create.of("foo")); // by default, the Spark runner doesn't cache the RDD if it accessed only one time. // So, to "force" the caching of the RDD, we have to call the RDD at least two time. // That's why we are using Count fn on the PCollection. 
- pCollection.apply("CountAll", Count.<String>globally()); + pCollection.apply(Count.<String>globally()); PCollection<String> output = pCollection.apply(new StorageLevelPTransform()); http://git-wip-us.apache.org/repos/asf/beam/blob/6ad6433e/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index 630d24c..2f0e8ef 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -202,12 +202,10 @@ public class TransformHierarchy { return producers.get(produced); } - int traversed = 0; public Set<PValue> visit(PipelineVisitor visitor) { finishSpecifying(); Set<PValue> visitedValues = new HashSet<>(); - traversed++; - root.visit(visitor, visitedValues, new HashSet<Node>(), new HashSet<Node>()); + root.visit(visitor, visitedValues); return visitedValues; } @@ -464,22 +462,7 @@ public class TransformHierarchy { * <p>Provides an ordered visit of the input values, the primitive transform (or child nodes for * composite transforms), then the output values. 
*/ - private void visit( - PipelineVisitor visitor, - Set<PValue> visitedValues, - Set<Node> visitedNodes, - Set<Node> passedComposites) { - if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) { - getEnclosingNode().visit(visitor, visitedValues, visitedNodes, passedComposites); - } - if (!visitedNodes.add(this)) { - LOG.debug("Not revisiting previously visited node {}", this); - return; - } else if (childNodeOf(passedComposites)) { - LOG.debug("Not revisiting Node {} which is a child of a previously passed composite", this); - return; - } - + private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) { if (!finishedSpecifying) { finishSpecifying(); } @@ -487,31 +470,22 @@ public class TransformHierarchy { if (!isRootNode()) { // Visit inputs. for (PValue inputValue : inputs.values()) { - Node valueProducer = getProducer(inputValue); - if (!visitedNodes.contains(valueProducer)) { - valueProducer.visit(visitor, visitedValues, visitedNodes, passedComposites); - } if (visitedValues.add(inputValue)) { - LOG.debug("Visiting input value {}", inputValue); - visitor.visitValue(inputValue, valueProducer); + visitor.visitValue(inputValue, getProducer(inputValue)); } } } if (isCompositeNode()) { - LOG.debug("Visiting composite node {}", this); PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this); if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) { for (Node child : parts) { - child.visit(visitor, visitedValues, visitedNodes, passedComposites); + child.visit(visitor, visitedValues); } - } else { - passedComposites.add(this); } visitor.leaveCompositeTransform(this); } else { - LOG.debug("Visiting primitive node {}", this); visitor.visitPrimitiveTransform(this); } @@ -520,24 +494,12 @@ public class TransformHierarchy { // Visit outputs. 
for (PValue pValue : outputs.values()) { if (visitedValues.add(pValue)) { - LOG.debug("Visiting output value {}", pValue); visitor.visitValue(pValue, this); } } } } - private boolean childNodeOf(Set<Node> nodes) { - if (isRootNode()) { - return false; - } - Node parent = this.getEnclosingNode(); - while (!parent.isRootNode() && !nodes.contains(parent)) { - parent = parent.getEnclosingNode(); - } - return nodes.contains(parent); - } - /** * Finish specifying a transform. * http://git-wip-us.apache.org/repos/asf/beam/blob/6ad6433e/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index 2fe2817..1197d1b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.runners; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; @@ -33,8 +32,6 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Read; @@ -495,87 +492,4 @@ public class TransformHierarchyTest implements Serializable { assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, replacementParNode)); 
assertThat(visitedValues, Matchers.<PValue>containsInAnyOrder(upstream, output)); } - - @Test - public void visitIsTopologicallyOrdered() { - PCollection<String> one = - PCollection.<String>createPrimitiveOutputInternal( - pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED) - .setCoder(StringUtf8Coder.of()); - final PCollection<Integer> two = - PCollection.<Integer>createPrimitiveOutputInternal( - pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) - .setCoder(VarIntCoder.of()); - final PDone done = PDone.in(pipeline); - final TupleTag<String> oneTag = new TupleTag<String>() {}; - final TupleTag<Integer> twoTag = new TupleTag<Integer>() {}; - final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two); - - hierarchy.pushNode("consumes_both", one, new PTransform<PCollection<String>, PDone>() { - @Override - public PDone expand(PCollection<String> input) { - return done; - } - - @Override - public Map<TupleTag<?>, PValue> getAdditionalInputs() { - return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two); - } - }); - hierarchy.setOutput(done); - hierarchy.popNode(); - - final PTransform<PBegin, PCollectionTuple> producer = - new PTransform<PBegin, PCollectionTuple>() { - @Override - public PCollectionTuple expand(PBegin input) { - return oneAndTwo; - } - }; - hierarchy.pushNode( - "encloses_producer", - PBegin.in(pipeline), - new PTransform<PBegin, PCollectionTuple>() { - @Override - public PCollectionTuple expand(PBegin input) { - return input.apply(producer); - } - }); - hierarchy.pushNode( - "creates_one_and_two", - PBegin.in(pipeline), producer); - hierarchy.setOutput(oneAndTwo); - hierarchy.popNode(); - hierarchy.setOutput(oneAndTwo); - hierarchy.popNode(); - - hierarchy.visit(new PipelineVisitor.Defaults() { - private final Set<Node> visitedNodes = new HashSet<>(); - private final Set<PValue> visitedValues = new HashSet<>(); - @Override - public CompositeBehavior enterCompositeTransform(Node node) { - 
for (PValue input : node.getInputs().values()) { - assertThat(visitedValues, hasItem(input)); - } - visitedNodes.add(node); - return CompositeBehavior.ENTER_TRANSFORM; - } - - @Override - public void visitPrimitiveTransform(Node node) { - assertThat(visitedNodes, hasItem(node.getEnclosingNode())); - for (PValue input : node.getInputs().values()) { - assertThat(visitedValues, hasItem(input)); - } - visitedNodes.add(node); - } - - @Override - public void visitValue(PValue value, Node producer) { - assertThat(visitedNodes, hasItem(producer)); - assertThat(visitedValues, not(hasItem(value))); - visitedValues.add(value); - } - }); - } }
