Repository: beam
Updated Branches:
  refs/heads/master c14a3184e -> b6b1c8b7c


Add more utilities to ParDoTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/165dfa68
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/165dfa68
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/165dfa68

Branch: refs/heads/master
Commit: 165dfa688beaeb2de9b5041c81f6e02b517f74fd
Parents: 20ce075
Author: Kenneth Knowles <[email protected]>
Authored: Thu Jun 8 13:46:18 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Mon Jul 10 20:04:14 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ParDoTranslation.java     | 48 ++++++++++++++++++++
 1 file changed, 48 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/165dfa68/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 34e0d86..5f2bcae 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -34,9 +34,11 @@ import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -74,6 +76,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
@@ -215,11 +218,56 @@ public class ParDoTranslation {
     return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn();
   }
 
+  public static DoFn<?, ?> getDoFn(AppliedPTransform<?, ?, ?> application) 
throws IOException {
+    return getDoFn(getParDoPayload(application));
+  }
+
   public static TupleTag<?> getMainOutputTag(ParDoPayload payload)
       throws InvalidProtocolBufferException {
     return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
   }
 
+  public static TupleTag<?> getMainOutputTag(AppliedPTransform<?, ?, ?> 
application)
+      throws IOException {
+    return getMainOutputTag(getParDoPayload(application));
+  }
+
+  public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?, ?, 
?> application)
+      throws IOException {
+
+    RunnerApi.PTransform protoTransform =
+        PTransformTranslation.toProto(application, SdkComponents.create());
+
+    ParDoPayload payload = 
protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
+    TupleTag<?> mainOutputTag = getMainOutputTag(payload);
+    Set<String> outputTags =
+        Sets.difference(
+            protoTransform.getOutputsMap().keySet(), 
Collections.singleton(mainOutputTag.getId()));
+
+    ArrayList<TupleTag<?>> additionalOutputTags = new ArrayList<>();
+    for (String outputTag : outputTags) {
+      additionalOutputTags.add(new TupleTag<>(outputTag));
+    }
+    return TupleTagList.of(additionalOutputTags);
+  }
+
+  public static List<PCollectionView<?>> getSideInputs(AppliedPTransform<?, ?, 
?> application)
+      throws IOException {
+
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PTransform parDoProto =
+        PTransformTranslation.toProto(application, sdkComponents);
+    ParDoPayload payload = 
parDoProto.getSpec().getParameter().unpack(ParDoPayload.class);
+
+    List<PCollectionView<?>> views = new ArrayList<>();
+    for (Map.Entry<String, SideInput> sideInput : 
payload.getSideInputsMap().entrySet()) {
+      views.add(
+          fromProto(
+              sideInput.getValue(), sideInput.getKey(), parDoProto, 
sdkComponents.toComponents()));
+    }
+    return views;
+  }
+
   public static RunnerApi.PCollection getMainInput(
       RunnerApi.PTransform ptransform, Components components) throws 
IOException {
     checkArgument(

Reply via email to