Provide methods for validating Surgery completion. The new method takes a list of overrides, applies all of them in list order, and then validates that no more overrides are applicable.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/85af8981 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/85af8981 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/85af8981 Branch: refs/heads/master Commit: 85af898132ec5c528a96d5c213c08cee91fa6538 Parents: d6f6351 Author: Thomas Groh <[email protected]> Authored: Fri Mar 17 16:17:20 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Wed Mar 22 18:11:54 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/apex/ApexRunner.java | 3 +- .../beam/runners/direct/DirectRunner.java | 3 +- .../flink/FlinkStreamingPipelineTranslator.java | 3 +- .../beam/runners/dataflow/DataflowRunner.java | 3 +- .../beam/runners/spark/TestSparkRunner.java | 6 +- .../main/java/org/apache/beam/sdk/Pipeline.java | 66 ++++++++- .../beam/sdk/runners/PTransformOverride.java | 44 ++++++ .../java/org/apache/beam/sdk/PipelineTest.java | 138 +++++++++++++++++++ 8 files changed, 255 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 010ede3..79a2dd7 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PTransformMatcher; +import org.apache.beam.sdk.runners.PTransformOverride; 
import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Combine; @@ -114,7 +115,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { public ApexRunnerResult run(final Pipeline pipeline) { for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : getOverrides().entrySet()) { - pipeline.replace(override.getKey(), override.getValue()); + pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue())); } final ApexPipelineTranslator translator = new ApexPipelineTranslator(options); http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 4992c6a..94f0521 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PTransformMatcher; +import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.TestStream; @@ -261,7 +262,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { public DirectPipelineResult run(Pipeline pipeline) { for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : defaultTransformOverrides().entrySet()) { - pipeline.replace(override.getKey(), override.getValue()); + 
pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue())); } MetricsEnvironment.setMetricsSupported(true); DirectGraphVisitor graphVisitor = new DirectGraphVisitor(); http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index 9ab1310..d50d6bf 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -24,6 +24,7 @@ import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactor import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PTransformMatcher; +import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.Combine; @@ -95,7 +96,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : transformOverrides.entrySet()) { - pipeline.replace(override.getKey(), override.getValue()); + pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue())); } super.translate(pipeline); } http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 9398d72..718a1e3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -93,6 +93,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.runners.PTransformMatcher; +import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformHierarchy; @@ -666,7 +667,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { @VisibleForTesting void replaceTransforms(Pipeline pipeline) { for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : overrides.entrySet()) { - pipeline.replace(override.getKey(), override.getValue()); + pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index e436422..e40534f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.PipelineResult; import 
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.PAssert; @@ -207,8 +208,9 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { @VisibleForTesting void adaptBoundedReads(Pipeline pipeline) { pipeline.replace( - PTransformMatchers.classEqualTo(BoundedReadFromUnboundedSource.class), - new AdaptedBoundedAsUnbounded.Factory()); + PTransformOverride.of( + PTransformMatchers.classEqualTo(BoundedReadFromUnboundedSource.class), + new AdaptedBoundedAsUnbounded.Factory())); } private static class AdaptedBoundedAsUnbounded<T> extends PTransform<PBegin, PCollection<T>> { http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index 2f368b1..f980a0b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -19,9 +19,12 @@ package org.apache.beam.sdk; import static com.google.common.base.Preconditions.checkState; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.beam.sdk.coders.CoderRegistry; @@ -29,7 +32,7 @@ import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; 
import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.PTransformMatcher; +import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; import org.apache.beam.sdk.runners.PipelineRunner; @@ -172,14 +175,67 @@ public class Pipeline { return begin().apply(name, root); } + /** + * Replaces all nodes that match a {@link PTransformOverride} in this pipeline. Overrides are + * applied in the order they are present within the list. + * + * <p>After all nodes are replaced, ensures that no nodes in the updated graph match any of the + * overrides. + */ + public void replaceAll(List<PTransformOverride> overrides) { + for (PTransformOverride override : overrides) { + replace(override); + } + checkNoMoreMatches(overrides); + } + + private void checkNoMoreMatches(final List<PTransformOverride> overrides) { + traverseTopologically( + new PipelineVisitor.Defaults() { + SetMultimap<Node, PTransformOverride> matched = HashMultimap.create(); + + @Override + public CompositeBehavior enterCompositeTransform(Node node) { + if (!node.isRootNode()) { + for (PTransformOverride override : overrides) { + if (override.getMatcher().matches(node.toAppliedPTransform())) { + matched.put(node, override); + } + } + } + if (!matched.containsKey(node)) { + return CompositeBehavior.ENTER_TRANSFORM; + } + return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(Node node) { + if (node.isRootNode()) { + checkState( + matched.isEmpty(), "Found nodes that matched overrides. 
Matches: %s", matched); + } + } + + @Override + public void visitPrimitiveTransform(Node node) { + for (PTransformOverride override : overrides) { + if (override.getMatcher().matches(node.toAppliedPTransform())) { + matched.put(node, override); + } + } + } + }); + } + public void replace( - final PTransformMatcher matcher, PTransformOverrideFactory replacementFactory) { + final PTransformOverride override) { final Collection<Node> matches = new ArrayList<>(); transforms.visit( new PipelineVisitor.Defaults() { @Override public CompositeBehavior enterCompositeTransform(Node node) { - if (!node.isRootNode() && matcher.matches(node.toAppliedPTransform())) { + if (!node.isRootNode() && override.getMatcher().matches(node.toAppliedPTransform())) { matches.add(node); // This node will be replaced. It should not be visited. return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; @@ -189,13 +245,13 @@ public class Pipeline { @Override public void visitPrimitiveTransform(Node node) { - if (matcher.matches(node.toAppliedPTransform())) { + if (override.getMatcher().matches(node.toAppliedPTransform())) { matches.add(node); } } }); for (Node match : matches) { - applyReplacement(match, replacementFactory); + applyReplacement(match, override.getOverrideFactory()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java new file mode 100644 index 0000000..33b9114 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.runners; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.transforms.PTransform; + +/** + * A {@link PTransformMatcher} and associated {@link PTransformOverrideFactory} to replace all + * matching {@link PTransform PTransforms}. + */ +@AutoValue +public abstract class PTransformOverride { + public static PTransformOverride of( + PTransformMatcher matcher, PTransformOverrideFactory factory) { + return new AutoValue_PTransformOverride(matcher, factory); + } + + /** + * Gets the {@link PTransformMatcher} to identify {@link PTransform PTransforms} to replace. + */ + public abstract PTransformMatcher getMatcher(); + + /** + * Gets the {@link PTransformOverrideFactory} of this override. 
+ */ + public abstract PTransformOverrideFactory getOverrideFactory(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java index d8e4ef4..7e5cc35 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java @@ -17,37 +17,56 @@ */ package org.apache.beam.sdk; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.isA; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import java.util.Collections; +import java.util.List; +import java.util.Map; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.CountingInput.BoundedCountingInput; +import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.runners.PTransformMatcher; +import org.apache.beam.sdk.runners.PTransformOverride; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.testing.CrashingRunner; import 
org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTag; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -290,4 +309,123 @@ public class PipelineTest { public void testEmptyPipeline() throws Exception { pipeline.run(); } + + @Test + public void testReplaceAll() { + pipeline.enableAbandonedNodeEnforcement(false); + pipeline.apply(CountingInput.unbounded()); + pipeline.apply(CountingInput.upTo(100L)); + + pipeline.replaceAll( + ImmutableList.of( + PTransformOverride.of( + new PTransformMatcher() { + @Override + public boolean matches(AppliedPTransform<?, ?, ?> application) { + return application.getTransform() instanceof UnboundedCountingInput; + } + }, + new UnboundedCountingInputOverride()), + PTransformOverride.of( + new PTransformMatcher() { + @Override + public boolean matches(AppliedPTransform<?, ?, ?> application) { + return application.getTransform() instanceof BoundedCountingInput; + } + }, + new BoundedCountingInputOverride()))); + pipeline.traverseTopologically( + new 
PipelineVisitor.Defaults() { + @Override + public CompositeBehavior enterCompositeTransform(Node node) { + if (!node.isRootNode()) { + assertThat( + node.getTransform().getClass(), + not( + anyOf( + Matchers.<Class<? extends PTransform>>equalTo( + UnboundedCountingInput.class), + Matchers.<Class<? extends PTransform>>equalTo( + BoundedCountingInput.class)))); + } + return CompositeBehavior.ENTER_TRANSFORM; + } + }); + } + + /** + * Tests that {@link Pipeline#replaceAll(List)} throws when one of the PTransformOverride still + * matches. + */ + @Test + public void testReplaceAllIncomplete() { + pipeline.enableAbandonedNodeEnforcement(false); + pipeline.apply(CountingInput.unbounded()); + + // The order is such that the output of the second will match the first, which is not permitted + thrown.expect(IllegalStateException.class); + pipeline.replaceAll( + ImmutableList.of( + PTransformOverride.of( + new PTransformMatcher() { + @Override + public boolean matches(AppliedPTransform<?, ?, ?> application) { + return application.getTransform() instanceof BoundedCountingInput; + } + }, + new BoundedCountingInputOverride()), + PTransformOverride.of( + new PTransformMatcher() { + @Override + public boolean matches(AppliedPTransform<?, ?, ?> application) { + return application.getTransform() instanceof UnboundedCountingInput; + } + }, + new UnboundedCountingInputOverride()))); + } + + static class BoundedCountingInputOverride + implements PTransformOverrideFactory<PBegin, PCollection<Long>, BoundedCountingInput> { + @Override + public PTransform<PBegin, PCollection<Long>> getReplacementTransform( + BoundedCountingInput transform) { + return Create.of(0L); + } + + @Override + public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) { + return p.begin(); + } + + @Override + public Map<PValue, ReplacementOutput> mapOutputs( + List<TaggedPValue> outputs, PCollection<Long> newOutput) { + return Collections.<PValue, ReplacementOutput>singletonMap( + newOutput, + 
ReplacementOutput.of( + Iterables.getOnlyElement(outputs), Iterables.getOnlyElement(newOutput.expand()))); + } + } + static class UnboundedCountingInputOverride + implements PTransformOverrideFactory<PBegin, PCollection<Long>, UnboundedCountingInput> { + @Override + public PTransform<PBegin, PCollection<Long>> getReplacementTransform( + UnboundedCountingInput transform) { + return CountingInput.upTo(100L); + } + + @Override + public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) { + return p.begin(); + } + + @Override + public Map<PValue, ReplacementOutput> mapOutputs( + List<TaggedPValue> outputs, PCollection<Long> newOutput) { + return Collections.<PValue, ReplacementOutput>singletonMap( + newOutput, + ReplacementOutput.of( + Iterables.getOnlyElement(outputs), Iterables.getOnlyElement(newOutput.expand()))); + } + } }
