Repository: beam Updated Branches: refs/heads/master 983a44926 -> b633abe2c
Remove Pipeline reference from TransformHierarchy This change removes a direct dependency cycle between Pipeline and TransformHierarchy. There is still an indirect cycle through PValues, but that is slightly less problematic. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e9fcebc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e9fcebc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e9fcebc Branch: refs/heads/master Commit: 5e9fcebc07725de368391914781e5b4d5f9c4a19 Parents: d7a4e49 Author: Kenneth Knowles <[email protected]> Authored: Fri May 19 12:57:41 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Sat May 20 08:33:21 2017 -0700 ---------------------------------------------------------------------- .../translation/ApexPipelineTranslator.java | 4 +- .../apex/translation/TranslationContext.java | 5 +- .../core/construction/SdkComponents.java | 14 +++--- .../core/construction/SdkComponentsTest.java | 7 +-- .../beam/runners/direct/DirectGraphVisitor.java | 3 +- .../direct/KeyedPValueTrackingVisitor.java | 5 +- .../flink/FlinkBatchPipelineTranslator.java | 2 +- .../apache/beam/runners/flink/FlinkRunner.java | 10 +--- .../flink/FlinkStreamingPipelineTranslator.java | 4 +- .../dataflow/DataflowPipelineTranslator.java | 6 +-- .../beam/runners/dataflow/DataflowRunner.java | 2 +- .../apache/beam/runners/spark/SparkRunner.java | 2 +- .../streaming/TrackStreamingSourcesTest.java | 14 +++++- .../main/java/org/apache/beam/sdk/Pipeline.java | 50 ++++++++++++++++---- .../beam/sdk/runners/TransformHierarchy.java | 6 +-- .../sdk/runners/TransformHierarchyTest.java | 2 +- 16 files changed, 82 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java 
---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java index 32e470f..bda074b 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java @@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory; * into Apex logical plan {@link DAG}. */ @SuppressWarnings({"rawtypes", "unchecked"}) -public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { +public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults { private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class); /** @@ -110,7 +110,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { throw new UnsupportedOperationException( "no translator registered for " + transform); } - translationContext.setCurrentTransform(node); + translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline())); translator.translate(transform, translationContext); } http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java index a5e3028..aff3863 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java @@ -36,7 +36,6 @@ import 
org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.Window; @@ -77,8 +76,8 @@ class TranslationContext { this.pipelineOptions = pipelineOptions; } - public void setCurrentTransform(TransformHierarchy.Node treeNode) { - this.currentTransform = treeNode.toAppliedPTransform(); + public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) { + this.currentTransform = transform; } public ApexPipelineOptions getPipelineOptions() { http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index eb29b9a..5714fc5 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -62,10 +62,10 @@ class SdkComponents { return new SdkComponents(); } - public static RunnerApi.Pipeline translatePipeline(Pipeline p) { + public static RunnerApi.Pipeline translatePipeline(Pipeline pipeline) { final SdkComponents components = create(); final Collection<String> rootIds = new HashSet<>(); - p.traverseTopologically( + pipeline.traverseTopologically( new PipelineVisitor.Defaults() { private final ListMultimap<Node, AppliedPTransform<?, ?, 
?>> children = ArrayListMultimap.create(); @@ -77,9 +77,10 @@ class SdkComponents { rootIds.add(components.getExistingPTransformId(pipelineRoot)); } } else { - children.put(node.getEnclosingNode(), node.toAppliedPTransform()); + children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline())); try { - components.registerPTransform(node.toAppliedPTransform(), children.get(node)); + components.registerPTransform( + node.toAppliedPTransform(getPipeline()), children.get(node)); } catch (IOException e) { throw new RuntimeException(e); } @@ -88,10 +89,11 @@ class SdkComponents { @Override public void visitPrimitiveTransform(Node node) { - children.put(node.getEnclosingNode(), node.toAppliedPTransform()); + children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline())); try { components.registerPTransform( - node.toAppliedPTransform(), Collections.<AppliedPTransform<?, ?, ?>>emptyList()); + node.toAppliedPTransform(getPipeline()), + Collections.<AppliedPTransform<?, ?, ?>>emptyList()); } catch (IOException e) { throw new IllegalStateException(e); } http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java index 7424886..55702ea 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java @@ -97,18 +97,13 @@ public class SdkComponentsTest { final RunnerApi.Pipeline pipelineProto = SdkComponents.translatePipeline(pipeline); pipeline.traverseTopologically( - new 
PipelineVisitor() { + new PipelineVisitor.Defaults() { Set<Node> transforms = new HashSet<>(); Set<PCollection<?>> pcollections = new HashSet<>(); Set<Equivalence.Wrapper<? extends Coder<?>>> coders = new HashSet<>(); Set<WindowingStrategy<?, ?>> windowingStrategies = new HashSet<>(); @Override - public CompositeBehavior enterCompositeTransform(Node node) { - return CompositeBehavior.ENTER_TRANSFORM; - } - - @Override public void leaveCompositeTransform(Node node) { if (node.isRootNode()) { assertThat( http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java index 1ee8ceb..01204e3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.PValue; * input after the upstream transform has produced and committed output. 
*/ class DirectGraphVisitor extends PipelineVisitor.Defaults { + private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>(); private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers = @@ -101,7 +102,7 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) { @SuppressWarnings({"rawtypes", "unchecked"}) - AppliedPTransform<?, ?, ?> application = node.toAppliedPTransform(); + AppliedPTransform<?, ?, ?> application = node.toAppliedPTransform(getPipeline()); return application; } http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index 347f313..f9b2dae 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -44,7 +44,7 @@ import org.apache.beam.sdk.values.TupleTag; */ // TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms // unkeyed -class KeyedPValueTrackingVisitor implements PipelineVisitor { +class KeyedPValueTrackingVisitor extends PipelineVisitor.Defaults { private static final Set<Class<? 
extends PTransform>> PRODUCES_KEYED_OUTPUTS = ImmutableSet.of( @@ -91,9 +91,6 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor { } @Override - public void visitPrimitiveTransform(TransformHierarchy.Node node) {} - - @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { boolean inputsAreKeyed = true; for (PValue input : producer.getInputs().values()) { http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java index 854b674..50910b5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java @@ -112,7 +112,7 @@ class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator; // create the applied PTransform on the batchContext - batchContext.setCurrentTransform(node.toAppliedPTransform()); + batchContext.setCurrentTransform(node.toAppliedPTransform(getPipeline())); typedTranslator.translateNode(typedTransform, batchContext); } http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 80ef7bb..ca12615 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.values.PValue; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.client.program.DetachedEnvironment; import org.slf4j.Logger; @@ -199,10 +198,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> { // have just recorded the full names during apply time. if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) { final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>(); - pipeline.traverseTopologically(new Pipeline.PipelineVisitor() { - @Override - public void visitValue(PValue value, TransformHierarchy.Node producer) { - } + pipeline.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() { @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { @@ -218,10 +214,6 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> { } return CompositeBehavior.ENTER_TRANSFORM; } - - @Override - public void leaveCompositeTransform(TransformHierarchy.Node node) { - } }); LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} " http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index 53a1fa1..8da68c5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -188,7 +188,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator; // create the applied PTransform on the streamingContext - streamingContext.setCurrentTransform(node.toAppliedPTransform()); + streamingContext.setCurrentTransform(node.toAppliedPTransform(getPipeline())); typedTranslator.translateNode(typedTransform, streamingContext); } @@ -203,7 +203,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { @SuppressWarnings("unchecked") StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator; - streamingContext.setCurrentTransform(node.toAppliedPTransform()); + streamingContext.setCurrentTransform(node.toAppliedPTransform(getPipeline())); return typedTranslator.canTranslate(typedTransform, streamingContext); } http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 840bda8..6d7a0f8 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -431,18 +431,18 @@ public class DataflowPipelineTranslator { transform, node.getFullName()); LOG.debug("Translating {}", transform); - currentTransform = node.toAppliedPTransform(); + currentTransform = node.toAppliedPTransform(getPipeline()); 
translator.translate(transform, this); currentTransform = null; } @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { - producers.put(value, producer.toAppliedPTransform()); + producers.put(value, producer.toAppliedPTransform(getPipeline())); LOG.debug("Checking translation of {}", value); if (!producer.isCompositeNode()) { // Primitive transforms are the only ones assigned step names. - asOutputReference(value, producer.toAppliedPTransform()); + asOutputReference(value, producer.toAppliedPTransform(getPipeline())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 2ef8737..cce6ce7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -729,7 +729,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) { final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>(); pipeline.traverseTopologically( - new PipelineVisitor() { + new PipelineVisitor.Defaults() { @Override public void visitValue(PValue value, TransformHierarchy.Node producer) {} http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 8c02f0f..9e2426e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -404,7 +404,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { @SuppressWarnings("unchecked") TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass); LOG.info("Evaluating {}", transform); - AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform(); + AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform(getPipeline()); ctxt.setCurrentTransform(appliedTransform); evaluator.evaluate(transform, ctxt); ctxt.setCurrentTransform(null); http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java index 33a636a..e8a5951 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java @@ -148,6 +148,12 @@ public class TrackStreamingSourcesTest { } @Override + public void enterPipeline(Pipeline p) { + super.enterPipeline(p); + evaluator.enterPipeline(p); + } + + @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { return evaluator.enterCompositeTransform(node); } @@ -156,7 +162,7 @@ public class TrackStreamingSourcesTest { public void visitPrimitiveTransform(TransformHierarchy.Node node) { 
PTransform transform = node.getTransform(); if (transform.getClass() == transformClassToAssert) { - AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform(); + AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform(getPipeline()); ctxt.setCurrentTransform(appliedTransform); //noinspection unchecked Dataset dataset = ctxt.borrowDataset((PTransform<? extends PValue, ?>) transform); @@ -166,6 +172,12 @@ public class TrackStreamingSourcesTest { evaluator.visitPrimitiveTransform(node); } } + + @Override + public void leavePipeline(Pipeline p) { + super.leavePipeline(p); + evaluator.leavePipeline(p); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index 83496a5..bdf8a12 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; @@ -205,7 +206,7 @@ public class Pipeline { public CompositeBehavior enterCompositeTransform(Node node) { if (!node.isRootNode()) { for (PTransformOverride override : overrides) { - if (override.getMatcher().matches(node.toAppliedPTransform())) { + if (override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) { matched.put(node, override); } } @@ -227,7 +228,7 @@ public class Pipeline { @Override public void visitPrimitiveTransform(Node node) { for (PTransformOverride override : overrides) { - if (override.getMatcher().matches(node.toAppliedPTransform())) { + if 
(override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) { matched.put(node, override); } } @@ -238,7 +239,7 @@ public class Pipeline { private void replace(final PTransformOverride override) { final Set<Node> matches = new HashSet<>(); final Set<Node> freedNodes = new HashSet<>(); - transforms.visit( + traverseTopologically( new PipelineVisitor.Defaults() { @Override public CompositeBehavior enterCompositeTransform(Node node) { @@ -247,7 +248,8 @@ public class Pipeline { freedNodes.add(node); return CompositeBehavior.ENTER_TRANSFORM; } - if (!node.isRootNode() && override.getMatcher().matches(node.toAppliedPTransform())) { + if (!node.isRootNode() + && override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) { matches.add(node); // This node will be freed. When we visit any of its children, they will also be freed freedNodes.add(node); @@ -259,7 +261,7 @@ public class Pipeline { public void visitPrimitiveTransform(Node node) { if (freedNodes.contains(node.getEnclosingNode())) { freedNodes.add(node); - } else if (override.getMatcher().matches(node.toAppliedPTransform())) { + } else if (override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) { matches.add(node); freedNodes.add(node); } @@ -334,8 +336,14 @@ public class Pipeline { @Internal public interface PipelineVisitor { /** - * Called for each composite transform after all topological predecessors have been visited - * but before any of its component transforms. + * Called before visiting any values or transforms, as many uses of a visitor require + * access to the {@link Pipeline} object itself. + */ + void enterPipeline(Pipeline p); + + /** + * Called for each composite transform after all topological predecessors have been visited but + * before any of its component transforms. * * <p>The return value controls whether or not child transforms are visited. 
*/ @@ -360,6 +368,11 @@ public class Pipeline { void visitValue(PValue value, TransformHierarchy.Node producer); /** + * Called when all values and transforms in a {@link Pipeline} have been visited. + */ + void leavePipeline(Pipeline pipeline); + + /** * Control enum for indicating whether or not a traversal should process the contents of * a composite transform or not. */ @@ -373,6 +386,18 @@ public class Pipeline { * User implementations can override just those methods they are interested in. */ class Defaults implements PipelineVisitor { + + private Pipeline pipeline; + + protected Pipeline getPipeline() { + return pipeline; + } + + @Override + public void enterPipeline(Pipeline pipeline) { + this.pipeline = checkNotNull(pipeline); + } + @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { return CompositeBehavior.ENTER_TRANSFORM; @@ -386,6 +411,11 @@ public class Pipeline { @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { } + + @Override + public void leavePipeline(Pipeline pipeline) { + this.pipeline = null; + } } } @@ -406,7 +436,9 @@ public class Pipeline { */ @Internal public void traverseTopologically(PipelineVisitor visitor) { + visitor.enterPipeline(this); transforms.visit(visitor); + visitor.leavePipeline(this); } /** @@ -444,7 +476,7 @@ public class Pipeline { ///////////////////////////////////////////////////////////////////////////// // Below here are internal operations, never called by users. 
- private final TransformHierarchy transforms = new TransformHierarchy(this); + private final TransformHierarchy transforms = new TransformHierarchy(); private Set<String> usedFullNames = new HashSet<>(); private CoderRegistry coderRegistry; private final List<String> unstableNames = new ArrayList<>(); @@ -495,7 +527,7 @@ public class Pipeline { PTransformOverrideFactory<InputT, OutputT, TransformT> replacementFactory) { PTransformReplacement<InputT, OutputT> replacement = replacementFactory.getReplacementTransform( - (AppliedPTransform<InputT, OutputT, TransformT>) original.toAppliedPTransform()); + (AppliedPTransform<InputT, OutputT, TransformT>) original.toAppliedPTransform(this)); if (replacement.getTransform() == original.getTransform()) { return; } http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index fac558b..2f0e8ef 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory; public class TransformHierarchy { private static final Logger LOG = LoggerFactory.getLogger(TransformHierarchy.class); - private final Pipeline pipeline; private final Node root; private final Map<Node, PInput> unexpandedInputs; private final Map<POutput, Node> producers; @@ -65,8 +64,7 @@ public class TransformHierarchy { // Maintain a stack based on the enclosing nodes private Node current; - public TransformHierarchy(Pipeline pipeline) { - this.pipeline = pipeline; + public TransformHierarchy() { producers = new HashMap<>(); producerInput = new HashMap<>(); unexpandedInputs = new 
HashMap<>(); @@ -453,7 +451,7 @@ public class TransformHierarchy { /** * Returns the {@link AppliedPTransform} representing this {@link Node}. */ - public AppliedPTransform<?, ?, ?> toAppliedPTransform() { + public AppliedPTransform<?, ?, ?> toAppliedPTransform(Pipeline pipeline) { return AppliedPTransform.of( getFullName(), inputs, outputs, (PTransform) getTransform(), pipeline); } http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index 125e159..1197d1b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -79,7 +79,7 @@ public class TransformHierarchyTest implements Serializable { @Before public void setup() { - hierarchy = new TransformHierarchy(pipeline); + hierarchy = new TransformHierarchy(); } @Test
