Repository: beam
Updated Branches:
  refs/heads/master b53e6f0dc -> 4aef93854


DirectRunner override matchers using Runner API


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8d90878
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8d90878
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8d90878

Branch: refs/heads/master
Commit: d8d9087877c01f1786271726a541fb3eeda7f939
Parents: ca7b9c2
Author: Kenneth Knowles <[email protected]>
Authored: Thu May 25 06:31:16 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Jun 8 11:36:28 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d8d90878/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index dbd1ec4..136ccf3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -42,12 +43,9 @@ import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -230,33 +228,33 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
                 new WriteWithShardingFactory())) /* Uses a view internally. */
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(CreatePCollectionView.class),
+                PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
                 new ViewOverrideFactory())) /* Uses pardos and GBKs */
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(TestStream.class),
+                PTransformMatchers.urnEqualTo(PTransformTranslation.TEST_STREAM_TRANSFORM_URN),
                 new DirectTestStreamFactory(this))) /* primitive */
         // SplittableParMultiDo is implemented in terms of nonsplittable simple ParDos and extra
         // primitives
         .add(
             PTransformOverride.of(
-                PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory()))
+                PTransformMatchers.splittableParDo(), new ParDoMultiOverrideFactory()))
         // state and timer pardos are implemented in terms of simple ParDos and extra primitives
         .add(
             PTransformOverride.of(
-                PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory()))
+                PTransformMatchers.stateOrTimerParDo(), new ParDoMultiOverrideFactory()))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
+                PTransformMatchers.urnEqualTo(
+                    SplittableParDo.SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN),
                 new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(
-                    SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class),
+                PTransformMatchers.urnEqualTo(SplittableParDo.SPLITTABLE_GBKIKWI_URN),
                 new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(GroupByKey.class),
+                PTransformMatchers.urnEqualTo(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN),
                 new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives. */
         .build();
   }

Reply via email to