DirectRunner: Replace use of RawPTransform with NotSerializable.forUrn translators
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/505021e6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/505021e6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/505021e6 Branch: refs/heads/master Commit: 505021e6a253b882bb870694ff7540418e809e51 Parents: 01103c2 Author: Kenneth Knowles <[email protected]> Authored: Tue Oct 17 12:43:19 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Oct 17 13:48:28 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectGroupByKey.java | 30 ++------------------ .../direct/ParDoMultiOverrideFactory.java | 16 +---------- .../direct/TestStreamEvaluatorFactory.java | 16 +---------- .../direct/TransformEvaluatorRegistry.java | 14 +++++---- .../runners/direct/ViewOverrideFactory.java | 16 +---------- 5 files changed, 13 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java index 9e56b65..0053360 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java @@ -20,12 +20,9 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkArgument; -import javax.annotation.Nullable; -import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.construction.ForwardingPTransform; -import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -74,8 +71,7 @@ class DirectGroupByKey<K, V> } static final class DirectGroupByKeyOnly<K, V> - extends PTransformTranslation.RawPTransform< - PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> { + extends PTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> { @Override public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) { return PCollection.createPrimitiveOutputInternal( @@ -89,21 +85,10 @@ class DirectGroupByKey<K, V> } DirectGroupByKeyOnly() {} - - @Override - public String getUrn() { - return DIRECT_GBKO_URN; - } - - @Nullable - @Override - public RunnerApi.FunctionSpec getSpec() { - return null; - } } static final class DirectGroupAlsoByWindow<K, V> - extends PTransformTranslation.RawPTransform< + extends PTransform< PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> { private final WindowingStrategy<?, ?> inputWindowingStrategy; @@ -144,16 +129,5 @@ class DirectGroupByKey<K, V> input.getPipeline(), outputWindowingStrategy, input.isBounded(), KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder()))); } - - @Override - public String getUrn() { - return DIRECT_GABW_URN; - } - - @Nullable - @Override - public RunnerApi.FunctionSpec getSpec() { - return null; - } } } http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 5ec52be..e8a9c83 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -23,12 +23,10 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.construction.PTransformReplacements; -import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SplittableParDo; @@ -204,8 +202,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT> "urn:beam:directrunner:transforms:stateful_pardo:v1"; static class StatefulParDo<K, InputT, OutputT> - extends PTransformTranslation.RawPTransform< - PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> { + extends PTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> { private final transient DoFn<KV<K, InputT>, OutputT> doFn; private final TupleTagList additionalOutputTags; private final TupleTag<OutputT> mainOutputTag; @@ -257,17 +254,6 @@ class ParDoMultiOverrideFactory<InputT, OutputT> return outputs; } - - @Override - public String getUrn() { - return DIRECT_STATEFUL_PAR_DO_URN; - } - - @Override - public RunnerApi.FunctionSpec getSpec() { - throw new UnsupportedOperationException( - String.format("%s should never be serialized to proto", getClass().getSimpleName())); - } } /** http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index d62b64c..e42b5fe 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -29,8 +29,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.TestStreamTranslation; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -194,8 +192,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { static final String DIRECT_TEST_STREAM_URN = "urn:beam:directrunner:transforms:test_stream:v1"; - static class DirectTestStream<T> - extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>> { + static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> { private final transient DirectRunner runner; private final TestStream<T> original; @@ -214,17 +211,6 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { IsBounded.UNBOUNDED, original.getValueCoder()); } - - @Override - public String getUrn() { - return DIRECT_TEST_STREAM_URN; - } - - @Nullable - @Override - public RunnerApi.FunctionSpec getSpec() { - return null; - } } } http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 099252f..708a931 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -98,20 +98,22 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder() .put( DirectGroupByKey.DirectGroupByKeyOnly.class, - new PTransformTranslation.RawPTransformTranslator()) + TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_GBKO_URN)) .put( DirectGroupByKey.DirectGroupAlsoByWindow.class, - new PTransformTranslation.RawPTransformTranslator()) + TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_GABW_URN)) .put( ParDoMultiOverrideFactory.StatefulParDo.class, - new PTransformTranslation.RawPTransformTranslator()) + TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_STATEFUL_PAR_DO_URN)) .put( ViewOverrideFactory.WriteView.class, - new PTransformTranslation.RawPTransformTranslator()) - .put(DirectTestStream.class, new PTransformTranslation.RawPTransformTranslator()) + TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_WRITE_VIEW_URN)) + .put( + DirectTestStream.class, + TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_TEST_STREAM_URN)) .put( SplittableParDoViaKeyedWorkItems.ProcessElements.class, - new SplittableParDoProcessElementsTranslator()) + TransformPayloadTranslator.NotSerializable.forUrn(SPLITTABLE_PROCESS_URN)) .build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/505021e6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java index 61b7978..0079f98 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java @@ -20,11 +20,8 @@ package org.apache.beam.runners.direct; import java.io.IOException; import java.util.Map; -import javax.annotation.Nullable; -import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation; import org.apache.beam.runners.core.construction.PTransformReplacements; -import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; @@ -107,7 +104,7 @@ class ViewOverrideFactory<ElemT, ViewT> * to {@link ViewT}. */ static final class WriteView<ElemT, ViewT> - extends RawPTransform<PCollection<Iterable<ElemT>>, PCollection<Iterable<ElemT>>> { + extends PTransform<PCollection<Iterable<ElemT>>, PCollection<Iterable<ElemT>>> { private final PCollectionView<ViewT> view; WriteView(PCollectionView<ViewT> view) { @@ -125,17 +122,6 @@ class ViewOverrideFactory<ElemT, ViewT> public PCollectionView<ViewT> getView() { return view; } - - @Override - public String getUrn() { - return DIRECT_WRITE_VIEW_URN; - } - - @Nullable - @Override - public RunnerApi.FunctionSpec getSpec() { - return null; - } } public static final String DIRECT_WRITE_VIEW_URN =
