Repository: beam Updated Branches: refs/heads/master b53e6f0dc -> 4aef93854
DirectRunner override matchers using Runner API Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8d90878 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8d90878 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8d90878 Branch: refs/heads/master Commit: d8d9087877c01f1786271726a541fb3eeda7f939 Parents: ca7b9c2 Author: Kenneth Knowles <[email protected]> Authored: Thu May 25 06:31:16 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 8 11:36:28 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunner.java | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d8d90878/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index dbd1ec4..136ccf3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Set; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; import org.apache.beam.runners.core.construction.PTransformMatchers; +import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory; @@ -42,12 +43,9 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PTransformOverride; -import org.apache.beam.sdk.testing.TestStream; -import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.MultiOutput; -import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; @@ -230,33 +228,33 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { new WriteWithShardingFactory())) /* Uses a view internally. */ .add( PTransformOverride.of( - PTransformMatchers.classEqualTo(CreatePCollectionView.class), + PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN), new ViewOverrideFactory())) /* Uses pardos and GBKs */ .add( PTransformOverride.of( - PTransformMatchers.classEqualTo(TestStream.class), + PTransformMatchers.urnEqualTo(PTransformTranslation.TEST_STREAM_TRANSFORM_URN), new DirectTestStreamFactory(this))) /* primitive */ // SplittableParMultiDo is implemented in terms of nonsplittable simple ParDos and extra // primitives .add( PTransformOverride.of( - PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory())) + PTransformMatchers.splittableParDo(), new ParDoMultiOverrideFactory())) // state and timer pardos are implemented in terms of simple ParDos and extra primitives .add( PTransformOverride.of( - PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory())) + PTransformMatchers.stateOrTimerParDo(), new ParDoMultiOverrideFactory())) .add( PTransformOverride.of( - PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class), + PTransformMatchers.urnEqualTo( + SplittableParDo.SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN), new SplittableParDoViaKeyedWorkItems.OverrideFactory())) .add( PTransformOverride.of( - PTransformMatchers.classEqualTo( - SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class), + PTransformMatchers.urnEqualTo(SplittableParDo.SPLITTABLE_GBKIKWI_URN), new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */ .add( PTransformOverride.of( - PTransformMatchers.classEqualTo(GroupByKey.class), + PTransformMatchers.urnEqualTo(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN), new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives. */ .build(); }
