Add TransformPayloadTranslator.rehydrate to optionally specialize RawPTransform
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10c63e15 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10c63e15 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10c63e15 Branch: refs/heads/master Commit: 10c63e15ab51b885372f7b6251d8ace63bae0ad1 Parents: c14455e Author: Kenneth Knowles <[email protected]> Authored: Mon Oct 2 19:25:28 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Oct 17 12:45:11 2017 -0700 ---------------------------------------------------------------------- .../core/construction/CombineTranslation.java | 33 ++-- .../CreatePCollectionViewTranslation.java | 7 +- .../core/construction/FlattenTranslator.java | 8 +- .../construction/GroupByKeyTranslation.java | 13 +- .../construction/PTransformTranslation.java | 155 ++++++++++++++++++- .../core/construction/ParDoTranslation.java | 70 ++++----- .../core/construction/PipelineTranslation.java | 76 +-------- .../core/construction/ReadTranslation.java | 43 +++-- .../construction/TestStreamTranslation.java | 8 +- .../TransformPayloadTranslatorRegistrar.java | 2 + .../construction/WindowIntoTranslation.java | 15 +- .../construction/WriteFilesTranslation.java | 16 +- .../direct/TransformEvaluatorRegistry.java | 18 +-- runners/flink/pom.xml | 5 - .../FlinkStreamingTransformTranslators.java | 60 ++----- 15 files changed, 280 insertions(+), 249 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java index c3d9553..69591ee 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java @@ -54,11 +54,10 @@ import org.apache.beam.sdk.values.PCollection; public class CombineTranslation { public static final String JAVA_SERIALIZED_COMBINE_FN_URN = "urn:beam:combinefn:javasdk:v1"; - /** - * A {@link TransformPayloadTranslator} for {@link Combine.PerKey}. - */ + /** A {@link TransformPayloadTranslator} for {@link Combine.PerKey}. */ public static class CombinePayloadTranslator - implements PTransformTranslation.TransformPayloadTranslator<Combine.PerKey<?, ?, ?>> { + extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration< + Combine.PerKey<?, ?, ?>> { public static TransformPayloadTranslator create() { return new CombinePayloadTranslator(); } @@ -81,9 +80,7 @@ public class CombineTranslation { .build(); } - /** - * Registers {@link CombinePayloadTranslator}. - */ + /** Registers {@link CombinePayloadTranslator}. */ @AutoService(TransformPayloadTranslatorRegistrar.class) public static class Registrar implements TransformPayloadTranslatorRegistrar { @Override @@ -91,6 +88,11 @@ public class CombineTranslation { getTransformPayloadTranslators() { return Collections.singletonMap(Combine.PerKey.class, new CombinePayloadTranslator()); } + + @Override + public Map<String, TransformPayloadTranslator> getTransformRehydrators() { + return Collections.emptyMap(); + } } } @@ -136,8 +138,7 @@ public class CombineTranslation { .setSpec( FunctionSpec.newBuilder() .setUrn(JAVA_SERIALIZED_COMBINE_FN_URN) - .setPayload( - ByteString.copyFrom(SerializableUtils.serializeToByteArray(combineFn))) + .setPayload(ByteString.copyFrom(SerializableUtils.serializeToByteArray(combineFn))) .build()) .build(); } @@ -148,8 +149,8 @@ public class CombineTranslation { return components.getCoder(id); } - public static Coder<?> getAccumulatorCoder( - AppliedPTransform<?, ?, ?> transform) throws IOException { + public static Coder<?> getAccumulatorCoder(AppliedPTransform<?, ?, ?> transform) + throws IOException { SdkComponents sdkComponents = SdkComponents.create(); String id = getCombinePayload(transform, sdkComponents).getAccumulatorCoderId(); Components components = sdkComponents.toComponents(); @@ -157,17 +158,11 @@ public class CombineTranslation { components.getCodersOrThrow(id), RehydratedComponents.forComponents(components)); } - public static GlobalCombineFn<?, ?, ?> getCombineFn(CombinePayload payload) - throws IOException { + public static GlobalCombineFn<?, ?, ?> getCombineFn(CombinePayload payload) throws IOException { checkArgument(payload.getCombineFn().getSpec().getUrn().equals(JAVA_SERIALIZED_COMBINE_FN_URN)); return (GlobalCombineFn<?, ?, ?>) SerializableUtils.deserializeFromByteArray( - payload - .getCombineFn() - .getSpec() - .getPayload() - .toByteArray(), - "CombineFn"); + payload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"); } public static GlobalCombineFn<?, ?, ?> getCombineFn(AppliedPTransform<?, ?, ?> transform) http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java index 4b8edcf..709cb8a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java @@ -88,7 +88,7 @@ public class CreatePCollectionViewTranslation { */ @Deprecated static class CreatePCollectionViewTranslator - implements TransformPayloadTranslator<View.CreatePCollectionView<?, ?>> { + extends TransformPayloadTranslator.WithDefaultRehydration<View.CreatePCollectionView<?, ?>> { @Override public String getUrn(View.CreatePCollectionView<?, ?> transform) { return PTransformTranslation.CREATE_VIEW_TRANSFORM_URN; @@ -122,5 +122,10 @@ public class CreatePCollectionViewTranslation { return Collections.singletonMap( View.CreatePCollectionView.class, new CreatePCollectionViewTranslator()); } + + @Override + public Map<String, TransformPayloadTranslator> getTransformRehydrators() { + return Collections.emptyMap(); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java index c9798e6..972c453 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java @@ -32,7 +32,8 @@ import org.apache.beam.sdk.transforms.windowing.Window.Assign; /** * Utility methods for translating a {@link Assign} to and from {@link RunnerApi} representations. */ -public class FlattenTranslator implements TransformPayloadTranslator<Flatten.PCollections<?>> { +public class FlattenTranslator + extends TransformPayloadTranslator.WithDefaultRehydration<Flatten.PCollections<?>> { public static TransformPayloadTranslator create() { return new FlattenTranslator(); @@ -59,5 +60,10 @@ public class FlattenTranslator implements TransformPayloadTranslator<Flatten.PCo getTransformPayloadTranslators() { return Collections.singletonMap(Flatten.PCollections.class, new FlattenTranslator()); } + + @Override + public Map<String, TransformPayloadTranslator> getTransformRehydrators() { + return Collections.emptyMap(); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java index 840bae2..0803ad3 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java @@ -34,7 +34,8 @@ import org.apache.beam.sdk.transforms.PTransform; */ public class GroupByKeyTranslation { - static class GroupByKeyTranslator implements TransformPayloadTranslator<GroupByKey<?, ?>> { + static class GroupByKeyTranslator + extends TransformPayloadTranslator.WithDefaultRehydration<GroupByKey<?, ?>> { @Override public String getUrn(GroupByKey<?, ?> transform) { return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; @@ -43,13 +44,10 @@ public class GroupByKeyTranslation { @Override public FunctionSpec translate( AppliedPTransform<?, ?, GroupByKey<?, ?>> transform, SdkComponents components) { - return FunctionSpec.newBuilder() - .setUrn(getUrn(transform.getTransform())) - .build(); + return FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build(); } } - /** Registers {@link GroupByKeyTranslator}. */ @AutoService(TransformPayloadTranslatorRegistrar.class) public static class Registrar implements TransformPayloadTranslatorRegistrar { @@ -58,5 +56,10 @@ public class GroupByKeyTranslation { getTransformPayloadTranslators() { return Collections.singletonMap(GroupByKey.class, new GroupByKeyTranslator()); } + + @Override + public Map<String, TransformPayloadTranslator> getTransformRehydrators() { + return Collections.emptyMap(); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index 31767a0..785b9e4 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -20,7 +20,9 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; +import com.google.auto.value.AutoValue; import com.google.common.base.Joiner; +import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import java.io.IOException; @@ -74,6 +76,12 @@ public class PTransformTranslation { private static final Map<Class<? extends PTransform>, TransformPayloadTranslator> KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators(); + private static final Map<String, TransformPayloadTranslator> KNOWN_REHYDRATORS = + loadTransformRehydrators(); + + private static final TransformPayloadTranslator<?> DEFAULT_REHYDRATOR = + new RawPTransformTranslator(); + private static Map<Class<? extends PTransform>, TransformPayloadTranslator> loadTransformPayloadTranslators() { HashMap<Class<? extends PTransform>, TransformPayloadTranslator> translators = new HashMap<>(); @@ -98,6 +106,29 @@ public class PTransformTranslation { return ImmutableMap.copyOf(translators); } + private static Map<String, TransformPayloadTranslator> loadTransformRehydrators() { + HashMap<String, TransformPayloadTranslator> rehydrators = new HashMap<>(); + + for (TransformPayloadTranslatorRegistrar registrar : + ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) { + + Map<String, ? extends TransformPayloadTranslator> newRehydrators = + registrar.getTransformRehydrators(); + + Set<String> alreadyRegistered = + Sets.intersection(rehydrators.keySet(), newRehydrators.keySet()); + + if (!alreadyRegistered.isEmpty()) { + throw new IllegalArgumentException( + String.format( + "URNs already registered: %s", Joiner.on(", ").join(alreadyRegistered))); + } + + rehydrators.putAll(newRehydrators); + } + return ImmutableMap.copyOf(rehydrators); + } + private PTransformTranslation() {} /** @@ -150,17 +181,36 @@ public class PTransformTranslation { // context of our current serialization transformBuilder.setSpec(((RawPTransform<?, ?>) transform).migrate(components)); } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) { - FunctionSpec payload = + transformBuilder.setSpec( KNOWN_PAYLOAD_TRANSLATORS .get(transform.getClass()) - .translate(appliedPTransform, components); - transformBuilder.setSpec(payload); + .translate(appliedPTransform, components)); } return transformBuilder.build(); } /** + * Translates a {@link RunnerApi.PTransform} to a {@link RawPTransform} specialized for the URN + * and spec. + */ + static RawPTransform<?, ?> rehydrate( + RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents) + throws IOException { + + @Nullable + TransformPayloadTranslator<?> rehydrator = + KNOWN_REHYDRATORS.get( + protoTransform.getSpec() == null ? null : protoTransform.getSpec().getUrn()); + + if (rehydrator == null) { + return DEFAULT_REHYDRATOR.rehydrate(protoTransform, rehydratedComponents); + } else { + return rehydrator.rehydrate(protoTransform, rehydratedComponents); + } + } + + /** * Translates a composite {@link AppliedPTransform} into a runner API proto with no component * transforms. * @@ -206,14 +256,66 @@ public class PTransformTranslation { } /** - * A translator consumes a {@link PTransform} application and produces the appropriate - * FunctionSpec for a distinguished or primitive transform within the Beam runner API. + * A bi-directional translator between a Java-based {@link PTransform} and a protobuf payload for + * that transform. + * + * <p>When going to a protocol buffer message, the translator produces a payload corresponding to + * the Java representation while registering components that payload references. + * + * <p>When "rehydrating" a protocol buffer message, the translator returns a {@link RawPTransform} + * - because the transform may not be Java-based, it is not possible to rebuild a Java-based + * {@link PTransform}. The resulting {@link RawPTransform} subclass encapsulates the knowledge of + * which components are referenced in the payload. */ public interface TransformPayloadTranslator<T extends PTransform<?, ?>> { String getUrn(T transform); FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components) throws IOException; + + RawPTransform<?, ?> rehydrate( + RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents) + throws IOException; + + /** + * A {@link TransformPayloadTranslator} for transforms that contain no references to components, + * so they do not need a specialized rehydration. + */ + abstract class WithDefaultRehydration<T extends PTransform<?, ?>> + implements TransformPayloadTranslator<T> { + @Override + public final RawPTransform<?, ?> rehydrate( + RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents) + throws IOException { + return UnknownRawPTransform.forSpec(protoTransform.getSpec()); + } + } + + /** + * A {@link TransformPayloadTranslator} for transforms that contain no references to components, + * so they do not need a specialized rehydration. + */ + abstract class NotSerializable<T extends PTransform<?, ?>> + implements TransformPayloadTranslator<T> { + @Override + public final FunctionSpec translate( + AppliedPTransform<?, ?, T> transform, SdkComponents components) throws IOException { + throw new UnsupportedOperationException( + String.format( + "%s should never be translated", + transform.getTransform().getClass().getCanonicalName())); + } + + @Override + public final RawPTransform<?, ?> rehydrate( + RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents) + throws IOException { + throw new UnsupportedOperationException( + String.format( + "%s.rehydrate should never be called; there is no serialized form", + getClass().getCanonicalName())); + } + } } /** @@ -264,6 +366,43 @@ public class PTransformTranslation { } } + @AutoValue + abstract static class UnknownRawPTransform extends RawPTransform<PInput, POutput> { + + @Override + public String getUrn() { + return getSpec() == null ? null : getSpec().getUrn(); + } + + @Nullable + public abstract RunnerApi.FunctionSpec getSpec(); + + public static UnknownRawPTransform forSpec(RunnerApi.FunctionSpec spec) { + return new AutoValue_PTransformTranslation_UnknownRawPTransform(spec); + } + + @Override + public POutput expand(PInput input) { + throw new IllegalStateException( + String.format( + "%s should never be asked to expand;" + + " it is the result of deserializing an already-constructed Pipeline", + getClass().getSimpleName())); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("urn", getUrn()) + .add("payload", getSpec()) + .toString(); + } + + public RunnerApi.FunctionSpec getSpecForComponents(SdkComponents components) { + return getSpec(); + } + } + /** A translator that uses the explicit URN and payload from a {@link RawPTransform}. */ public static class RawPTransformTranslator implements TransformPayloadTranslator<RawPTransform<?, ?>> { @@ -278,5 +417,11 @@ public class PTransformTranslation { throws IOException { return transform.getTransform().migrate(components); } + + @Override + public RawPTransform<?, ?> rehydrate( + RunnerApi.PTransform protoTransform, RehydratedComponents rehydratedComponents) { + return UnknownRawPTransform.forSpec(protoTransform.getSpec()); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index 3886e47..5092448 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -79,29 +79,20 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.WindowingStrategy; -/** - * Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos. - */ +/** Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos. */ public class ParDoTranslation { - /** - * The URN for an unknown Java {@link DoFn}. - */ + /** The URN for an unknown Java {@link DoFn}. */ public static final String CUSTOM_JAVA_DO_FN_URN = "urn:beam:dofn:javasdk:0.1"; - /** - * The URN for an unknown Java {@link ViewFn}. - */ + /** The URN for an unknown Java {@link ViewFn}. */ public static final String CUSTOM_JAVA_VIEW_FN_URN = "urn:beam:viewfn:javasdk:0.1"; - /** - * The URN for an unknown Java {@link WindowMappingFn}. - */ + /** The URN for an unknown Java {@link WindowMappingFn}. */ public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN = "urn:beam:windowmappingfn:javasdk:0.1"; - /** - * A {@link TransformPayloadTranslator} for {@link ParDo}. - */ + /** A {@link TransformPayloadTranslator} for {@link ParDo}. */ public static class ParDoPayloadTranslator - implements PTransformTranslation.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> { + extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration< + ParDo.MultiOutput<?, ?>> { public static TransformPayloadTranslator create() { return new ParDoPayloadTranslator(); } @@ -124,9 +115,7 @@ public class ParDoTranslation { .build(); } - /** - * Registers {@link ParDoPayloadTranslator}. - */ + /** Registers {@link ParDoPayloadTranslator}. */ @AutoService(TransformPayloadTranslatorRegistrar.class) public static class Registrar implements TransformPayloadTranslatorRegistrar { @Override @@ -134,11 +123,16 @@ public class ParDoTranslation { getTransformPayloadTranslators() { return Collections.singletonMap(ParDo.MultiOutput.class, new ParDoPayloadTranslator()); } + + @Override + public Map<String, TransformPayloadTranslator> getTransformRehydrators() { + return Collections.emptyMap(); + } } } public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components) - throws IOException { + throws IOException { DoFn<?, ?> doFn = parDo.getFn(); DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); Map<String, StateDeclaration> states = signature.stateDeclarations(); @@ -158,13 +152,11 @@ public class ParDoTranslation { } } for (Map.Entry<String, StateDeclaration> state : states.entrySet()) { - RunnerApi.StateSpec spec = - toProto(getStateSpecOrCrash(state.getValue(), doFn), components); + RunnerApi.StateSpec spec = toProto(getStateSpecOrCrash(state.getValue(), doFn), components); builder.putStateSpecs(state.getKey(), spec); } for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) { - RunnerApi.TimerSpec spec = - toProto(getTimerSpecOrCrash(timer.getValue(), doFn)); + RunnerApi.TimerSpec spec = toProto(getTimerSpecOrCrash(timer.getValue(), doFn)); builder.putTimerSpecs(timer.getKey(), spec); } return builder.build(); @@ -174,7 +166,8 @@ public class ParDoTranslation { StateDeclaration stateDeclaration, DoFn<?, ?> target) { try { Object fieldValue = stateDeclaration.field().get(target); - checkState(fieldValue instanceof StateSpec, + checkState( + fieldValue instanceof StateSpec, "Malformed %s class %s: state declaration field %s does not have type %s.", DoFn.class.getSimpleName(), target.getClass().getName(), @@ -196,7 +189,8 @@ public class ParDoTranslation { TimerDeclaration timerDeclaration, DoFn<?, ?> target) { try { Object fieldValue = timerDeclaration.field().get(target); - checkState(fieldValue instanceof TimerSpec, + checkState( + fieldValue instanceof TimerSpec, "Malformed %s class %s: timer declaration field %s does not have type %s.", DoFn.class.getSimpleName(), target.getClass().getName(), @@ -273,8 +267,7 @@ public class ParDoTranslation { } SdkComponents sdkComponents = SdkComponents.create(); - RunnerApi.PTransform parDoProto = - PTransformTranslation.toProto(application, sdkComponents); + RunnerApi.PTransform parDoProto = PTransformTranslation.toProto(application, sdkComponents); ParDoPayload payload = ParDoPayload.parseFrom(parDoProto.getSpec().getPayload()); List<PCollectionView<?>> views = new ArrayList<>(); @@ -289,12 +282,7 @@ public class ParDoTranslation { "no input with tag %s", sideInputTag); views.add( - viewFromProto( - sideInput, - sideInputTag, - originalPCollection, - parDoProto, - components)); + viewFromProto(sideInput, sideInputTag, originalPCollection, parDoProto, components)); } return views; } @@ -414,7 +402,6 @@ public class ParDoTranslation { default: throw new IllegalArgumentException( String.format("Unknown %s: %s", RunnerApi.StateSpec.class.getName(), stateSpec)); - } } @@ -431,7 +418,7 @@ public class ParDoTranslation { } private static RunnerApi.TimeDomain.Enum toProto(TimeDomain timeDomain) { - switch(timeDomain) { + switch (timeDomain) { case EVENT_TIME: return RunnerApi.TimeDomain.Enum.EVENT_TIME; case PROCESSING_TIME: @@ -445,12 +432,12 @@ public class ParDoTranslation { @AutoValue abstract static class DoFnAndMainOutput implements Serializable { - public static DoFnAndMainOutput of( - DoFn<?, ?> fn, TupleTag<?> tag) { + public static DoFnAndMainOutput of(DoFn<?, ?> fn, TupleTag<?> tag) { return new AutoValue_ParDoTranslation_DoFnAndMainOutput(fn, tag); } abstract DoFn<?, ?> getDoFn(); + abstract TupleTag<?> getMainOutputTag(); } @@ -475,8 +462,7 @@ public class ParDoTranslation { FunctionSpec.class.getSimpleName(), CUSTOM_JAVA_DO_FN_URN, fnSpec.getSpec().getUrn()); - byte[] serializedFn = - fnSpec.getSpec().getPayload().toByteArray(); + byte[] serializedFn = fnSpec.getSpec().getPayload().toByteArray(); return (DoFnAndMainOutput) SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag"); } @@ -505,9 +491,7 @@ public class ParDoTranslation { public static SideInput toProto(PCollectionView<?> view) { Builder builder = SideInput.newBuilder(); builder.setAccessPattern( - FunctionSpec.newBuilder() - .setUrn(view.getViewFn().getMaterialization().getUrn()) - .build()); + FunctionSpec.newBuilder().setUrn(view.getViewFn().getMaterialization().getUrn()).build()); builder.setViewFn(toProto(view.getViewFn())); builder.setWindowMappingFn(toProto(view.getWindowMappingFn())); return builder.build(); http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java index 1624865..85033e5 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java @@ -20,8 +20,6 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.auto.value.AutoValue; -import com.google.common.base.MoreObjects; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import java.io.IOException; @@ -32,12 +30,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; @@ -47,8 +43,6 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PCollectionViews; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; @@ -148,6 +142,8 @@ public class PipelineTranslation { } RunnerApi.FunctionSpec transformSpec = transformProto.getSpec(); + RawPTransform<?, ?> transform = + PTransformTranslation.rehydrate(transformProto, rehydratedComponents); // By default, no "additional" inputs, since that is an SDK-specific thing. // Only ParDo and WriteFiles really separate main from side inputs @@ -170,20 +166,6 @@ public class PipelineTranslation { transformProto, rehydratedComponents, rehydratedInputs, payload.getSideInputsMap()); } - // TODO: CombineTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674 - List<Coder<?>> additionalCoders = Collections.emptyList(); - if (transformSpec.getUrn().equals(PTransformTranslation.COMBINE_TRANSFORM_URN)) { - RunnerApi.CombinePayload payload = - RunnerApi.CombinePayload.parseFrom(transformSpec.getPayload()); - additionalCoders = - (List) - Collections.singletonList( - rehydratedComponents.getCoder(payload.getAccumulatorCoderId())); - } - - RehydratedPTransform transform = - RehydratedPTransform.of(transformSpec, additionalInputs, additionalCoders); - if (isPrimitive(transformProto)) { transforms.addFinalizedPrimitiveNode( transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs); @@ -232,58 +214,4 @@ public class PipelineTranslation { .values() .containsAll(transformProto.getOutputsMap().values()); } - - @AutoValue - abstract static class RehydratedPTransform extends RawPTransform<PInput, POutput> { - - @Nullable - public abstract RunnerApi.FunctionSpec getSpec(); - - @Override - public abstract Map<TupleTag<?>, PValue> getAdditionalInputs(); - - public abstract List<Coder<?>> getCoders(); - - @Override - public String getUrn() { - return getSpec().getUrn(); - } - - public static RehydratedPTransform of( - RunnerApi.FunctionSpec payload, - Map<TupleTag<?>, PValue> additionalInputs, - List<Coder<?>> additionalCoders) { - return new AutoValue_PipelineTranslation_RehydratedPTransform( - payload, additionalInputs, additionalCoders); - } - - @Override - public POutput expand(PInput input) { - throw new IllegalStateException( - String.format( - "%s should never be asked to expand;" - + " it is the result of deserializing an already-constructed Pipeline", - getClass().getSimpleName())); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("urn", getUrn()) - .add("payload", getSpec()) - .toString(); - } - - @Override - public RunnerApi.FunctionSpec migrate(SdkComponents components) { - for (Coder<?> coder : getCoders()) { - try { - components.registerCoder(coder); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return getSpec(); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java index e9168a2..4b14c51 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java @@ -44,8 +44,8 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; /** - * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded} - * {@link PTransform PTransformTranslation} into {@link ReadPayload} protos. + * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded} {@link PTransform + * PTransformTranslation} into {@link ReadPayload} protos. */ public class ReadTranslation { private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1"; @@ -89,13 +89,9 @@ public class ReadTranslation { public static BoundedSource<?> boundedSourceFromProto(ReadPayload payload) throws InvalidProtocolBufferException { checkArgument(payload.getIsBounded().equals(IsBounded.Enum.BOUNDED)); - return (BoundedSource<?>) SerializableUtils.deserializeFromByteArray( - payload - .getSource() - .getSpec() - .getPayload() - .toByteArray(), - "BoundedSource"); + return (BoundedSource<?>) + SerializableUtils.deserializeFromByteArray( + payload.getSource().getSpec().getPayload().toByteArray(), "BoundedSource"); } public static <T> BoundedSource<T> boundedSourceFromTransform( @@ -136,13 +132,9 @@ public class ReadTranslation { public static UnboundedSource<?, ?> unboundedSourceFromProto(ReadPayload payload) throws InvalidProtocolBufferException { checkArgument(payload.getIsBounded().equals(IsBounded.Enum.UNBOUNDED)); - return (UnboundedSource<?, ?>) SerializableUtils.deserializeFromByteArray( - payload - .getSource() - .getSpec() - .getPayload() - .toByteArray(), - "BoundedSource"); + return (UnboundedSource<?, ?>) + SerializableUtils.deserializeFromByteArray( + payload.getSource().getSpec().getPayload().toByteArray(), "BoundedSource"); } public static PCollection.IsBounded sourceIsBounded(AppliedPTransform<?, ?, ?> transform) { @@ -161,11 +153,10 @@ public class ReadTranslation { } } - /** - * A {@link TransformPayloadTranslator} for {@link Read.Unbounded}. - */ + /** A {@link TransformPayloadTranslator} for {@link Read.Unbounded}. */ public static class UnboundedReadPayloadTranslator - implements PTransformTranslation.TransformPayloadTranslator<Read.Unbounded<?>> { + extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration< + Read.Unbounded<?>> { public static TransformPayloadTranslator create() { return new UnboundedReadPayloadTranslator(); } @@ -188,11 +179,10 @@ public class ReadTranslation { } } - /** - * A {@link TransformPayloadTranslator} for {@link Read.Bounded}. - */ + /** A {@link TransformPayloadTranslator} for {@link Read.Bounded}. */ public static class BoundedReadPayloadTranslator - implements PTransformTranslation.TransformPayloadTranslator<Read.Bounded<?>> { + extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration< + Read.Bounded<?>> { public static TransformPayloadTranslator create() { return new BoundedReadPayloadTranslator(); } @@ -226,5 +216,10 @@ public class ReadTranslation { .put(Read.Bounded.class, new BoundedReadPayloadTranslator()) .build(); } + + @Override + public Map<String, TransformPayloadTranslator> getTransformRehydrators() { + return Collections.emptyMap(); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java index de4d6bb..8e4c1db 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java @@ -172,7 +172,8 @@ public class TestStreamTranslation { } } - static class TestStreamTranslator implements TransformPayloadTranslator<TestStream<?>> { + static class TestStreamTranslator + extends TransformPayloadTranslator.WithDefaultRehydration<TestStream<?>> { @Override public String getUrn(TestStream<?> transform) { return TEST_STREAM_TRANSFORM_URN; @@ -197,5 +198,10 @@ public class TestStreamTranslation { getTransformPayloadTranslators() { return Collections.singletonMap(TestStream.class, new TestStreamTranslator()); } + + @Override + public Map<String, TransformPayloadTranslator> getTransformRehydrators() { + return Collections.emptyMap(); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java index 3b3ffa1..58417a8 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java @@ -26,4 +26,6 @@ import org.apache.beam.sdk.transforms.PTransform; public interface TransformPayloadTranslatorRegistrar { Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> getTransformPayloadTranslators(); + + Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators(); } http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java index ad6177d..9158aba 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java @@ -41,7 +41,8 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; */ public class WindowIntoTranslation { - static class WindowAssignTranslator implements TransformPayloadTranslator<Window.Assign<?>> { + static class WindowAssignTranslator + extends TransformPayloadTranslator.WithDefaultRehydration<Window.Assign<?>> { @Override public String getUrn(Assign<?> transform) { @@ -105,11 +106,10 @@ public class WindowIntoTranslation { getWindowIntoPayload(application).getWindowFn()); } - /** - * A {@link TransformPayloadTranslator} for {@link Window}. - */ + /** A {@link TransformPayloadTranslator} for {@link Window}. */ public static class WindowIntoPayloadTranslator - implements PTransformTranslation.TransformPayloadTranslator<Window.Assign<?>> { + extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration< + Window.Assign<?>> { public static TransformPayloadTranslator create() { return new WindowIntoPayloadTranslator(); } @@ -140,5 +140,10 @@ public class WindowIntoTranslation { getTransformPayloadTranslators() { return Collections.singletonMap(Window.Assign.class, new WindowIntoPayloadTranslator()); } + + @Override + public Map<String, TransformPayloadTranslator> getTransformRehydrators() { + return Collections.emptyMap(); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java index 5a49747..645b562 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.runners.core.construction.PTransformTranslation.WRITE_FILES_TRANSFORM_URN; import com.google.auto.service.AutoService; import com.google.common.annotations.VisibleForTesting; @@ -173,10 +174,11 @@ public class WriteFilesTranslation { .getPayload()); } - static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?, ?, ?>> { + static class WriteFilesTranslator + extends TransformPayloadTranslator.WithDefaultRehydration<WriteFiles<?, ?, ?>> { @Override public String getUrn(WriteFiles<?, ?, ?> transform) { - return PTransformTranslation.WRITE_FILES_TRANSFORM_URN; + return WRITE_FILES_TRANSFORM_URN; } @Override @@ -193,9 +195,15 @@ public class WriteFilesTranslation { @AutoService(TransformPayloadTranslatorRegistrar.class) public static class Registrar implements TransformPayloadTranslatorRegistrar { @Override - public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> + public Map<Class<? extends PTransform>, TransformPayloadTranslator> getTransformPayloadTranslators() { - return Collections.singletonMap(WriteFiles.class, new WriteFilesTranslator()); + return Collections.<Class<? extends PTransform>, TransformPayloadTranslator>singletonMap( + WriteFiles.class, new WriteFilesTranslator()); + } + + @Override + public Map<String, TransformPayloadTranslator> getTransformRehydrators() { + return Collections.emptyMap(); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 9cfa79f..099252f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -35,14 +35,13 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; -import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -115,6 +114,11 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { new SplittableParDoProcessElementsTranslator()) .build(); } + + @Override + public Map<String, TransformPayloadTranslator> getTransformRehydrators() { + return Collections.emptyMap(); + } } /** @@ -122,7 +126,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { * once SDF is reorganized appropriately. */ private static class SplittableParDoProcessElementsTranslator - implements TransformPayloadTranslator<ProcessElements<?, ?, ?, ?>> { + extends TransformPayloadTranslator.NotSerializable<ProcessElements<?, ?, ?, ?>> { private SplittableParDoProcessElementsTranslator() {} @@ -130,14 +134,6 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { public String getUrn(ProcessElements<?, ?, ?, ?> transform) { return SPLITTABLE_PROCESS_URN; } - - @Override - public FunctionSpec translate( - AppliedPTransform<?, ?, ProcessElements<?, ?, ?, ?>> transform, SdkComponents components) { - throw new UnsupportedOperationException( - String.format("%s should never be translated", - ProcessElements.class.getCanonicalName())); - } } // the TransformEvaluatorFactories can construct instances of all generic types of transform, http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/flink/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index e77dbc8..7840c32 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -219,11 +219,6 @@ <!-- Beam --> <dependency> <groupId>org.apache.beam</groupId> - <artifactId>beam-model-pipeline</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <exclusions> <exclusion> http://git-wip-us.apache.org/repos/asf/beam/blob/10c63e15/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index d1e2d57..cec01f8 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -31,12 +31,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.construction.PTransformTranslation; -import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; @@ -56,7 +54,6 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; @@ -1085,7 +1082,7 @@ class FlinkStreamingTransformTranslators { * once SDF is reorganized appropriately. */ private static class SplittableParDoProcessElementsTranslator - implements PTransformTranslation.TransformPayloadTranslator< + extends PTransformTranslation.TransformPayloadTranslator.NotSerializable< SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> { private SplittableParDoProcessElementsTranslator() {} @@ -1094,17 +1091,6 @@ class FlinkStreamingTransformTranslators { public String getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?> transform) { return SPLITTABLE_PROCESS_URN; } - - @Override - public RunnerApi.FunctionSpec translate( - AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> - transform, - SdkComponents components) { - throw new UnsupportedOperationException( - String.format( - "%s should never be translated", - SplittableParDoViaKeyedWorkItems.ProcessElements.class.getCanonicalName())); - } } /** Registers classes specialized to the Flink runner. */ @@ -1128,6 +1114,11 @@ class FlinkStreamingTransformTranslators { new SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator()) .build(); } + + @Override + public Map<String, PTransformTranslation.TransformPayloadTranslator> getTransformRehydrators() { + return Collections.emptyMap(); + } } /** @@ -1135,7 +1126,7 @@ class FlinkStreamingTransformTranslators { * once SDF is reorganized appropriately. */ private static class SplittableParDoProcessElementsPayloadTranslator - implements PTransformTranslation.TransformPayloadTranslator< + extends PTransformTranslation.TransformPayloadTranslator.NotSerializable< SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> { private SplittableParDoProcessElementsPayloadTranslator() {} @@ -1144,17 +1135,6 @@ class FlinkStreamingTransformTranslators { public String getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?> transform) { return SplittableParDo.SPLITTABLE_PROCESS_URN; } - - @Override - public RunnerApi.FunctionSpec translate( - AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?>> - transform, - SdkComponents components) { - throw new UnsupportedOperationException( - String.format( - "%s should never be translated", - SplittableParDoViaKeyedWorkItems.ProcessElements.class.getCanonicalName())); - } } /** @@ -1162,7 +1142,7 @@ class FlinkStreamingTransformTranslators { * once SDF is reorganized appropriately. */ private static class SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator - implements PTransformTranslation.TransformPayloadTranslator< + extends PTransformTranslation.TransformPayloadTranslator.NotSerializable< SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?>> { private SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator() {} @@ -1171,24 +1151,13 @@ class FlinkStreamingTransformTranslators { public String getUrn(SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?> transform) { return SplittableParDo.SPLITTABLE_GBKIKWI_URN; } - - @Override - public RunnerApi.FunctionSpec translate( - AppliedPTransform<?, ?, SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems<?, ?>> - transform, - SdkComponents components) { - throw new UnsupportedOperationException( - String.format( - "%s should never be translated", - SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class.getCanonicalName())); - } } /** * A translator just to vend the URN. */ private static class CreateStreamingFlinkViewPayloadTranslator - implements PTransformTranslation.TransformPayloadTranslator< + extends PTransformTranslation.TransformPayloadTranslator.NotSerializable< CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?>> { private CreateStreamingFlinkViewPayloadTranslator() {} @@ -1197,16 +1166,5 @@ class FlinkStreamingTransformTranslators { public String getUrn(CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?> transform) { return CreateStreamingFlinkView.CREATE_STREAMING_FLINK_VIEW_URN; } - - @Override - public RunnerApi.FunctionSpec translate( - AppliedPTransform<?, ?, CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?>> - transform, - SdkComponents components) { - throw new UnsupportedOperationException( - String.format( - "%s should never be translated", - CreateStreamingFlinkView.CreateFlinkPCollectionView.class.getCanonicalName())); - } } }
