Repository: beam
Updated Branches:
  refs/heads/master b4d870272 -> a6caa82a6


Add an explicit translation builder for a Step in the Dataflow translator

Previously, there was always a "current" step that was the most recent
step created. This made it cumbersome or impossible to do things like
translate one primitive transform into a small subgraph of steps. Thus
we added hacks like CreatePCollectionView, which are not actually part
of the model at all; in fact, we should be able to add the needed
CollectionToSingleton steps simply by looking at the side inputs of a
ParDo node.

Now TranslationContext.addStep returns a StepTranslationContext bound
to the newly created step, and translators add inputs, outputs, and
encodings through that object instead of through an implicit "current"
step.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f04537cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f04537cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f04537cc

Branch: refs/heads/master
Commit: f04537ccbc2897ea4337941d5ca8121432daef43
Parents: b4d8702
Author: Kenneth Knowles <[email protected]>
Authored: Wed Dec 21 14:34:27 2016 -0800
Committer: Kenneth Knowles <[email protected]>
Committed: Fri Jan 6 11:24:49 2017 -0800

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 313 ++++++++++---------
 .../beam/runners/dataflow/DataflowRunner.java   |  60 ++--
 .../dataflow/internal/ReadTranslator.java       |   9 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   5 +-
 4 files changed, 196 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 8d2b076..2385fa1 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -213,14 +213,12 @@ public class DataflowPipelineTranslator {
   }
 
   /**
-   * A {@link TransformTranslator} knows how to translate
-   * a particular subclass of {@link PTransform} for the
-   * Cloud Dataflow service. It does so by
-   * mutating the {@link TranslationContext}.
+   * A {@link TransformTranslator} knows how to translate a particular 
subclass of {@link
+   * PTransform} for the Cloud Dataflow service. It does so by mutating the 
{@link
+   * TranslationContext}.
    */
   public interface TransformTranslator<TransformT extends PTransform> {
-    void translate(TransformT transform,
-                          TranslationContext context);
+    void translate(TransformT transform, TranslationContext context);
   }
 
   /**
@@ -252,10 +250,8 @@ public class DataflowPipelineTranslator {
     /**
      * Adds a step to the Dataflow workflow for the given transform, with
      * the given Dataflow step type.
-     * This step becomes "current" for the purpose of {@link #addInput} and
-     * {@link #addOutput}.
      */
-    void addStep(PTransform<?, ?> transform, String type);
+    StepTranslationContext addStep(PTransform<?, ?> transform, String type);
 
     /**
      * Adds a pre-defined step to the Dataflow workflow. The given PTransform 
should be
@@ -264,8 +260,14 @@ public class DataflowPipelineTranslator {
      * <p>This is a low-level operation, when using this method it is up to
      * the caller to ensure that names do not collide.
      */
-    void addStep(PTransform<?, ? extends PValue> transform, Step step);
+    Step addStep(PTransform<?, ? extends PValue> transform, Step step);
+    /**
+     * Encode a PValue reference as an output reference.
+     */
+    OutputReference asOutputReference(PValue value);
+  }
 
+  public interface StepTranslationContext {
     /**
      * Sets the encoding for the current Dataflow step.
      */
@@ -330,12 +332,7 @@ public class DataflowPipelineTranslator {
      * output encoding.  Returns a pipeline level unique id.
      */
     long addCollectionToSingletonOutput(PValue inputValue,
-                                               PValue outputValue);
-
-    /**
-     * Encode a PValue reference as an output reference.
-     */
-    OutputReference asOutputReference(PValue value);
+        PValue outputValue);
   }
 
 
@@ -343,6 +340,8 @@ public class DataflowPipelineTranslator {
 
   /**
    * Translates a Pipeline into the Dataflow representation.
+   *
+   * <p>For internal use only.
    */
   class Translator extends PipelineVisitor.Defaults implements 
TranslationContext {
     /**
@@ -368,11 +367,6 @@ public class DataflowPipelineTranslator {
     private final Job job = new Job();
 
     /**
-     * Translator is stateful, as addProperty calls refer to the current step.
-     */
-    private Step currentStep;
-
-    /**
      * A Map from AppliedPTransform to their unique Dataflow step names.
      */
     private final Map<AppliedPTransform<?, ?, ?>, String> stepNames = new 
HashMap<>();
@@ -546,7 +540,7 @@ public class DataflowPipelineTranslator {
     }
 
     @Override
-    public void addStep(PTransform<?, ?> transform, String type) {
+    public StepTranslator addStep(PTransform<?, ?> transform, String type) {
       String stepName = genStepName();
       if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
         throw new IllegalArgumentException(
@@ -559,16 +553,19 @@ public class DataflowPipelineTranslator {
         job.setSteps(steps);
       }
 
-      currentStep = new Step();
-      currentStep.setName(stepName);
-      currentStep.setKind(type);
-      steps.add(currentStep);
-      addInput(PropertyNames.USER_NAME, getFullName(transform));
-      addDisplayData(stepName, transform);
+      Step step = new Step();
+      step.setName(stepName);
+      step.setKind(type);
+      steps.add(step);
+
+      StepTranslator stepContext = new StepTranslator(this, step);
+      stepContext.addInput(PropertyNames.USER_NAME, getFullName(transform));
+      stepContext.addDisplayData(step, stepName, transform);
+      return stepContext;
     }
 
     @Override
-    public void addStep(PTransform<?, ? extends PValue> transform, Step 
original) {
+    public Step addStep(PTransform<?, ? extends PValue> transform, Step 
original) {
       Step step = original.clone();
       String stepName = step.getName();
       if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
@@ -605,8 +602,59 @@ public class DataflowPipelineTranslator {
         steps = new LinkedList<>();
         job.setSteps(steps);
       }
-      currentStep = step;
       steps.add(step);
+      return step;
+    }
+
+    @Override
+    public OutputReference asOutputReference(PValue value) {
+      AppliedPTransform<?, ?, ?> transform =
+          value.getProducingTransformInternal();
+      String stepName = stepNames.get(transform);
+      if (stepName == null) {
+        throw new IllegalArgumentException(transform + " doesn't have a name 
specified");
+      }
+
+      String outputName = outputNames.get(value);
+      if (outputName == null) {
+        throw new IllegalArgumentException(
+            "output " + value + " doesn't have a name specified");
+      }
+
+      return new OutputReference(stepName, outputName);
+    }
+
+    /**
+     * Returns a fresh Dataflow step name.
+     */
+    private String genStepName() {
+      return "s" + (stepNames.size() + 1);
+    }
+
+    /**
+     * Records the name of the given output PValue,
+     * within its producing transform.
+     */
+    private void registerOutputName(POutput value, String name) {
+      if (outputNames.put(value, name) != null) {
+        throw new IllegalArgumentException(
+            "output " + value + " already has a name specified");
+      }
+    }
+  }
+
+  static class StepTranslator implements StepTranslationContext {
+
+    private final Translator translator;
+    private final Step step;
+
+    private StepTranslator(Translator translator, Step step) {
+      this.translator = translator;
+      this.step = step;
+    }
+
+    private Map<String, Object> getProperties() {
+      return DataflowPipelineTranslator.getProperties(step);
     }
 
     @Override
@@ -643,7 +691,7 @@ public class DataflowPipelineTranslator {
     @Override
     public void addInput(String name, PInput value) {
       if (value instanceof PValue) {
-        addInput(name, asOutputReference((PValue) value));
+        addInput(name, translator.asOutputReference((PValue) value));
       } else {
         throw new IllegalStateException("Input must be a PValue");
       }
@@ -685,10 +733,10 @@ public class DataflowPipelineTranslator {
     }
 
     @Override
-    public long addCollectionToSingletonOutput(PValue inputValue,
-                                               PValue outputValue) {
+    public long addCollectionToSingletonOutput(
+        PValue inputValue, PValue outputValue) {
       Coder<?> inputValueCoder =
-          checkNotNull(outputCoders.get(inputValue));
+          checkNotNull(translator.outputCoders.get(inputValue));
       // The inputValueCoder for the input PCollection should be some
       // WindowedValueCoder of the input PCollection's element
       // coder.
@@ -707,8 +755,8 @@ public class DataflowPipelineTranslator {
      * with the given {@code Coder} (if not {@code null}).
      */
     private long addOutput(PValue value, Coder<?> valueCoder) {
-      long id = idGenerator.get();
-      registerOutputName(value, Long.toString(id));
+      long id = translator.idGenerator.get();
+      translator.registerOutputName(value, Long.toString(id));
 
       Map<String, Object> properties = getProperties();
       @Nullable List<Map<String, Object>> outputInfoList = null;
@@ -728,7 +776,7 @@ public class DataflowPipelineTranslator {
       addString(outputInfo, PropertyNames.OUTPUT_NAME, Long.toString(id));
       addString(outputInfo, PropertyNames.USER_NAME, value.getName());
       if (value instanceof PCollection
-          && runner.doesPCollectionRequireIndexedFormat((PCollection<?>) 
value)) {
+          && 
translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
         addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);
       }
       if (valueCoder != null) {
@@ -736,63 +784,19 @@ public class DataflowPipelineTranslator {
         // failures as early as possible.
         CloudObject encoding = 
SerializableUtils.ensureSerializable(valueCoder);
         addObject(outputInfo, PropertyNames.ENCODING, encoding);
-        outputCoders.put(value, valueCoder);
+        translator.outputCoders.put(value, valueCoder);
       }
 
       outputInfoList.add(outputInfo);
       return id;
     }
 
-    private void addDisplayData(String stepName, HasDisplayData 
hasDisplayData) {
+    private void addDisplayData(Step step, String stepName, HasDisplayData 
hasDisplayData) {
       DisplayData displayData = DisplayData.from(hasDisplayData);
       List<Map<String, Object>> list = MAPPER.convertValue(displayData, 
List.class);
       addList(getProperties(), PropertyNames.DISPLAY_DATA, list);
     }
 
-    @Override
-    public OutputReference asOutputReference(PValue value) {
-      AppliedPTransform<?, ?, ?> transform =
-          value.getProducingTransformInternal();
-      String stepName = stepNames.get(transform);
-      if (stepName == null) {
-        throw new IllegalArgumentException(transform + " doesn't have a name 
specified");
-      }
-
-      String outputName = outputNames.get(value);
-      if (outputName == null) {
-        throw new IllegalArgumentException(
-            "output " + value + " doesn't have a name specified");
-      }
-
-      return new OutputReference(stepName, outputName);
-    }
-
-    private Map<String, Object> getProperties() {
-      Map<String, Object> properties = currentStep.getProperties();
-      if (properties == null) {
-        properties = new HashMap<>();
-        currentStep.setProperties(properties);
-      }
-      return properties;
-    }
-
-    /**
-     * Returns a fresh Dataflow step name.
-     */
-    private String genStepName() {
-      return "s" + (stepNames.size() + 1);
-    }
-
-    /**
-     * Records the name of the given output PValue,
-     * within its producing transform.
-     */
-    private void registerOutputName(POutput value, String name) {
-      if (outputNames.put(value, name) != null) {
-        throw new IllegalArgumentException(
-            "output " + value + " already has a name specified");
-      }
-    }
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -802,6 +806,14 @@ public class DataflowPipelineTranslator {
     return "DataflowPipelineTranslator#" + hashCode();
   }
 
+  private static Map<String, Object> getProperties(Step step) {
+    Map<String, Object> properties = step.getProperties();
+    if (properties == null) {
+      properties = new HashMap<>();
+      step.setProperties(properties);
+    }
+    return properties;
+  }
 
   ///////////////////////////////////////////////////////////////////////////
 
@@ -810,20 +822,17 @@ public class DataflowPipelineTranslator {
         View.CreatePCollectionView.class,
         new TransformTranslator<View.CreatePCollectionView>() {
           @Override
-          public void translate(
-              View.CreatePCollectionView transform,
-              TranslationContext context) {
+          public void translate(View.CreatePCollectionView transform, 
TranslationContext context) {
             translateTyped(transform, context);
           }
 
           private <ElemT, ViewT> void translateTyped(
-              View.CreatePCollectionView<ElemT, ViewT> transform,
-              TranslationContext context) {
-            context.addStep(transform, "CollectionToSingleton");
-            context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
-            context.addCollectionToSingletonOutput(
-                context.getInput(transform),
-                context.getOutput(transform));
+              View.CreatePCollectionView<ElemT, ViewT> transform, 
TranslationContext context) {
+            StepTranslationContext stepContext =
+                context.addStep(transform, "CollectionToSingleton");
+            stepContext.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
+            stepContext.addCollectionToSingletonOutput(
+                context.getInput(transform), context.getOutput(transform));
           }
         });
 
@@ -839,21 +848,21 @@ public class DataflowPipelineTranslator {
 
           private <K, InputT, OutputT> void translateHelper(
               final Combine.GroupedValues<K, InputT, OutputT> transform,
-              DataflowPipelineTranslator.TranslationContext context) {
-            context.addStep(transform, "CombineValues");
-            translateInputs(context.getInput(transform), 
transform.getSideInputs(), context);
+              TranslationContext context) {
+            StepTranslationContext stepContext = context.addStep(transform, 
"CombineValues");
+            translateInputs(
+                stepContext, context.getInput(transform), 
transform.getSideInputs(), context);
 
             AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
                 transform.getAppliedFn(
                     
context.getInput(transform).getPipeline().getCoderRegistry(),
-                context.getInput(transform).getCoder(),
-                context.getInput(transform).getWindowingStrategy());
+                    context.getInput(transform).getCoder(),
+                    context.getInput(transform).getWindowingStrategy());
 
-            context.addEncodingInput(fn.getAccumulatorCoder());
-            context.addInput(
-                PropertyNames.SERIALIZED_FN,
-                byteArrayToJsonString(serializeToByteArray(fn)));
-            context.addOutput(context.getOutput(transform));
+            stepContext.addEncodingInput(fn.getAccumulatorCoder());
+            stepContext.addInput(
+                PropertyNames.SERIALIZED_FN, 
byteArrayToJsonString(serializeToByteArray(fn)));
+            stepContext.addOutput(context.getOutput(transform));
           }
         });
 
@@ -870,14 +879,14 @@ public class DataflowPipelineTranslator {
           private <T> void flattenHelper(
               Flatten.FlattenPCollectionList<T> transform,
               TranslationContext context) {
-            context.addStep(transform, "Flatten");
+            StepTranslationContext stepContext = context.addStep(transform, 
"Flatten");
 
             List<OutputReference> inputs = new LinkedList<>();
             for (PCollection<T> input : context.getInput(transform).getAll()) {
               inputs.add(context.asOutputReference(input));
             }
-            context.addInput(PropertyNames.INPUTS, inputs);
-            context.addOutput(context.getOutput(transform));
+            stepContext.addInput(PropertyNames.INPUTS, inputs);
+            stepContext.addOutput(context.getOutput(transform));
           }
         });
 
@@ -885,23 +894,19 @@ public class DataflowPipelineTranslator {
         GroupByKeyAndSortValuesOnly.class,
         new TransformTranslator<GroupByKeyAndSortValuesOnly>() {
           @Override
-          public void translate(
-              GroupByKeyAndSortValuesOnly transform,
-              TranslationContext context) {
+          public void translate(GroupByKeyAndSortValuesOnly transform, 
TranslationContext context) {
             groupByKeyAndSortValuesHelper(transform, context);
           }
 
           private <K1, K2, V> void groupByKeyAndSortValuesHelper(
-              GroupByKeyAndSortValuesOnly<K1, K2, V> transform,
-              TranslationContext context) {
-            context.addStep(transform, "GroupByKey");
-            context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
-            context.addOutput(context.getOutput(transform));
-            context.addInput(PropertyNames.SORT_VALUES, true);
+              GroupByKeyAndSortValuesOnly<K1, K2, V> transform, 
TranslationContext context) {
+            StepTranslationContext stepContext = context.addStep(transform, 
"GroupByKey");
+            stepContext.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
+            stepContext.addOutput(context.getOutput(transform));
+            stepContext.addInput(PropertyNames.SORT_VALUES, true);
 
             // TODO: Add support for combiner lifting once the need arises.
-            context.addInput(
-                PropertyNames.DISALLOW_COMBINER_LIFTING, true);
+            stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, 
true);
           }
         });
 
@@ -918,9 +923,9 @@ public class DataflowPipelineTranslator {
           private <K, V> void groupByKeyHelper(
               GroupByKey<K, V> transform,
               TranslationContext context) {
-            context.addStep(transform, "GroupByKey");
-            context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
-            context.addOutput(context.getOutput(transform));
+            StepTranslationContext stepContext = context.addStep(transform, 
"GroupByKey");
+            stepContext.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
+            stepContext.addOutput(context.getOutput(transform));
 
             WindowingStrategy<?, ?> windowingStrategy =
                 context.getInput(transform).getWindowingStrategy();
@@ -931,12 +936,12 @@ public class DataflowPipelineTranslator {
                 || (isStreaming && !transform.fewKeys())
                 // TODO: Allow combiner lifting on the non-default trigger, as 
appropriate.
                 || !(windowingStrategy.getTrigger() instanceof DefaultTrigger);
-            context.addInput(
+            stepContext.addInput(
                 PropertyNames.DISALLOW_COMBINER_LIFTING, 
disallowCombinerLifting);
-            context.addInput(
+            stepContext.addInput(
                 PropertyNames.SERIALIZED_FN,
                 
byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
-            context.addInput(
+            stepContext.addInput(
                 PropertyNames.IS_MERGING_WINDOW_FN,
                 !windowingStrategy.getWindowFn().isNonMerging());
           }
@@ -946,22 +951,21 @@ public class DataflowPipelineTranslator {
         ParDo.BoundMulti.class,
         new TransformTranslator<ParDo.BoundMulti>() {
           @Override
-          public void translate(
-              ParDo.BoundMulti transform,
-              TranslationContext context) {
+          public void translate(ParDo.BoundMulti transform, TranslationContext 
context) {
             translateMultiHelper(transform, context);
           }
 
           private <InputT, OutputT> void translateMultiHelper(
-              ParDo.BoundMulti<InputT, OutputT> transform,
-              TranslationContext context) {
-            rejectStatefulDoFn(transform.getFn());
+              ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext 
context) {
+            DataflowPipelineTranslator.rejectStatefulDoFn(transform.getFn());
 
-            context.addStep(transform, "ParallelDo");
-            translateInputs(context.getInput(transform), 
transform.getSideInputs(), context);
+            StepTranslationContext stepContext = context.addStep(transform, 
"ParallelDo");
+            translateInputs(
+                stepContext, context.getInput(transform), 
transform.getSideInputs(), context);
             BiMap<Long, TupleTag<?>> outputMap =
-                translateOutputs(context.getOutput(transform), context);
+                translateOutputs(context.getOutput(transform), stepContext);
             translateFn(
+                stepContext,
                 transform.getFn(),
                 context.getInput(transform).getWindowingStrategy(),
                 transform.getSideInputs(),
@@ -976,30 +980,28 @@ public class DataflowPipelineTranslator {
         ParDo.Bound.class,
         new TransformTranslator<ParDo.Bound>() {
           @Override
-          public void translate(
-              ParDo.Bound transform,
-              TranslationContext context) {
+          public void translate(ParDo.Bound transform, TranslationContext 
context) {
             translateSingleHelper(transform, context);
           }
 
           private <InputT, OutputT> void translateSingleHelper(
-              ParDo.Bound<InputT, OutputT> transform,
-              TranslationContext context) {
+              ParDo.Bound<InputT, OutputT> transform, TranslationContext 
context) {
             rejectStatefulDoFn(transform.getFn());
 
-            context.addStep(transform, "ParallelDo");
-            translateInputs(context.getInput(transform), 
transform.getSideInputs(), context);
-            long mainOutput = context.addOutput(context.getOutput(transform));
+            StepTranslationContext stepContext = context.addStep(transform, 
"ParallelDo");
+            translateInputs(
+                stepContext, context.getInput(transform), 
transform.getSideInputs(), context);
+            long mainOutput = 
stepContext.addOutput(context.getOutput(transform));
             translateFn(
+                stepContext,
                 transform.getFn(),
                 context.getInput(transform).getWindowingStrategy(),
                 transform.getSideInputs(),
                 context.getInput(transform).getCoder(),
                 context,
                 mainOutput,
-                ImmutableMap.<Long, TupleTag<?>>of(mainOutput,
-                  new TupleTag<>(PropertyNames.OUTPUT)));
-
+                ImmutableMap.<Long, TupleTag<?>>of(
+                    mainOutput, new TupleTag<>(PropertyNames.OUTPUT)));
           }
         });
 
@@ -1014,16 +1016,16 @@ public class DataflowPipelineTranslator {
 
           private <T> void translateHelper(
               Window.Bound<T> transform, TranslationContext context) {
-            context.addStep(transform, "Bucket");
-            context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
-            context.addOutput(context.getOutput(transform));
+            StepTranslationContext stepContext = context.addStep(transform, 
"Bucket");
+            stepContext.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
+            stepContext.addOutput(context.getOutput(transform));
 
             WindowingStrategy<?, ?> strategy = 
context.getOutput(transform).getWindowingStrategy();
             byte[] serializedBytes = serializeToByteArray(strategy);
             String serializedJson = byteArrayToJsonString(serializedBytes);
             assert Arrays.equals(serializedBytes,
                                  jsonStringToByteArray(serializedJson));
-            context.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
+            stepContext.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
           }
         });
 
@@ -1046,15 +1048,17 @@ public class DataflowPipelineTranslator {
   }
 
   private static void translateInputs(
+      StepTranslationContext stepContext,
       PCollection<?> input,
       List<PCollectionView<?>> sideInputs,
       TranslationContext context) {
-    context.addInput(PropertyNames.PARALLEL_INPUT, input);
-    translateSideInputs(sideInputs, context);
+    stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
+    translateSideInputs(stepContext, sideInputs, context);
   }
 
   // Used for ParDo
   private static void translateSideInputs(
+      StepTranslationContext stepContext,
       List<PCollectionView<?>> sideInputs,
       TranslationContext context) {
     Map<String, Object> nonParInputs = new HashMap<>();
@@ -1065,10 +1069,11 @@ public class DataflowPipelineTranslator {
           context.asOutputReference(view));
     }
 
-    context.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs);
+    stepContext.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs);
   }
 
   private static void translateFn(
+      StepTranslationContext stepContext,
       DoFn fn,
       WindowingStrategy windowingStrategy,
       Iterable<PCollectionView<?>> sideInputs,
@@ -1076,8 +1081,8 @@ public class DataflowPipelineTranslator {
       TranslationContext context,
       long mainOutput,
       Map<Long, TupleTag<?>> outputMap) {
-    context.addInput(PropertyNames.USER_FN, fn.getClass().getName());
-    context.addInput(
+    stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName());
+    stepContext.addInput(
         PropertyNames.SERIALIZED_FN,
         byteArrayToJsonString(
             serializeToByteArray(
@@ -1087,13 +1092,13 @@ public class DataflowPipelineTranslator {
 
   private static BiMap<Long, TupleTag<?>> translateOutputs(
       PCollectionTuple outputs,
-      TranslationContext context) {
+      StepTranslationContext stepContext) {
     ImmutableBiMap.Builder<Long, TupleTag<?>> mapBuilder = 
ImmutableBiMap.builder();
     for (Map.Entry<TupleTag<?>, PCollection<?>> entry
              : outputs.getAll().entrySet()) {
       TupleTag<?> tag = entry.getKey();
       PCollection<?> output = entry.getValue();
-      mapBuilder.put(context.addOutput(output), tag);
+      mapBuilder.put(stepContext.addOutput(output), tag);
     }
     return mapBuilder.build();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 03e5dfc..d2c1e66 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -33,6 +33,7 @@ import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.api.services.dataflow.model.Step;
 import com.google.api.services.dataflow.model.WorkerPool;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -72,6 +73,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
 import 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
+import 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
 import 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
 import 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
 import org.apache.beam.runners.dataflow.internal.AssignWindows;
@@ -2116,50 +2118,46 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
-  /**
-   * Rewrite {@link StreamingPubsubIORead} to the appropriate internal node.
-   */
-  private static class StreamingPubsubIOReadTranslator<T> implements
-      TransformTranslator<StreamingPubsubIORead<T>> {
+  /** Rewrite {@link StreamingPubsubIORead} to the appropriate internal node. 
*/
+  private static class StreamingPubsubIOReadTranslator<T>
+      implements TransformTranslator<StreamingPubsubIORead<T>> {
     @Override
-    public void translate(
-        StreamingPubsubIORead<T> transform,
-        TranslationContext context) {
-      checkArgument(context.getPipelineOptions().isStreaming(),
-                    "StreamingPubsubIORead is only for streaming pipelines.");
+    public void translate(StreamingPubsubIORead<T> transform, 
TranslationContext context) {
+      checkArgument(
+          context.getPipelineOptions().isStreaming(),
+          "StreamingPubsubIORead is only for streaming pipelines.");
       PubsubUnboundedSource<T> overriddenTransform = 
transform.getOverriddenTransform();
-      context.addStep(transform, "ParallelRead");
-      context.addInput(PropertyNames.FORMAT, "pubsub");
+      StepTranslationContext stepContext = context.addStep(transform, 
"ParallelRead");
+      stepContext.addInput(PropertyNames.FORMAT, "pubsub");
       if (overriddenTransform.getTopicProvider() != null) {
         if (overriddenTransform.getTopicProvider().isAccessible()) {
-          context.addInput(
+          stepContext.addInput(
               PropertyNames.PUBSUB_TOPIC, 
overriddenTransform.getTopic().getV1Beta1Path());
         } else {
-          context.addInput(
+          stepContext.addInput(
               PropertyNames.PUBSUB_TOPIC_OVERRIDE,
               ((NestedValueProvider) 
overriddenTransform.getTopicProvider()).propertyName());
         }
       }
       if (overriddenTransform.getSubscriptionProvider() != null) {
         if (overriddenTransform.getSubscriptionProvider().isAccessible()) {
-          context.addInput(
+          stepContext.addInput(
               PropertyNames.PUBSUB_SUBSCRIPTION,
               overriddenTransform.getSubscription().getV1Beta1Path());
         } else {
-          context.addInput(
+          stepContext.addInput(
               PropertyNames.PUBSUB_SUBSCRIPTION_OVERRIDE,
-              ((NestedValueProvider) 
overriddenTransform.getSubscriptionProvider())
-              .propertyName());
+              ((NestedValueProvider) 
overriddenTransform.getSubscriptionProvider()).propertyName());
         }
       }
       if (overriddenTransform.getTimestampLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
-                         overriddenTransform.getTimestampLabel());
+        stepContext.addInput(
+            PropertyNames.PUBSUB_TIMESTAMP_LABEL, 
overriddenTransform.getTimestampLabel());
       }
       if (overriddenTransform.getIdLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_ID_LABEL, 
overriddenTransform.getIdLabel());
+        stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, 
overriddenTransform.getIdLabel());
       }
-      context.addValueOnlyOutput(context.getOutput(transform));
+      stepContext.addValueOnlyOutput(context.getOutput(transform));
     }
   }
 
@@ -2211,26 +2209,26 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       checkArgument(context.getPipelineOptions().isStreaming(),
                     "StreamingPubsubIOWrite is only for streaming pipelines.");
       PubsubUnboundedSink<T> overriddenTransform = 
transform.getOverriddenTransform();
-      context.addStep(transform, "ParallelWrite");
-      context.addInput(PropertyNames.FORMAT, "pubsub");
+      StepTranslationContext stepContext = context.addStep(transform, 
"ParallelWrite");
+      stepContext.addInput(PropertyNames.FORMAT, "pubsub");
       if (overriddenTransform.getTopicProvider().isAccessible()) {
-        context.addInput(
+        stepContext.addInput(
             PropertyNames.PUBSUB_TOPIC, 
overriddenTransform.getTopic().getV1Beta1Path());
       } else {
-        context.addInput(
+        stepContext.addInput(
             PropertyNames.PUBSUB_TOPIC_OVERRIDE,
             ((NestedValueProvider) 
overriddenTransform.getTopicProvider()).propertyName());
       }
       if (overriddenTransform.getTimestampLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
-                         overriddenTransform.getTimestampLabel());
+        stepContext.addInput(
+            PropertyNames.PUBSUB_TIMESTAMP_LABEL, 
overriddenTransform.getTimestampLabel());
       }
       if (overriddenTransform.getIdLabel() != null) {
-        context.addInput(PropertyNames.PUBSUB_ID_LABEL, 
overriddenTransform.getIdLabel());
+        stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, 
overriddenTransform.getIdLabel());
       }
-      context.addEncodingInput(
+      stepContext.addEncodingInput(
           
WindowedValue.getValueOnlyCoder(overriddenTransform.getElementCoder()));
-      context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
+      stepContext.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
index 84950f7..1a5a9a5 100755
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
@@ -25,6 +25,7 @@ import com.google.api.services.dataflow.model.SourceMetadata;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator;
+import 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
 import 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
 import 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
 import org.apache.beam.sdk.io.FileBasedSource;
@@ -60,13 +61,13 @@ public class ReadTranslator implements 
TransformTranslator<Read.Bounded<?>> {
         }
       }
 
-      context.addStep(transform, "ParallelRead");
-      context.addInput(PropertyNames.FORMAT, 
PropertyNames.CUSTOM_SOURCE_FORMAT);
-      context.addInput(
+      StepTranslationContext stepContext = context.addStep(transform, 
"ParallelRead");
+      stepContext.addInput(PropertyNames.FORMAT, 
PropertyNames.CUSTOM_SOURCE_FORMAT);
+      stepContext.addInput(
           PropertyNames.SOURCE_STEP_INPUT,
           cloudSourceToDictionary(
               CustomSources.serializeToCloudSource(source, 
context.getPipelineOptions())));
-      context.addValueOnlyOutput(context.getOutput(transform));
+      stepContext.addValueOnlyOutput(context.getOutput(transform));
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 21d575a..a19fd8c 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -61,6 +61,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
+import 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
 import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsList;
 import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMap;
 import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMultimap;
@@ -998,8 +999,8 @@ public class DataflowRunnerTest {
 
             // Note: This is about the minimum needed to fake out a
             // translation. This obviously isn't a real translation.
-            context.addStep(transform, "TestTranslate");
-            context.addOutput(context.getOutput(transform));
+            StepTranslationContext stepContext = context.addStep(transform, 
"TestTranslate");
+            stepContext.addOutput(context.getOutput(transform));
           }
         });
 

Reply via email to