Repository: incubator-beam
Updated Branches:
  refs/heads/master b8e6eea69 -> d69b324c4


Move the step output ids into a flat, pipeline-level namespace.
Also add a logical mapping from each tuple tag to its flat-namespace output id for DoFns.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/17782007
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/17782007
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/17782007

Branch: refs/heads/master
Commit: 177820074d20e6ac72949f763f52cfb481904fc5
Parents: b8e6eea
Author: Luke Cwik <lc...@google.com>
Authored: Thu Oct 13 15:33:49 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Oct 17 11:47:42 2016 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 111 ++++++++++++-------
 .../beam/runners/dataflow/DataflowRunner.java   |  12 +-
 .../dataflow/internal/ReadTranslator.java       |   2 +-
 .../beam/runners/dataflow/util/DoFnInfo.java    |  30 +++--
 .../DataflowPipelineTranslatorTest.java         |  38 ++++++-
 .../runners/dataflow/DataflowRunnerTest.java    |   6 +-
 6 files changed, 136 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 0d72881..c0366fc 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -41,6 +41,10 @@ import com.google.api.services.dataflow.model.Environment;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.Step;
 import com.google.api.services.dataflow.model.WorkerPool;
+import com.google.common.base.Supplier;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -49,6 +53,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import 
org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly;
 import org.apache.beam.runners.dataflow.internal.ReadTranslator;
@@ -300,32 +305,30 @@ public class DataflowPipelineTranslator {
     public void addInput(String name, List<? extends Map<String, Object>> 
elements);
 
     /**
-     * Adds an output with the given name to the previously added
-     * Dataflow step, producing the specified output {@code PValue},
+     * Adds an output to the previously added Dataflow step,
+     * producing the specified output {@code PValue},
      * including its {@code Coder} if a {@code TypedPValue}.  If the
      * {@code PValue} is a {@code PCollection}, wraps its coder inside
-     * a {@code WindowedValueCoder}.
+     * a {@code WindowedValueCoder}.  Returns a pipeline level unique id.
      */
-    public void addOutput(String name, PValue value);
+    public long addOutput(PValue value);
 
     /**
-     * Adds an output with the given name to the previously added
-     * Dataflow step, producing the specified output {@code PValue},
+     * Adds an output to the previously added Dataflow step,
+     * producing the specified output {@code PValue},
      * including its {@code Coder} if a {@code TypedPValue}.  If the
      * {@code PValue} is a {@code PCollection}, wraps its coder inside
-     * a {@code ValueOnlyCoder}.
+     * a {@code ValueOnlyCoder}.  Returns a pipeline level unique id.
      */
-    public void addValueOnlyOutput(String name, PValue value);
+    public long addValueOnlyOutput(PValue value);
 
     /**
-     * Adds an output with the given name to the previously added
-     * CollectionToSingleton Dataflow step, consuming the specified
-     * input {@code PValue} and producing the specified output
+     * Adds an output to the previously added CollectionToSingleton Dataflow 
step,
+     * consuming the specified input {@code PValue} and producing the 
specified output
      * {@code PValue}.  This step requires special treatment for its
-     * output encoding.
+     * output encoding.  Returns a pipeline level unique id.
      */
-    public void addCollectionToSingletonOutput(String name,
-                                               PValue inputValue,
+    public long addCollectionToSingletonOutput(PValue inputValue,
                                                PValue outputValue);
 
     /**
@@ -341,6 +344,19 @@ public class DataflowPipelineTranslator {
    * Translates a Pipeline into the Dataflow representation.
    */
   class Translator extends PipelineVisitor.Defaults implements 
TranslationContext {
+    /**
+     * An id generator to be used when giving unique ids for pipeline level 
constructs.
+     * This is purposely wrapped inside of a {@link Supplier} to prevent the 
incorrect
+     * usage of the {@link AtomicLong} that is contained.
+     */
+    private final Supplier<Long> idGenerator = new Supplier<Long>() {
+      private final AtomicLong generator = new AtomicLong(1L);
+      @Override
+      public Long get() {
+        return generator.getAndIncrement();
+      }
+    };
+
     /** The Pipeline to translate. */
     private final Pipeline pipeline;
 
@@ -634,7 +650,7 @@ public class DataflowPipelineTranslator {
     }
 
     @Override
-    public void addOutput(String name, PValue value) {
+    public long addOutput(PValue value) {
       Coder<?> coder;
       if (value instanceof TypedPValue) {
         coder = ((TypedPValue<?>) value).getCoder();
@@ -648,11 +664,11 @@ public class DataflowPipelineTranslator {
         // No output coder to encode.
         coder = null;
       }
-      addOutput(name, value, coder);
+      return addOutput(value, coder);
     }
 
     @Override
-    public void addValueOnlyOutput(String name, PValue value) {
+    public long addValueOnlyOutput(PValue value) {
       Coder<?> coder;
       if (value instanceof TypedPValue) {
         coder = ((TypedPValue<?>) value).getCoder();
@@ -665,12 +681,11 @@ public class DataflowPipelineTranslator {
         // No output coder to encode.
         coder = null;
       }
-      addOutput(name, value, coder);
+      return addOutput(value, coder);
     }
 
     @Override
-    public void addCollectionToSingletonOutput(String name,
-                                               PValue inputValue,
+    public long addCollectionToSingletonOutput(PValue inputValue,
                                                PValue outputValue) {
       Coder<?> inputValueCoder =
           checkNotNull(outputCoders.get(inputValue));
@@ -683,7 +698,7 @@ public class DataflowPipelineTranslator {
       // IterableCoder of the inputValueCoder. This is a property
       // of the backend "CollectionToSingleton" step.
       Coder<?> outputValueCoder = IterableCoder.of(inputValueCoder);
-      addOutput(name, outputValue, outputValueCoder);
+      return addOutput(outputValue, outputValueCoder);
     }
 
     /**
@@ -691,8 +706,9 @@ public class DataflowPipelineTranslator {
      * Dataflow step, producing the specified output {@code PValue}
      * with the given {@code Coder} (if not {@code null}).
      */
-    private void addOutput(String name, PValue value, Coder<?> valueCoder) {
-      registerOutputName(value, name);
+    private long addOutput(PValue value, Coder<?> valueCoder) {
+      long id = idGenerator.get();
+      registerOutputName(value, Long.toString(id));
 
       Map<String, Object> properties = getProperties();
       @Nullable List<Map<String, Object>> outputInfoList = null;
@@ -709,7 +725,7 @@ public class DataflowPipelineTranslator {
       }
 
       Map<String, Object> outputInfo = new HashMap<>();
-      addString(outputInfo, PropertyNames.OUTPUT_NAME, name);
+      addString(outputInfo, PropertyNames.OUTPUT_NAME, Long.toString(id));
       addString(outputInfo, PropertyNames.USER_NAME, value.getName());
       if (value instanceof PCollection
           && runner.doesPCollectionRequireIndexedFormat((PCollection<?>) 
value)) {
@@ -724,6 +740,7 @@ public class DataflowPipelineTranslator {
       }
 
       outputInfoList.add(outputInfo);
+      return id;
     }
 
     private void addDisplayData(String stepName, HasDisplayData 
hasDisplayData) {
@@ -805,7 +822,6 @@ public class DataflowPipelineTranslator {
             context.addStep(transform, "CollectionToSingleton");
             context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
             context.addCollectionToSingletonOutput(
-                PropertyNames.OUTPUT,
                 context.getInput(transform),
                 context.getOutput(transform));
           }
@@ -837,7 +853,7 @@ public class DataflowPipelineTranslator {
             context.addInput(
                 PropertyNames.SERIALIZED_FN,
                 byteArrayToJsonString(serializeToByteArray(fn)));
-            context.addOutput(PropertyNames.OUTPUT, 
context.getOutput(transform));
+            context.addOutput(context.getOutput(transform));
           }
         });
 
@@ -861,7 +877,7 @@ public class DataflowPipelineTranslator {
               inputs.add(context.asOutputReference(input));
             }
             context.addInput(PropertyNames.INPUTS, inputs);
-            context.addOutput(PropertyNames.OUTPUT, 
context.getOutput(transform));
+            context.addOutput(context.getOutput(transform));
           }
         });
 
@@ -880,7 +896,7 @@ public class DataflowPipelineTranslator {
               TranslationContext context) {
             context.addStep(transform, "GroupByKey");
             context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
-            context.addOutput(PropertyNames.OUTPUT, 
context.getOutput(transform));
+            context.addOutput(context.getOutput(transform));
             context.addInput(PropertyNames.SORT_VALUES, true);
 
             // TODO: Add support for combiner lifting once the need arises.
@@ -904,7 +920,7 @@ public class DataflowPipelineTranslator {
               TranslationContext context) {
             context.addStep(transform, "GroupByKey");
             context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
-            context.addOutput(PropertyNames.OUTPUT, 
context.getOutput(transform));
+            context.addOutput(context.getOutput(transform));
 
             WindowingStrategy<?, ?> windowingStrategy =
                 context.getInput(transform).getWindowingStrategy();
@@ -941,9 +957,16 @@ public class DataflowPipelineTranslator {
               TranslationContext context) {
             context.addStep(transform, "ParallelDo");
             translateInputs(context.getInput(transform), 
transform.getSideInputs(), context);
-            translateFn(transform.getFn(), 
context.getInput(transform).getWindowingStrategy(),
-                transform.getSideInputs(), 
context.getInput(transform).getCoder(), context);
-            translateOutputs(context.getOutput(transform), context);
+            BiMap<Long, TupleTag<?>> outputMap =
+                translateOutputs(context.getOutput(transform), context);
+            translateFn(
+                transform.getFn(),
+                context.getInput(transform).getWindowingStrategy(),
+                transform.getSideInputs(),
+                context.getInput(transform).getCoder(),
+                context,
+                outputMap.inverse().get(transform.getMainOutputTag()),
+                outputMap);
           }
         });
 
@@ -962,11 +985,17 @@ public class DataflowPipelineTranslator {
               TranslationContext context) {
             context.addStep(transform, "ParallelDo");
             translateInputs(context.getInput(transform), 
transform.getSideInputs(), context);
+            long mainOutput = context.addOutput(context.getOutput(transform));
             translateFn(
                 transform.getFn(),
                 context.getInput(transform).getWindowingStrategy(),
-                transform.getSideInputs(), 
context.getInput(transform).getCoder(), context);
-            context.addOutput(PropertyNames.OUTPUT, 
context.getOutput(transform));
+                transform.getSideInputs(),
+                context.getInput(transform).getCoder(),
+                context,
+                mainOutput,
+                ImmutableMap.<Long, TupleTag<?>>of(mainOutput,
+                  new TupleTag<>(PropertyNames.OUTPUT)));
+
           }
         });
 
@@ -983,7 +1012,7 @@ public class DataflowPipelineTranslator {
               Window.Bound<T> transform, TranslationContext context) {
             context.addStep(transform, "Bucket");
             context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
-            context.addOutput(PropertyNames.OUTPUT, 
context.getOutput(transform));
+            context.addOutput(context.getOutput(transform));
 
             WindowingStrategy<?, ?> strategy = 
context.getOutput(transform).getWindowingStrategy();
             byte[] serializedBytes = serializeToByteArray(strategy);
@@ -1028,22 +1057,26 @@ public class DataflowPipelineTranslator {
       WindowingStrategy windowingStrategy,
       Iterable<PCollectionView<?>> sideInputs,
       Coder inputCoder,
-      TranslationContext context) {
+      TranslationContext context,
+      long mainOutput,
+      Map<Long, TupleTag<?>> outputMap) {
     context.addInput(PropertyNames.USER_FN, fn.getClass().getName());
     context.addInput(
         PropertyNames.SERIALIZED_FN,
         byteArrayToJsonString(serializeToByteArray(
-            new DoFnInfo(fn, windowingStrategy, sideInputs, inputCoder))));
+            new DoFnInfo(fn, windowingStrategy, sideInputs, inputCoder, 
mainOutput, outputMap))));
   }
 
-  private static void translateOutputs(
+  private static BiMap<Long, TupleTag<?>> translateOutputs(
       PCollectionTuple outputs,
       TranslationContext context) {
+    ImmutableBiMap.Builder<Long, TupleTag<?>> mapBuilder = 
ImmutableBiMap.builder();
     for (Map.Entry<TupleTag<?>, PCollection<?>> entry
              : outputs.getAll().entrySet()) {
       TupleTag<?> tag = entry.getKey();
       PCollection<?> output = entry.getValue();
-      context.addOutput(tag.getId(), output);
+      mapBuilder.put(context.addOutput(output), tag);
     }
+    return mapBuilder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 646a145..55a01f7 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1824,7 +1824,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       // outputting to all the outputs defined above.
       PCollectionTuple outputTuple = input
            .apply("GBKaSVForData", new GroupByKeyHashAndSortByKeyAndWindow<K, 
V, W>(ismCoder))
-           .apply(ParDo.of(new ToIsmRecordForMapLikeDoFn<K, V, W>(
+           .apply(ParDo.of(new ToIsmRecordForMapLikeDoFn<>(
                    outputForSizeTag, outputForEntrySetTag,
                    windowCoder, inputCoder.getKeyCoder(), ismCoder, 
uniqueKeysExpected))
                        .withOutputTags(mainOutputTag,
@@ -2116,7 +2116,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       if (overriddenTransform.getIdLabel() != null) {
         context.addInput(PropertyNames.PUBSUB_ID_LABEL, 
overriddenTransform.getIdLabel());
       }
-      context.addValueOnlyOutput(PropertyNames.OUTPUT, 
context.getOutput(transform));
+      context.addValueOnlyOutput(context.getOutput(transform));
     }
   }
 
@@ -2215,10 +2215,10 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       source.validate();
 
       if (source.requiresDeduping()) {
-        return Pipeline.applyTransform(input, new ReadWithIds<T>(source))
+        return Pipeline.applyTransform(input, new ReadWithIds<>(source))
             .apply(new Deduplicate<T>());
       } else {
-        return Pipeline.applyTransform(input, new ReadWithIds<T>(source))
+        return Pipeline.applyTransform(input, new ReadWithIds<>(source))
             .apply("StripIds", ParDo.of(new 
ValueWithRecordId.StripIdsDoFn<T>()));
       }
     }
@@ -2348,7 +2348,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
 
     public static <T> StreamingPCollectionViewWriterFn<T> create(
         PCollectionView<?> view, Coder<T> dataCoder) {
-      return new StreamingPCollectionViewWriterFn<T>(view, dataCoder);
+      return new StreamingPCollectionViewWriterFn<>(view, dataCoder);
     }
 
     private StreamingPCollectionViewWriterFn(PCollectionView<?> view, Coder<T> 
dataCoder) {
@@ -2648,7 +2648,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
   private static class Concatenate<T> extends CombineFn<T, List<T>, List<T>> {
     @Override
     public List<T> createAccumulator() {
-      return new ArrayList<T>();
+      return new ArrayList<>();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
index 094f405..83836c0 100755
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
@@ -62,7 +62,7 @@ public class ReadTranslator implements 
TransformTranslator<Read.Bounded<?>> {
           PropertyNames.SOURCE_STEP_INPUT,
           cloudSourceToDictionary(
               CustomSources.serializeToCloudSource(source, 
context.getPipelineOptions())));
-      context.addValueOnlyOutput(PropertyNames.OUTPUT, 
context.getOutput(transform));
+      context.addValueOnlyOutput(context.getOutput(transform));
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index 949c381..b211c04 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -18,10 +18,12 @@
 package org.apache.beam.runners.dataflow.util;
 
 import java.io.Serializable;
+import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Wrapper class holding the necessary information to serialize a {@link 
OldDoFn}.
@@ -34,20 +36,21 @@ public class DoFnInfo<InputT, OutputT> implements 
Serializable {
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final Iterable<PCollectionView<?>> sideInputViews;
   private final Coder<InputT> inputCoder;
+  private final long mainOutput;
+  private final Map<Long, TupleTag<?>> outputMap;
 
-  public DoFnInfo(OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> 
windowingStrategy) {
-    this.doFn = doFn;
-    this.windowingStrategy = windowingStrategy;
-    this.sideInputViews = null;
-    this.inputCoder = null;
-  }
-
-  public DoFnInfo(OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> 
windowingStrategy,
-                  Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> 
inputCoder) {
+  public DoFnInfo(OldDoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Iterable<PCollectionView<?>> sideInputViews,
+      Coder<InputT> inputCoder,
+      long mainOutput,
+      Map<Long, TupleTag<?>> outputMap) {
     this.doFn = doFn;
     this.windowingStrategy = windowingStrategy;
     this.sideInputViews = sideInputViews;
     this.inputCoder = inputCoder;
+    this.mainOutput = mainOutput;
+    this.outputMap = outputMap;
   }
 
   public OldDoFn<InputT, OutputT> getDoFn() {
@@ -65,5 +68,12 @@ public class DoFnInfo<InputT, OutputT> implements 
Serializable {
   public Coder<InputT> getInputCoder() {
     return inputCoder;
   }
-}
 
+  public long getMainOutput() {
+    return mainOutput;
+  }
+
+  public Map<Long, TupleTag<?>> getOutputMap() {
+    return outputMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 98d2fb0..762844b 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -46,12 +46,16 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import 
org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
@@ -465,6 +469,7 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
                 (DataflowRunner) pipeline.getRunner(),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
+    assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
     assertEquals(4, steps.size());
@@ -523,6 +528,7 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
     assertEquals(13, job.getSteps().size());
     Step step = job.getSteps().get(1);
     assertEquals(stepName, getString(step.getProperties(), 
PropertyNames.USER_NAME));
+    assertAllStepOutputsHaveUniqueIds(job);
     return step;
   }
 
@@ -637,7 +643,7 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
   }
 
   @Test
-  public void testMultiGraphPipelineSerialization() throws IOException {
+  public void testMultiGraphPipelineSerialization() throws Exception {
     Pipeline p = Pipeline.create(buildPipelineOptions());
 
     PCollection<Integer> input = p.begin()
@@ -650,8 +656,9 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
         PipelineOptionsFactory.as(DataflowPipelineOptions.class));
 
     // Check that translation doesn't fail.
-    t.translate(
+    JobSpecification jobSpecification = t.translate(
         p, (DataflowRunner) p.getRunner(), 
Collections.<DataflowPackage>emptyList());
+    assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());
   }
 
   @Test
@@ -692,10 +699,11 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
     applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
 
     // Check that translation doesn't fail.
-    t.translate(
+    JobSpecification jobSpecification = t.translate(
         pipeline,
         (DataflowRunner) pipeline.getRunner(),
         Collections.<DataflowPackage>emptyList());
+    assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());
   }
 
   private void applyRead(Pipeline pipeline, String path) {
@@ -744,6 +752,7 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
                 (DataflowRunner) pipeline.getRunner(),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
+    assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
     assertEquals(2, steps.size());
@@ -753,7 +762,6 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
 
     Step collectionToSingletonStep = steps.get(1);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
-
   }
 
   @Test
@@ -776,6 +784,7 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
                 (DataflowRunner) pipeline.getRunner(),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
+    assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
     assertEquals(2, steps.size());
@@ -806,6 +815,7 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
                 (DataflowRunner) pipeline.getRunner(),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
+    assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
     assertEquals(5, steps.size());
@@ -839,6 +849,7 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
                 (DataflowRunner) pipeline.getRunner(),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
+    assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
     assertEquals(3, steps.size());
@@ -902,6 +913,7 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
                 (DataflowRunner) pipeline.getRunner(),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
+    assertAllStepOutputsHaveUniqueIds(job);
 
     List<Step> steps = job.getSteps();
     assertEquals(3, steps.size());
@@ -963,4 +975,22 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
     assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData));
     assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData));
   }
+
+  private static void assertAllStepOutputsHaveUniqueIds(Job job)
+      throws Exception {
+    List<Long> outputIds = new ArrayList<>();
+    for (Step step : job.getSteps()) {
+      List<Map<String, Object>> outputInfoList =
+          (List<Map<String, Object>>) 
step.getProperties().get(PropertyNames.OUTPUT_INFO);
+      if (outputInfoList != null) {
+        for (Map<String, Object> outputInfo : outputInfoList) {
+          outputIds.add(Long.parseLong(Structs.getString(outputInfo, 
PropertyNames.OUTPUT_NAME)));
+        }
+      }
+    }
+    Set<Long> uniqueOutputNames = new HashSet<>(outputIds);
+    outputIds.removeAll(uniqueOutputNames);
+    assertTrue(String.format("Found duplicate output ids %s", outputIds),
+        outputIds.size() == 0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index b0ee231..ddb7cf8 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -895,7 +895,7 @@ public class DataflowRunnerTest {
             // Note: This is about the minimum needed to fake out a
             // translation. This obviously isn't a real translation.
             context.addStep(transform, "TestTranslate");
-            context.addOutput("output", context.getOutput(transform));
+            context.addOutput(context.getOutput(transform));
           }
         });
 
@@ -1098,7 +1098,7 @@ public class DataflowRunnerTest {
 
     DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, 
WindowedValue<Long>>>>,
                IsmRecord<WindowedValue<Long>>> doFnTester =
-        DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, 
Long, IntervalWindow>(
+        DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<>(
             outputForSizeTag,
             outputForEntrySetTag,
             windowCoder,
@@ -1198,7 +1198,7 @@ public class DataflowRunnerTest {
 
     DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, 
WindowedValue<Long>>>>,
                IsmRecord<WindowedValue<Long>>> doFnTester =
-        DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, 
Long, IntervalWindow>(
+        DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<>(
             outputForSizeTag,
             outputForEntrySetTag,
             windowCoder,

Reply via email to