Repository: beam
Updated Branches:
  refs/heads/master d6f6351f1 -> 75b6567f6
Port the DirectRunner to the Batch Surgery API Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdc2eddb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdc2eddb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdc2eddb Branch: refs/heads/master Commit: fdc2eddb633ed0e0dde80948d4588757e7a552e6 Parents: 85af898 Author: Thomas Groh <[email protected]> Authored: Fri Mar 17 16:39:58 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Wed Mar 22 18:11:54 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunner.java | 60 +++++++++++--------- 1 file changed, 33 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fdc2eddb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 94f0521..62df6c8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; @@ -44,9 +45,7 @@ import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.PTransformMatcher; import org.apache.beam.sdk.runners.PTransformOverride; -import 
org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Aggregator; @@ -260,10 +259,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { @Override public DirectPipelineResult run(Pipeline pipeline) { - for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : - defaultTransformOverrides().entrySet()) { - pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue())); - } + pipeline.replaceAll(defaultTransformOverrides()); MetricsEnvironment.setMetricsSupported(true); DirectGraphVisitor graphVisitor = new DirectGraphVisitor(); pipeline.traverseTopologically(graphVisitor); @@ -321,27 +317,37 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { * iteration order based on the order at which elements are added to it. */ @SuppressWarnings("rawtypes") - private Map<PTransformMatcher, PTransformOverrideFactory> defaultTransformOverrides() { - return ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder() - .put( - PTransformMatchers.writeWithRunnerDeterminedSharding(), - new WriteWithShardingFactory()) /* Uses a view internally. 
*/ - .put( - PTransformMatchers.classEqualTo(CreatePCollectionView.class), - new ViewOverrideFactory()) /* Uses pardos and GBKs */ - .put( - PTransformMatchers.classEqualTo(TestStream.class), - new DirectTestStreamFactory(this)) /* primitive */ - // SplittableParDo is implemented in terms of GroupByKeys and Primitives - .put(PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory()) - // state and timer ParDos are implemented in terms of GroupByKeys and Primitives - .put(PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory()) - .put( - PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class), - new DirectGBKIntoKeyedWorkItemsOverrideFactory()) /* Returns a GBKO */ - .put( - PTransformMatchers.classEqualTo(GroupByKey.class), - new DirectGroupByKeyOverrideFactory()) /* returns two chained primitives. */ + private List<PTransformOverride> defaultTransformOverrides() { + return ImmutableList.<PTransformOverride>builder() + .add( + PTransformOverride.of( + PTransformMatchers.writeWithRunnerDeterminedSharding(), + new WriteWithShardingFactory())) /* Uses a view internally. 
*/ + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(CreatePCollectionView.class), + new ViewOverrideFactory())) /* Uses pardos and GBKs */ + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(TestStream.class), + new DirectTestStreamFactory(this))) /* primitive */ + // SplittableParMultiDo is implemented in terms of nonsplittable simple ParDos and extra + // primitives + .add( + PTransformOverride.of( + PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory())) + // state and timer pardos are implemented in terms of simple ParDos and extra primitives + .add( + PTransformOverride.of( + PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory())) + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class), + new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */ + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(GroupByKey.class), + new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives. */ .build(); }
