Repository: beam Updated Branches: refs/heads/master c14a3184e -> b6b1c8b7c
Add more utilities to ParDoTranslation Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/165dfa68 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/165dfa68 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/165dfa68 Branch: refs/heads/master Commit: 165dfa688beaeb2de9b5041c81f6e02b517f74fd Parents: 20ce075 Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 8 13:46:18 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Jul 10 20:04:14 2017 -0700 ---------------------------------------------------------------------- .../core/construction/ParDoTranslation.java | 48 ++++++++++++++++++++ 1 file changed, 48 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/165dfa68/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index 34e0d86..5f2bcae 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -34,9 +34,11 @@ import com.google.protobuf.BytesValue; import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -74,6 +76,7 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.WindowingStrategy; /** @@ -215,11 +218,56 @@ public class ParDoTranslation { return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn(); } + public static DoFn<?, ?> getDoFn(AppliedPTransform<?, ?, ?> application) throws IOException { + return getDoFn(getParDoPayload(application)); + } + public static TupleTag<?> getMainOutputTag(ParDoPayload payload) throws InvalidProtocolBufferException { return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag(); } + public static TupleTag<?> getMainOutputTag(AppliedPTransform<?, ?, ?> application) + throws IOException { + return getMainOutputTag(getParDoPayload(application)); + } + + public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?, ?, ?> application) + throws IOException { + + RunnerApi.PTransform protoTransform = + PTransformTranslation.toProto(application, SdkComponents.create()); + + ParDoPayload payload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class); + TupleTag<?> mainOutputTag = getMainOutputTag(payload); + Set<String> outputTags = + Sets.difference( + protoTransform.getOutputsMap().keySet(), Collections.singleton(mainOutputTag.getId())); + + ArrayList<TupleTag<?>> additionalOutputTags = new ArrayList<>(); + for (String outputTag : outputTags) { + additionalOutputTags.add(new TupleTag<>(outputTag)); + } + return TupleTagList.of(additionalOutputTags); + } + + public static List<PCollectionView<?>> getSideInputs(AppliedPTransform<?, ?, ?> application) + throws IOException { + + SdkComponents sdkComponents = SdkComponents.create(); + RunnerApi.PTransform parDoProto = + PTransformTranslation.toProto(application, sdkComponents); + ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class); + + List<PCollectionView<?>> views = new ArrayList<>(); + for (Map.Entry<String, SideInput> sideInput : payload.getSideInputsMap().entrySet()) { + views.add( + fromProto( + sideInput.getValue(), sideInput.getKey(), parDoProto, sdkComponents.toComponents())); + } + return views; + } + public static RunnerApi.PCollection getMainInput( RunnerApi.PTransform ptransform, Components components) throws IOException { checkArgument(
