Reduce Prevalence of PValue in the DirectRunner. Use PCollection or PCollectionView explicitly.
Retrieve views from the WriteView transform rather than visiting the view as an output PValue. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6a78bd3f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6a78bd3f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6a78bd3f Branch: refs/heads/master Commit: 6a78bd3f09c99f49c5f27d15b3791f200bf5d53d Parents: dc70383 Author: Thomas Groh <[email protected]> Authored: Wed May 31 09:38:02 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Wed May 31 10:38:45 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/direct/DirectGraph.java | 26 +++++++++++--------- .../beam/runners/direct/DirectGraphVisitor.java | 25 ++++++++++--------- .../beam/runners/direct/DirectRunner.java | 4 ++- .../beam/runners/direct/EvaluationContext.java | 17 ++++++++++++- .../beam/runners/direct/WatermarkManager.java | 19 +++++++++++--- .../runners/direct/DirectGraphVisitorTest.java | 3 +++ .../beam/runners/direct/DirectGraphs.java | 10 +++++++- .../runners/direct/EvaluationContextTest.java | 6 ++++- 8 files changed, 80 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java index 83b214a..c2c0afa 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java @@ -24,9 +24,9 @@ import java.util.Map; import java.util.Set; import org.apache.beam.sdk.Pipeline; import 
org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; /** @@ -34,39 +34,43 @@ import org.apache.beam.sdk.values.PValue; * executed with the {@link DirectRunner}. */ class DirectGraph { - private final Map<POutput, AppliedPTransform<?, ?, ?>> producers; + private final Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers; + private final Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters; private final ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers; - private final Set<PCollectionView<?>> views; private final Set<AppliedPTransform<?, ?, ?>> rootTransforms; private final Map<AppliedPTransform<?, ?, ?>, String> stepNames; public static DirectGraph create( - Map<POutput, AppliedPTransform<?, ?, ?>> producers, + Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers, + Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters, ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers, - Set<PCollectionView<?>> views, Set<AppliedPTransform<?, ?, ?>> rootTransforms, Map<AppliedPTransform<?, ?, ?>, String> stepNames) { - return new DirectGraph(producers, primitiveConsumers, views, rootTransforms, stepNames); + return new DirectGraph(producers, viewWriters, primitiveConsumers, rootTransforms, stepNames); } private DirectGraph( - Map<POutput, AppliedPTransform<?, ?, ?>> producers, + Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers, + Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters, ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers, - Set<PCollectionView<?>> views, Set<AppliedPTransform<?, ?, ?>> rootTransforms, Map<AppliedPTransform<?, ?, ?>, String> stepNames) { this.producers = producers; + this.viewWriters = viewWriters; this.primitiveConsumers = 
primitiveConsumers; - this.views = views; this.rootTransforms = rootTransforms; this.stepNames = stepNames; } - AppliedPTransform<?, ?, ?> getProducer(PValue produced) { + AppliedPTransform<?, ?, ?> getProducer(PCollection<?> produced) { return producers.get(produced); } + AppliedPTransform<?, ?, ?> getWriter(PCollectionView<?> view) { + return viewWriters.get(view); + } + List<AppliedPTransform<?, ?, ?>> getPrimitiveConsumers(PValue consumed) { return primitiveConsumers.get(consumed); } @@ -76,7 +80,7 @@ class DirectGraph { } Set<PCollectionView<?>> getViews() { - return views; + return viewWriters.keySet(); } String getStepName(AppliedPTransform<?, ?, ?> step) { http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java index 01204e3..d54de5d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java @@ -30,9 +30,9 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; /** @@ -42,12 +42,12 @@ import org.apache.beam.sdk.values.PValue; */ class DirectGraphVisitor extends PipelineVisitor.Defaults { - private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>(); + private Map<PCollection<?>, 
AppliedPTransform<?, ?, ?>> producers = new HashMap<>(); + private Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters = new HashMap<>(); private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers = ArrayListMultimap.create(); - private Set<PCollectionView<?>> views = new HashSet<>(); private Set<AppliedPTransform<?, ?, ?>> rootTransforms = new HashSet<>(); private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>(); private int numTransforms = 0; @@ -86,17 +86,19 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { for (PValue value : node.getInputs().values()) { primitiveConsumers.put(value, appliedTransform); } + if (node.getTransform() instanceof ViewOverrideFactory.WriteView) { + viewWriters.put( + ((ViewOverrideFactory.WriteView<?, ?>) node.getTransform()).getView(), + node.toAppliedPTransform(getPipeline())); + } } } @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer); - if (value instanceof PCollectionView) { - views.add((PCollectionView<?>) value); - } - if (!producers.containsKey(value)) { - producers.put(value, appliedTransform); + if (value instanceof PCollection && !producers.containsKey(value)) { + producers.put((PCollection<?>) value, appliedTransform); } } @@ -111,11 +113,12 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { } /** - * Get the graph constructed by this {@link DirectGraphVisitor}, which provides - * lookups for producers and consumers of {@link PValue PValues}. + * Get the graph constructed by this {@link DirectGraphVisitor}, which provides lookups for + * producers and consumers of {@link PValue PValues}. 
*/ public DirectGraph getGraph() { checkState(finalized, "Can't get a graph before the Pipeline has been completely traversed"); - return DirectGraph.create(producers, primitiveConsumers, views, rootTransforms, stepNames); + return DirectGraph.create( + producers, viewWriters, primitiveConsumers, rootTransforms, stepNames); } } http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 69dea8f..dbd1ec4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -220,7 +221,8 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { * iteration order based on the order at which elements are added to it. 
*/ @SuppressWarnings("rawtypes") - private List<PTransformOverride> defaultTransformOverrides() { + @VisibleForTesting + List<PTransformOverride> defaultTransformOverrides() { return ImmutableList.<PTransformOverride>builder() .add( PTransformOverride.of( http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 88ce85a..e215070 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -276,7 +276,7 @@ class EvaluationContext { * callback will be executed regardless of whether values have been produced. */ public void scheduleAfterOutputWouldBeProduced( - PValue value, + PCollection<?> value, BoundedWindow window, WindowingStrategy<?, ?> windowingStrategy, Runnable runnable) { @@ -287,6 +287,21 @@ class EvaluationContext { } /** + * Schedule a callback to be executed after output would be produced for the given window if there + * had been input. + */ + public void scheduleAfterOutputWouldBeProduced( + PCollectionView<?> view, + BoundedWindow window, + WindowingStrategy<?, ?> windowingStrategy, + Runnable runnable) { + AppliedPTransform<?, ?, ?> producing = graph.getWriter(view); + callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable); + + fireAvailableCallbacks(producing); + } + + /** * Schedule a callback to be executed after the given window is expired. * * <p>For example, upstream state associated with the window may be cleared. 
http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index 4f1b831..40ce163 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -60,6 +60,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; @@ -790,6 +791,18 @@ class WatermarkManager { } } + private TransformWatermarks getValueWatermark(PValue pvalue) { + if (pvalue instanceof PCollection) { + return getTransformWatermark(graph.getProducer((PCollection<?>) pvalue)); + } else if (pvalue instanceof PCollectionView<?>) { + return getTransformWatermark(graph.getWriter((PCollectionView<?>) pvalue)); + } else { + throw new IllegalArgumentException( + String.format( + "Unknown type of %s %s", PValue.class.getSimpleName(), pvalue.getClass())); + } + } + private TransformWatermarks getTransformWatermark(AppliedPTransform<?, ?, ?> transform) { TransformWatermarks wms = transformToWatermarks.get(transform); if (wms == null) { @@ -824,8 +837,7 @@ class WatermarkManager { } for (PValue pvalue : inputs.values()) { Watermark producerOutputWatermark = - getTransformWatermark(graph.getProducer(pvalue)) - .synchronizedProcessingOutputWatermark; + getValueWatermark(pvalue).synchronizedProcessingOutputWatermark; 
inputWmsBuilder.add(producerOutputWatermark); } return inputWmsBuilder.build(); @@ -838,8 +850,7 @@ class WatermarkManager { inputWatermarksBuilder.add(THE_END_OF_TIME); } for (PValue pvalue : inputs.values()) { - Watermark producerOutputWatermark = - getTransformWatermark(graph.getProducer(pvalue)).outputWatermark; + Watermark producerOutputWatermark = getValueWatermark(pvalue).outputWatermark; inputWatermarksBuilder.add(producerOutputWatermark); } List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build(); http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java index 7f46a0e..576edf3 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java @@ -78,6 +78,9 @@ public class DirectGraphVisitorTest implements Serializable { .apply(View.<String>asList()); PCollectionView<Object> singletonView = p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.<Object>asSingleton()); + p.replaceAll( + DirectRunner.fromOptions(TestPipeline.testingPipelineOptions()) + .defaultTransformOverrides()); p.traverseTopologically(visitor); assertThat( visitor.getGraph().getViews(), http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java index 2f048fa..43de091 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java @@ -19,6 +19,8 @@ package org.apache.beam.runners.direct; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; /** Test utilities for the {@link DirectRunner}. */ @@ -30,6 +32,12 @@ final class DirectGraphs { } public static AppliedPTransform<?, ?, ?> getProducer(PValue value) { - return getGraph(value.getPipeline()).getProducer(value); + if (value instanceof PCollection) { + return getGraph(value.getPipeline()).getProducer((PCollection<?>) value); + } else if (value instanceof PCollectionView) { + return getGraph(value.getPipeline()).getWriter((PCollectionView<?>) value); + } + throw new IllegalArgumentException( + String.format("Unexpected type of %s %s", PValue.class.getSimpleName(), value.getClass())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 80b04f8..c0e43d6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -101,6 +101,10 @@ public class EvaluationContextTest { view = created.apply(View.<Integer>asIterable()); unbounded = p.apply(GenerateSequence.from(0)); + 
p.replaceAll( + DirectRunner.fromOptions(TestPipeline.testingPipelineOptions()) + .defaultTransformOverrides()); + KeyedPValueTrackingVisitor keyedPValueTrackingVisitor = KeyedPValueTrackingVisitor.create(); p.traverseTopologically(keyedPValueTrackingVisitor); @@ -116,7 +120,7 @@ public class EvaluationContextTest { createdProducer = graph.getProducer(created); downstreamProducer = graph.getProducer(downstream); - viewProducer = graph.getProducer(view); + viewProducer = graph.getWriter(view); unboundedProducer = graph.getProducer(unbounded); }
