Derive Dataflow output names from steps, not PCollection names

Long ago, PCollection names were assigned after transform replacements took
place, because this happened interleaved with pipeline construction. Now,
runner-independent graphs are constructed with named PCollections and when
replacements occur, the names are preserved. This exposed a bug in Dataflow
whereby the names of steps and the names of PCollections are tightly coupled.

This change uses the mandatory derived names during translation, shielding
users from the bug.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9ed8f9a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9ed8f9a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9ed8f9a

Branch: refs/heads/master
Commit: c9ed8f9a69d2b3f17e782f4bd0da9bd4305f2320
Parents: 4c0bdd6
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Apr 20 15:32:51 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Thu Apr 20 15:32:51 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    |  7 +-
 .../DataflowPipelineTranslatorTest.java         | 94 ++++++++++++++++++++
 2 files changed, 100 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c9ed8f9a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index abeca4d..0c0a2ef 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -656,7 +656,12 @@ public class DataflowPipelineTranslator {
 
       Map<String, Object> outputInfo = new HashMap<>();
       addString(outputInfo, PropertyNames.OUTPUT_NAME, Long.toString(id));
-      addString(outputInfo, PropertyNames.USER_NAME, value.getName());
+
+      String stepName = getString(properties, PropertyNames.USER_NAME);
+      String generatedName = String.format(
+          "%s.out%d", stepName, outputInfoList.size());
+
+      addString(outputInfo, PropertyNames.USER_NAME, generatedName);
       if (value instanceof PCollection
           && 
translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
         addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);

http://git-wip-us.apache.org/repos/asf/beam/blob/c9ed8f9a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 5016d88..9396169 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -772,6 +772,100 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
   }
 
   /**
+   * Test that in translation the name for a collection (in this case just a 
Create output) is
+   * overriden to be what the Dataflow service expects.
+   */
+  @Test
+  public void testNamesOverridden() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    options.setStreaming(false);
+    DataflowPipelineTranslator translator = 
DataflowPipelineTranslator.fromOptions(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    pipeline.apply("Jazzy", Create.of(3)).setName("foobizzle");
+
+    runner.replaceTransforms(pipeline);
+
+    Job job = translator.translate(pipeline,
+        runner,
+        Collections.<DataflowPackage>emptyList()).getJob();
+
+    // The Create step
+    Step step = job.getSteps().get(0);
+
+    // This is the name that is "set by the user" that the Dataflow translator 
must override
+    String userSpecifiedName =
+        Structs.getString(
+            Structs.getListOfMaps(
+                step.getProperties(),
+                PropertyNames.OUTPUT_INFO,
+                null).get(0),
+        PropertyNames.USER_NAME);
+
+    // This is the calculated name that must actually be used
+    String calculatedName = getString(step.getProperties(), 
PropertyNames.USER_NAME) + ".out0";
+
+    assertThat(userSpecifiedName, equalTo(calculatedName));
+  }
+
+  /**
+   * Test that in translation the name for collections of a multi-output ParDo 
- a special case
+   * because the user can name tags - are overridden to be what the Dataflow 
service expects.
+   */
+  @Test
+  public void testTaggedNamesOverridden() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    options.setStreaming(false);
+    DataflowPipelineTranslator translator = 
DataflowPipelineTranslator.fromOptions(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    TupleTag<Integer> tag1 = new TupleTag<Integer>("frazzle") {};
+    TupleTag<Integer> tag2 = new TupleTag<Integer>("bazzle") {};
+    TupleTag<Integer> tag3 = new TupleTag<Integer>() {};
+
+    PCollectionTuple outputs =
+        pipeline
+            .apply(Create.of(3))
+            .apply(
+                ParDo.of(
+                        new DoFn<Integer, Integer>() {
+                          @ProcessElement
+                          public void drop() {}
+                        })
+                    .withOutputTags(tag1, TupleTagList.of(tag2).and(tag3)));
+
+    outputs.get(tag1).setName("bizbazzle");
+    outputs.get(tag2).setName("gonzaggle");
+    outputs.get(tag3).setName("froonazzle");
+
+    runner.replaceTransforms(pipeline);
+
+    Job job = translator.translate(pipeline,
+        runner,
+        Collections.<DataflowPackage>emptyList()).getJob();
+
+    // The ParDo step
+    Step step = job.getSteps().get(1);
+    String stepName = Structs.getString(step.getProperties(), 
PropertyNames.USER_NAME);
+
+    List<Map<String, Object>> outputInfos =
+        Structs.getListOfMaps(step.getProperties(), PropertyNames.OUTPUT_INFO, 
null);
+
+    assertThat(outputInfos.size(), equalTo(3));
+
+    // The names set by the user _and_ the tags _must_ be ignored, or metrics 
will not show up.
+    for (int i = 0; i < outputInfos.size(); ++i) {
+      assertThat(
+          Structs.getString(outputInfos.get(i), PropertyNames.USER_NAME),
+          equalTo(String.format("%s.out%s", stepName, i)));
+    }
+  }
+
+  /**
    * Smoke test to fail fast if translation of a stateful ParDo
    * in batch breaks.
    */

Reply via email to