Add RawPTransform.migrate(SdkComponents) for re-serialization
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c14455ef Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c14455ef Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c14455ef Branch: refs/heads/master Commit: c14455ef4209a14a62f7b18f604c9673b32d06d7 Parents: 020ef14 Author: Kenneth Knowles <[email protected]> Authored: Mon Oct 2 20:28:16 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Oct 17 12:45:11 2017 -0700 ---------------------------------------------------------------------- .../construction/PTransformTranslation.java | 98 +++++++++----------- .../core/construction/PipelineTranslation.java | 36 ++++--- .../core/construction/SplittableParDo.java | 8 ++ .../core/SplittableParDoViaKeyedWorkItems.java | 7 ++ .../beam/runners/direct/DirectGroupByKey.java | 14 +++ .../beam/runners/direct/MultiStepCombine.java | 18 +++- .../direct/ParDoMultiOverrideFactory.java | 7 ++ .../direct/TestStreamEvaluatorFactory.java | 7 ++ .../runners/direct/ViewOverrideFactory.java | 8 ++ 9 files changed, 129 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index f9e7837..31767a0 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; -import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -66,8 +65,8 @@ public class PTransformTranslation { public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1"; /** - * @deprecated runners should move away from translating `CreatePCollectionView` and treat this - * as part of the translation for a `ParDo` side input. + * @deprecated runners should move away from translating `CreatePCollectionView` and treat this as + * part of the translation for a `ParDo` side input. */ @Deprecated public static final String CREATE_VIEW_TRANSFORM_URN = "urn:beam:transform:create_view:v1"; @@ -85,8 +84,8 @@ public class PTransformTranslation { Map<Class<? extends PTransform>, TransformPayloadTranslator> newTranslators = (Map) registrar.getTransformPayloadTranslators(); - Set<Class<? extends PTransform>> alreadyRegistered = Sets.intersection( - translators.keySet(), newTranslators.keySet()); + Set<Class<? extends PTransform>> alreadyRegistered = + Sets.intersection(translators.keySet(), newTranslators.keySet()); if (!alreadyRegistered.isEmpty()) { throw new IllegalArgumentException( @@ -143,20 +142,13 @@ public class PTransformTranslation { DisplayDataTranslation.toProto(DisplayData.from(appliedPTransform.getTransform()))); PTransform<?, ?> transform = appliedPTransform.getTransform(); + // A RawPTransform directly vends its payload. Because it will generally be // a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS. if (transform instanceof RawPTransform) { - RawPTransform<?, ?> rawPTransform = (RawPTransform<?, ?>) transform; - - if (rawPTransform.getUrn() != null) { - FunctionSpec.Builder payload = FunctionSpec.newBuilder().setUrn(rawPTransform.getUrn()); - @Nullable ByteString parameter = rawPTransform.getPayload(); - if (parameter != null) { - payload.setPayload(parameter); - } - transformBuilder.setSpec(payload); - } - rawPTransform.registerComponents(components); + // The raw transform was parsed in the context of other components; this puts it in the + // context of our current serialization + transformBuilder.setSpec(((RawPTransform<?, ?>) transform).migrate(components)); } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) { FunctionSpec payload = KNOWN_PAYLOAD_TRANSLATORS @@ -186,9 +178,7 @@ public class PTransformTranslation { return tag.getId(); } - /** - * Returns the URN for the transform if it is known, otherwise {@code null}. - */ + /** Returns the URN for the transform if it is known, otherwise {@code null}. */ @Nullable public static String urnForTransformOrNull(PTransform<?, ?> transform) { @@ -205,9 +195,7 @@ public class PTransformTranslation { return translator.getUrn(transform); } - /** - * Returns the URN for the transform if it is known, otherwise throws. - */ + /** Returns the URN for the transform if it is known, otherwise throws. */ public static String urnForTransform(PTransform<?, ?> transform) { String urn = urnForTransformOrNull(transform); if (urn == null) { @@ -235,24 +223,48 @@ public class PTransformTranslation { * #expand} method since the definition of the transform may be lost. The transform is already * fully expanded in the pipeline proto. */ - public abstract static class RawPTransform< - InputT extends PInput, OutputT extends POutput> + public abstract static class RawPTransform<InputT extends PInput, OutputT extends POutput> extends PTransform<InputT, OutputT> { + /** The URN for this transform, if standardized. */ @Nullable - public abstract String getUrn(); + public String getUrn() { + return getSpec() == null ? null : getSpec().getUrn(); + } + /** The payload for this transform, if any. */ @Nullable - public ByteString getPayload() { - return null; + public abstract FunctionSpec getSpec(); + + /** + * Build a new payload set in the context of the given {@link SdkComponents}, if applicable. + * + * <p>When re-serializing this transform, the ids reference in the rehydrated payload may + * conflict with those defined by the serialization context. In that case, the components must + * be re-registered and a new payload returned. + */ + public FunctionSpec migrate(SdkComponents components) throws IOException { + return getSpec(); } - public void registerComponents(SdkComponents components) {} + /** + * By default, throws an exception, but can be overridden. + * + * <p>It is permissible for runner-specific transforms to be both a {@link RawPTransform} that + * directly vends its proto representation and also to expand, for convenience of not having to + * register a translator. + */ + @Override + public OutputT expand(InputT input) { + throw new IllegalStateException( + String.format( + "%s should never be asked to expand;" + + " it is the result of deserializing an already-constructed Pipeline", + getClass().getSimpleName())); + } } - /** - * A translator that uses the explicit URN and payload from a {@link RawPTransform}. - */ + /** A translator that uses the explicit URN and payload from a {@link RawPTransform}. */ public static class RawPTransformTranslator implements TransformPayloadTranslator<RawPTransform<?, ?>> { @Override @@ -262,27 +274,9 @@ public class PTransformTranslation { @Override public FunctionSpec translate( - AppliedPTransform<?, ?, RawPTransform<?, ?>> transform, - SdkComponents components) { - - // Anonymous composites have no spec - if (transform.getTransform().getUrn() == null) { - return null; - } - - FunctionSpec.Builder transformSpec = - FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())); - - ByteString payload = transform.getTransform().getPayload(); - if (payload != null) { - transformSpec.setPayload(payload); - } - - // Transforms like Combine may have Coders that need to be added but do not - // occur in a black-box traversal - transform.getTransform().registerComponents(components); - - return transformSpec.build(); + AppliedPTransform<?, ?, RawPTransform<?, ?>> transform, SdkComponents components) + throws IOException { + return transform.getTransform().migrate(components); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java index 0aca837..1624865 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java @@ -24,7 +24,6 @@ import com.google.auto.value.AutoValue; import com.google.common.base.MoreObjects; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; -import com.google.protobuf.ByteString; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -105,8 +104,7 @@ public class PipelineTranslation { return DisplayData.from(component); } - public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto) - throws IOException { + public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto) throws IOException { TransformHierarchy transforms = new TransformHierarchy(); Pipeline pipeline = Pipeline.forTransformHierarchy(transforms, PipelineOptionsFactory.create()); @@ -184,11 +182,7 @@ public class PipelineTranslation { } RehydratedPTransform transform = - RehydratedPTransform.of( - transformSpec.getUrn(), - transformSpec.getPayload(), - additionalInputs, - additionalCoders); + RehydratedPTransform.of(transformSpec, additionalInputs, additionalCoders); if (isPrimitive(transformProto)) { transforms.addFinalizedPrimitiveNode( @@ -234,32 +228,33 @@ public class PipelineTranslation { private static boolean isPrimitive(RunnerApi.PTransform transformProto) { return transformProto.getSubtransformsCount() == 0 && !transformProto - .getInputsMap() - .values() - .containsAll(transformProto.getOutputsMap().values()); + .getInputsMap() + .values() + .containsAll(transformProto.getOutputsMap().values()); } @AutoValue abstract static class RehydratedPTransform extends RawPTransform<PInput, POutput> { @Nullable - public abstract String getUrn(); - - @Nullable - public abstract ByteString getPayload(); + public abstract RunnerApi.FunctionSpec getSpec(); @Override public abstract Map<TupleTag<?>, PValue> getAdditionalInputs(); public abstract List<Coder<?>> getCoders(); + @Override + public String getUrn() { + return getSpec().getUrn(); + } + public static RehydratedPTransform of( - String urn, - ByteString payload, + RunnerApi.FunctionSpec payload, Map<TupleTag<?>, PValue> additionalInputs, List<Coder<?>> additionalCoders) { return new AutoValue_PipelineTranslation_RehydratedPTransform( - urn, payload, additionalInputs, additionalCoders); + payload, additionalInputs, additionalCoders); } @Override @@ -275,12 +270,12 @@ public class PipelineTranslation { public String toString() { return MoreObjects.toStringHelper(this) .add("urn", getUrn()) - .add("payload", getPayload()) + .add("payload", getSpec()) .toString(); } @Override - public void registerComponents(SdkComponents components) { + public RunnerApi.FunctionSpec migrate(SdkComponents components) { for (Coder<?> coder : getCoders()) { try { components.registerCoder(coder); @@ -288,6 +283,7 @@ public class PipelineTranslation { throw new RuntimeException(e); } } + return getSpec(); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java index 32d3409..ab66e84 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.UUID; +import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; @@ -295,6 +297,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> public String getUrn() { return SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN; } + + @Nullable + @Override + public RunnerApi.FunctionSpec getSpec() { + return null; + } } /** http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 251260e..400df19 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import java.util.List; import java.util.Map; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; import org.apache.beam.runners.core.construction.ReplacementOutputs; @@ -87,6 +88,12 @@ public class SplittableParDoViaKeyedWorkItems { public String getUrn() { return SplittableParDo.SPLITTABLE_GBKIKWI_URN; } + + @Override + public RunnerApi.FunctionSpec getSpec() { + throw new UnsupportedOperationException( + String.format("%s should never be serialized to proto", getClass().getSimpleName())); + } } /** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessViaKeyedWorkItems}. */ http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java index 3ba04e7..9e56b65 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java @@ -20,6 +20,8 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkArgument; +import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.construction.ForwardingPTransform; @@ -92,6 +94,12 @@ class DirectGroupByKey<K, V> public String getUrn() { return DIRECT_GBKO_URN; } + + @Nullable + @Override + public RunnerApi.FunctionSpec getSpec() { + return null; + } } static final class DirectGroupAlsoByWindow<K, V> @@ -141,5 +149,11 @@ class DirectGroupByKey<K, V> public String getUrn() { return DIRECT_GABW_URN; } + + @Nullable + @Override + public RunnerApi.FunctionSpec getSpec() { + return null; + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java index ae21b4d..5253ef5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java @@ -28,7 +28,9 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.CombineTranslation; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; @@ -177,12 +179,18 @@ class MultiStepCombine<K, InputT, AccumT, OutputT> this.outputCoder = outputCoder; } - @Nullable + @Nonnull @Override public String getUrn() { return "urn:beam:directrunner:transforms:multistepcombine:v1"; } + @Nullable + @Override + public RunnerApi.FunctionSpec getSpec() { + return null; + } + @Override public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) { checkArgument( @@ -337,11 +345,17 @@ class MultiStepCombine<K, InputT, AccumT, OutputT> input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), outputCoder); } - @Nullable + @Nonnull @Override public String getUrn() { return DIRECT_MERGE_ACCUMULATORS_EXTRACT_OUTPUT_URN; } + + @Nullable + @Override + public RunnerApi.FunctionSpec getSpec() { + return null; + } } static class MergeAndExtractAccumulatorOutputEvaluatorFactory http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 26f30b0..5ec52be 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.KeyedWorkItems; @@ -261,6 +262,12 @@ class ParDoMultiOverrideFactory<InputT, OutputT> public String getUrn() { return DIRECT_STATEFUL_PAR_DO_URN; } + + @Override + public RunnerApi.FunctionSpec getSpec() { + throw new UnsupportedOperationException( + String.format("%s should never be serialized to proto", getClass().getSimpleName())); + } } /** http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index 49e7be7..d62b64c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.TestStreamTranslation; @@ -218,6 +219,12 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { public String getUrn() { return DIRECT_TEST_STREAM_URN; } + + @Nullable + @Override + public RunnerApi.FunctionSpec getSpec() { + return null; + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c14455ef/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java index c2255fe..61b7978 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java @@ -20,6 +20,8 @@ package org.apache.beam.runners.direct; import java.io.IOException; import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; @@ -128,6 +130,12 @@ class ViewOverrideFactory<ElemT, ViewT> public String getUrn() { return DIRECT_WRITE_VIEW_URN; } + + @Nullable + @Override + public RunnerApi.FunctionSpec getSpec() { + return null; + } } public static final String DIRECT_WRITE_VIEW_URN =
