DirectRunner: Replace use of RawPTransform with NotSerializable.forUrn 
translators


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/505021e6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/505021e6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/505021e6

Branch: refs/heads/master
Commit: 505021e6a253b882bb870694ff7540418e809e51
Parents: 01103c2
Author: Kenneth Knowles <[email protected]>
Authored: Tue Oct 17 12:43:19 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Tue Oct 17 13:48:28 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectGroupByKey.java   | 30 ++------------------
 .../direct/ParDoMultiOverrideFactory.java       | 16 +----------
 .../direct/TestStreamEvaluatorFactory.java      | 16 +----------
 .../direct/TransformEvaluatorRegistry.java      | 14 +++++----
 .../runners/direct/ViewOverrideFactory.java     | 16 +----------
 5 files changed, 13 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 9e56b65..0053360 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -20,12 +20,9 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import javax.annotation.Nullable;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -74,8 +71,7 @@ class DirectGroupByKey<K, V>
   }
 
   static final class DirectGroupByKeyOnly<K, V>
-      extends PTransformTranslation.RawPTransform<
-          PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> {
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, 
V>>> {
     @Override
     public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> 
input) {
       return PCollection.createPrimitiveOutputInternal(
@@ -89,21 +85,10 @@ class DirectGroupByKey<K, V>
     }
 
     DirectGroupByKeyOnly() {}
-
-    @Override
-    public String getUrn() {
-      return DIRECT_GBKO_URN;
-    }
-
-    @Nullable
-    @Override
-    public RunnerApi.FunctionSpec getSpec() {
-      return null;
-    }
   }
 
   static final class DirectGroupAlsoByWindow<K, V>
-      extends PTransformTranslation.RawPTransform<
+      extends PTransform<
           PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> {
 
     private final WindowingStrategy<?, ?> inputWindowingStrategy;
@@ -144,16 +129,5 @@ class DirectGroupByKey<K, V>
           input.getPipeline(), outputWindowingStrategy, input.isBounded(),
           KvCoder.of(inputCoder.getKeyCoder(), 
IterableCoder.of(inputCoder.getElementCoder())));
     }
-
-    @Override
-    public String getUrn() {
-      return DIRECT_GABW_URN;
-    }
-
-    @Nullable
-    @Override
-    public RunnerApi.FunctionSpec getSpec() {
-      return null;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 5ec52be..e8a9c83 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -23,12 +23,10 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SplittableParDo;
@@ -204,8 +202,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
       "urn:beam:directrunner:transforms:stateful_pardo:v1";
 
   static class StatefulParDo<K, InputT, OutputT>
-      extends PTransformTranslation.RawPTransform<
-          PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, 
PCollectionTuple> {
+      extends PTransform<PCollection<? extends KeyedWorkItem<K, KV<K, 
InputT>>>, PCollectionTuple> {
     private final transient DoFn<KV<K, InputT>, OutputT> doFn;
     private final TupleTagList additionalOutputTags;
     private final TupleTag<OutputT> mainOutputTag;
@@ -257,17 +254,6 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
 
       return outputs;
     }
-
-    @Override
-    public String getUrn() {
-      return DIRECT_STATEFUL_PAR_DO_URN;
-    }
-
-    @Override
-    public RunnerApi.FunctionSpec getSpec() {
-      throw new UnsupportedOperationException(
-          String.format("%s should never be serialized to proto", 
getClass().getSimpleName()));
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index d62b64c..e42b5fe 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -29,8 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.TestStreamTranslation;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -194,8 +192,7 @@ class TestStreamEvaluatorFactory implements 
TransformEvaluatorFactory {
 
     static final String DIRECT_TEST_STREAM_URN = 
"urn:beam:directrunner:transforms:test_stream:v1";
 
-    static class DirectTestStream<T>
-        extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>> {
+    static class DirectTestStream<T> extends PTransform<PBegin, 
PCollection<T>> {
       private final transient DirectRunner runner;
       private final TestStream<T> original;
 
@@ -214,17 +211,6 @@ class TestStreamEvaluatorFactory implements 
TransformEvaluatorFactory {
             IsBounded.UNBOUNDED,
             original.getValueCoder());
       }
-
-      @Override
-      public String getUrn() {
-        return DIRECT_TEST_STREAM_URN;
-      }
-
-      @Nullable
-      @Override
-      public RunnerApi.FunctionSpec getSpec() {
-        return null;
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 099252f..708a931 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -98,20 +98,22 @@ class TransformEvaluatorRegistry implements 
TransformEvaluatorFactory {
           .<Class<? extends PTransform>, 
PTransformTranslation.TransformPayloadTranslator>builder()
           .put(
               DirectGroupByKey.DirectGroupByKeyOnly.class,
-              new PTransformTranslation.RawPTransformTranslator())
+              
TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_GBKO_URN))
           .put(
               DirectGroupByKey.DirectGroupAlsoByWindow.class,
-              new PTransformTranslation.RawPTransformTranslator())
+              
TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_GABW_URN))
           .put(
               ParDoMultiOverrideFactory.StatefulParDo.class,
-              new PTransformTranslation.RawPTransformTranslator())
+              
TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_STATEFUL_PAR_DO_URN))
           .put(
               ViewOverrideFactory.WriteView.class,
-              new PTransformTranslation.RawPTransformTranslator())
-          .put(DirectTestStream.class, new 
PTransformTranslation.RawPTransformTranslator())
+              
TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_WRITE_VIEW_URN))
+          .put(
+              DirectTestStream.class,
+              
TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_TEST_STREAM_URN))
           .put(
               SplittableParDoViaKeyedWorkItems.ProcessElements.class,
-              new SplittableParDoProcessElementsTranslator())
+              
TransformPayloadTranslator.NotSerializable.forUrn(SPLITTABLE_PROCESS_URN))
           .build();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 61b7978..0079f98 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -20,11 +20,8 @@ package org.apache.beam.runners.direct;
 
 import java.io.IOException;
 import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
 import 
org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
-import 
org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -107,7 +104,7 @@ class ViewOverrideFactory<ElemT, ViewT>
    * to {@link ViewT}.
    */
   static final class WriteView<ElemT, ViewT>
-      extends RawPTransform<PCollection<Iterable<ElemT>>, 
PCollection<Iterable<ElemT>>> {
+      extends PTransform<PCollection<Iterable<ElemT>>, 
PCollection<Iterable<ElemT>>> {
     private final PCollectionView<ViewT> view;
 
     WriteView(PCollectionView<ViewT> view) {
@@ -125,17 +122,6 @@ class ViewOverrideFactory<ElemT, ViewT>
     public PCollectionView<ViewT> getView() {
       return view;
     }
-
-    @Override
-    public String getUrn() {
-      return DIRECT_WRITE_VIEW_URN;
-    }
-
-    @Nullable
-    @Override
-    public RunnerApi.FunctionSpec getSpec() {
-      return null;
-    }
   }
 
   public static final String DIRECT_WRITE_VIEW_URN =

Reply via email to