Fix RawPTransform translation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/840492d9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/840492d9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/840492d9 Branch: refs/heads/DSL_SQL Commit: 840492d9d8fb3b08cfe70a525655759fc1a31fdf Parents: 7c608c3 Author: Kenneth Knowles <[email protected]> Authored: Fri May 26 14:18:03 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Jun 6 13:10:33 2017 -0700 ---------------------------------------------------------------------- .../construction/PTransformTranslation.java | 57 ++++++++++++++++---- runners/direct-java/pom.xml | 5 -- .../beam/runners/direct/DirectGroupByKey.java | 5 +- .../direct/ParDoMultiOverrideFactory.java | 3 +- .../direct/TestStreamEvaluatorFactory.java | 3 +- .../direct/TransformEvaluatorRegistry.java | 8 +-- .../runners/direct/ViewOverrideFactory.java | 3 +- 7 files changed, 56 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index 7c5c593..32ecf43 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.ImmutableMap; import com.google.protobuf.Any; -import com.google.protobuf.Message; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -115,7 +114,20 @@ public class PTransformTranslation { // TODO: Display Data PTransform<?, ?> transform = appliedPTransform.getTransform(); - if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) { + // A RawPTransform directly vends its payload. Because it will generally be + // a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS. + if (transform instanceof RawPTransform) { + RawPTransform<?, ?> rawPTransform = (RawPTransform<?, ?>) transform; + + if (rawPTransform.getUrn() != null) { + FunctionSpec.Builder payload = FunctionSpec.newBuilder().setUrn(rawPTransform.getUrn()); + @Nullable Any parameter = rawPTransform.getPayload(); + if (parameter != null) { + payload.setParameter(parameter); + } + transformBuilder.setSpec(payload); + } + } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) { FunctionSpec payload = KNOWN_PAYLOAD_TRANSLATORS .get(transform.getClass()) @@ -145,6 +157,25 @@ public class PTransformTranslation { } /** + * Returns the URN for the transform if it is known, otherwise {@code null}. + */ + @Nullable + public static String urnForTransformOrNull(PTransform<?, ?> transform) { + + // A RawPTransform directly vends its URN. Because it will generally be + // a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS. + if (transform instanceof RawPTransform) { + return ((RawPTransform) transform).getUrn(); + } + + TransformPayloadTranslator translator = KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()); + if (translator == null) { + return null; + } + return translator.getUrn(transform); + } + + /** * Returns the URN for the transform if it is known, otherwise throws. */ public static String urnForTransform(PTransform<?, ?> transform) { @@ -176,13 +207,14 @@ public class PTransformTranslation { * fully expanded in the pipeline proto. */ public abstract static class RawPTransform< - InputT extends PInput, OutputT extends POutput, PayloadT extends Message> + InputT extends PInput, OutputT extends POutput> extends PTransform<InputT, OutputT> { + @Nullable public abstract String getUrn(); @Nullable - PayloadT getPayload() { + public Any getPayload() { return null; } } @@ -190,24 +222,29 @@ public class PTransformTranslation { /** * A translator that uses the explicit URN and payload from a {@link RawPTransform}. */ - public static class RawPTransformTranslator<PayloadT extends Message> - implements TransformPayloadTranslator<RawPTransform<?, ?, PayloadT>> { + public static class RawPTransformTranslator + implements TransformPayloadTranslator<RawPTransform<?, ?>> { @Override - public String getUrn(RawPTransform<?, ?, PayloadT> transform) { + public String getUrn(RawPTransform<?, ?> transform) { return transform.getUrn(); } @Override public FunctionSpec translate( - AppliedPTransform<?, ?, RawPTransform<?, ?, PayloadT>> transform, + AppliedPTransform<?, ?, RawPTransform<?, ?>> transform, SdkComponents components) { - PayloadT payload = transform.getTransform().getPayload(); + + // Anonymous composites have no spec + if (transform.getTransform().getUrn() == null) { + return null; + } FunctionSpec.Builder transformSpec = FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())); + Any payload = transform.getTransform().getPayload(); if (payload != null) { - transformSpec.setParameter(Any.pack(payload)); + transformSpec.setParameter(payload); } return transformSpec.build(); http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index cba4b09..bec2113 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -208,11 +208,6 @@ </dependency> <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </dependency> - - <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java index f239070..2fc0dd4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkArgument; -import com.google.protobuf.Message; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.construction.ForwardingPTransform; @@ -74,7 +73,7 @@ class DirectGroupByKey<K, V> static final class DirectGroupByKeyOnly<K, V> extends PTransformTranslation.RawPTransform< - PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, Message> { + PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> { @Override public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) { return PCollection.createPrimitiveOutputInternal( @@ -101,7 +100,7 @@ class DirectGroupByKey<K, V> static final class DirectGroupAlsoByWindow<K, V> extends PTransformTranslation.RawPTransform< - PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, Message> { + PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> { private final WindowingStrategy<?, ?> inputWindowingStrategy; private final WindowingStrategy<?, ?> outputWindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index df2054b..858ea34 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; -import com.google.protobuf.Message; import java.util.Map; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; @@ -172,7 +171,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT> static class StatefulParDo<K, InputT, OutputT> extends PTransformTranslation.RawPTransform< - PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, Message> { + PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> { private final transient MultiOutput<KV<K, InputT>, OutputT> underlyingParDo; private final transient PCollection<KV<K, InputT>> originalInput; http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index b1db58f..2da7a71 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -22,7 +22,6 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import com.google.common.collect.Iterables; -import com.google.protobuf.Message; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -185,7 +184,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { static final String DIRECT_TEST_STREAM_URN = "urn:beam:directrunner:transforms:test_stream:v1"; static class DirectTestStream<T> - extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>, Message> { + extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>> { private final transient DirectRunner runner; private final TestStream<T> original; http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index d144b20..0c907df 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -92,17 +92,17 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder() .put( DirectGroupByKey.DirectGroupByKeyOnly.class, - new PTransformTranslation.RawPTransformTranslator<>()) + new PTransformTranslation.RawPTransformTranslator()) .put( DirectGroupByKey.DirectGroupAlsoByWindow.class, new PTransformTranslation.RawPTransformTranslator()) .put( ParDoMultiOverrideFactory.StatefulParDo.class, - new PTransformTranslation.RawPTransformTranslator<>()) + new PTransformTranslation.RawPTransformTranslator()) .put( ViewOverrideFactory.WriteView.class, - new PTransformTranslation.RawPTransformTranslator<>()) - .put(DirectTestStream.class, new PTransformTranslation.RawPTransformTranslator<>()) + new PTransformTranslation.RawPTransformTranslator()) + .put(DirectTestStream.class, new PTransformTranslation.RawPTransformTranslator()) .put( SplittableParDoViaKeyedWorkItems.ProcessElements.class, new SplittableParDoProcessElementsTranslator()) http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java index 501b436..fdff63d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.direct; -import com.google.protobuf.Message; import java.util.Collections; import java.util.Map; import org.apache.beam.runners.core.construction.ForwardingPTransform; @@ -95,7 +94,7 @@ class ViewOverrideFactory<ElemT, ViewT> * to {@link ViewT}. */ static final class WriteView<ElemT, ViewT> - extends RawPTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>, Message> { + extends RawPTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> { private final CreatePCollectionView<ElemT, ViewT> og; WriteView(CreatePCollectionView<ElemT, ViewT> og) {
