Include PCollection in rehydrated PCollectionView
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bdece9d2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bdece9d2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bdece9d2 Branch: refs/heads/DSL_SQL Commit: bdece9d2a57824865a35b4367619569e5800ed1b Parents: 860e0a0 Author: Kenneth Knowles <[email protected]> Authored: Thu Jul 6 09:24:55 2017 -0700 Committer: Tyler Akidau <[email protected]> Committed: Wed Jul 12 20:01:01 2017 -0700 ---------------------------------------------------------------------- .../core/construction/ParDoTranslation.java | 51 +++++++++++++++++--- .../construction/RunnerPCollectionView.java | 7 +-- .../core/construction/ParDoTranslationTest.java | 28 +++++++---- 3 files changed, 67 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/bdece9d2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index 5f2bcae..fe8c5aa 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; @@ -74,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; @@ -262,8 +264,12 @@ public class ParDoTranslation { List<PCollectionView<?>> views = new ArrayList<>(); for (Map.Entry<String, SideInput> sideInput : payload.getSideInputsMap().entrySet()) { views.add( - fromProto( - sideInput.getValue(), sideInput.getKey(), parDoProto, sdkComponents.toComponents())); + viewFromProto( + application.getPipeline(), + sideInput.getValue(), + sideInput.getKey(), + parDoProto, + sdkComponents.toComponents())); } return views; } @@ -495,15 +501,47 @@ public class ParDoTranslation { return builder.build(); } - public static PCollectionView<?> fromProto( - SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components) + public static PCollectionView<?> viewFromProto( + Pipeline pipeline, + SideInput sideInput, + String localName, + RunnerApi.PTransform parDoTransform, + Components components) throws IOException { - TupleTag<?> tag = new TupleTag<>(id); + + String pCollectionId = parDoTransform.getInputsOrThrow(localName); + + // This may be a PCollection defined in another language, but we should be + // able to rehydrate it enough to stick it in a side input. The coder may not + // be grokkable in Java. + PCollection<?> pCollection = + PCollectionTranslation.fromProto( + pipeline, components.getPcollectionsOrThrow(pCollectionId), components); + + return viewFromProto(sideInput, localName, pCollection, parDoTransform, components); + } + + /** + * Create a {@link PCollectionView} from a side input spec and an already-deserialized {@link + * PCollection} that should be wired up. + */ + public static PCollectionView<?> viewFromProto( + SideInput sideInput, + String localName, + PCollection<?> pCollection, + RunnerApi.PTransform parDoTransform, + Components components) + throws IOException { + checkArgument( + localName != null, + "%s.viewFromProto: localName must not be null", + ParDoTranslation.class.getSimpleName()); + TupleTag<?> tag = new TupleTag<>(localName); WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn()); ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn()); RunnerApi.PCollection inputCollection = - components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id)); + components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(localName)); WindowingStrategy<?, ?> windowingStrategy = WindowingStrategyTranslation.fromProto( components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()), @@ -523,6 +561,7 @@ public class ParDoTranslation { PCollectionView<?> view = new RunnerPCollectionView<>( + pCollection, (TupleTag<Iterable<WindowedValue<?>>>) tag, (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn, windowMappingFn, http://git-wip-us.apache.org/repos/asf/beam/blob/bdece9d2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java index c359cec..b275188 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java @@ -39,16 +39,19 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T> private final WindowMappingFn<?> windowMappingFn; private final WindowingStrategy<?, ?> windowingStrategy; private final Coder<Iterable<WindowedValue<?>>> coder; + private final transient PCollection<?> pCollection; /** * Create a new {@link RunnerPCollectionView} from the provided components. */ RunnerPCollectionView( + PCollection<?> pCollection, TupleTag<Iterable<WindowedValue<?>>> tag, ViewFn<Iterable<WindowedValue<?>>, T> viewFn, WindowMappingFn<?> windowMappingFn, @Nullable WindowingStrategy<?, ?> windowingStrategy, @Nullable Coder<Iterable<WindowedValue<?>>> coder) { + this.pCollection = pCollection; this.tag = tag; this.viewFn = viewFn; this.windowMappingFn = windowMappingFn; @@ -56,11 +59,9 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T> this.coder = coder; } - @Nullable @Override public PCollection<?> getPCollection() { - throw new IllegalStateException( - String.format("Cannot call getPCollection on a %s", getClass().getSimpleName())); + return pCollection; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/bdece9d2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java index a8490bf..6fdf9d6 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java @@ -23,9 +23,9 @@ import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; -import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -143,22 +143,30 @@ public class ParDoTranslationTest { inputs.putAll(parDo.getAdditionalInputs()); PCollectionTuple output = mainInput.apply(parDo); - SdkComponents components = SdkComponents.create(); - String transformId = - components.registerPTransform( + SdkComponents sdkComponents = SdkComponents.create(); + + // Encode + RunnerApi.PTransform protoTransform = + PTransformTranslation.toProto( AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of( "foo", inputs, output.expand(), parDo, p), - Collections.<AppliedPTransform<?, ?, ?>>emptyList()); + sdkComponents); + Components protoComponents = sdkComponents.toComponents(); + + // Decode + Pipeline rehydratedPipeline = Pipeline.create(); - Components protoComponents = components.toComponents(); - RunnerApi.PTransform protoTransform = protoComponents.getTransformsOrThrow(transformId); ParDoPayload parDoPayload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class); for (PCollectionView<?> view : parDo.getSideInputs()) { SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId()); PCollectionView<?> restoredView = - ParDoTranslation.fromProto( - sideInput, view.getTagInternal().getId(), protoTransform, protoComponents); + ParDoTranslation.viewFromProto( + rehydratedPipeline, + sideInput, + view.getTagInternal().getId(), + protoTransform, + protoComponents); assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal())); assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass())); assertThat( @@ -169,7 +177,7 @@ public class ParDoTranslationTest { view.getWindowingStrategyInternal().fixDefaults())); assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal())); } - String mainInputId = components.registerPCollection(mainInput); + String mainInputId = sdkComponents.registerPCollection(mainInput); assertThat( ParDoTranslation.getMainInput(protoTransform, protoComponents), equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));
