Remove single-matcher replacement API. This makes it more difficult for runner authors to use the discouraged API, which doesn't validate the ordering of replacements.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/753bc9cc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/753bc9cc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/753bc9cc Branch: refs/heads/master Commit: 753bc9cc2bcfd130e1523eabf97d3de791c4cfd2 Parents: 9fb4fc3 Author: Thomas Groh <[email protected]> Authored: Tue Apr 4 10:20:49 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Wed Apr 5 15:26:56 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/runners/spark/TestSparkRunner.java | 10 ++++++---- .../core/src/main/java/org/apache/beam/sdk/Pipeline.java | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/753bc9cc/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index be9ff2e..988a82b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Uninterruptibles; import java.io.File; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -217,10 +218,11 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { @VisibleForTesting void adaptBoundedReads(Pipeline pipeline) { - pipeline.replace( - PTransformOverride.of( - PTransformMatchers.classEqualTo(BoundedReadFromUnboundedSource.class), - 
new AdaptedBoundedAsUnbounded.Factory())); + pipeline.replaceAll( + Collections.singletonList( + PTransformOverride.of( + PTransformMatchers.classEqualTo(BoundedReadFromUnboundedSource.class), + new AdaptedBoundedAsUnbounded.Factory()))); } private static class AdaptedBoundedAsUnbounded<T> extends PTransform<PBegin, PCollection<T>> { http://git-wip-us.apache.org/repos/asf/beam/blob/753bc9cc/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index fa8277f..11d781d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -228,7 +228,7 @@ public class Pipeline { }); } - public void replace(final PTransformOverride override) { + private void replace(final PTransformOverride override) { final Collection<Node> matches = new ArrayList<>(); transforms.visit( new PipelineVisitor.Defaults() {
