Derive Dataflow output names from steps, not PCollection names Long ago, PCollection names were assigned after transform replacements took place, because this happened interleaved with pipeline construction. Now, runner-independent graphs are constructed with named PCollections and when replacements occur, the names are preserved. This exposed a bug in Dataflow whereby the names of steps and the names of PCollections are tightly coupled.
This change uses the mandatory derived names during translation, shielding users from the bug. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9ed8f9a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9ed8f9a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9ed8f9a Branch: refs/heads/master Commit: c9ed8f9a69d2b3f17e782f4bd0da9bd4305f2320 Parents: 4c0bdd6 Author: Kenneth Knowles <k...@google.com> Authored: Thu Apr 20 15:32:51 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Apr 20 15:32:51 2017 -0700 ---------------------------------------------------------------------- .../dataflow/DataflowPipelineTranslator.java | 7 +- .../DataflowPipelineTranslatorTest.java | 94 ++++++++++++++++++++ 2 files changed, 100 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c9ed8f9a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index abeca4d..0c0a2ef 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -656,7 +656,12 @@ public class DataflowPipelineTranslator { Map<String, Object> outputInfo = new HashMap<>(); addString(outputInfo, PropertyNames.OUTPUT_NAME, Long.toString(id)); - addString(outputInfo, PropertyNames.USER_NAME, value.getName()); + + String stepName = getString(properties, PropertyNames.USER_NAME); + 
String generatedName = String.format( + "%s.out%d", stepName, outputInfoList.size()); + + addString(outputInfo, PropertyNames.USER_NAME, generatedName); if (value instanceof PCollection && translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) { addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true); http://git-wip-us.apache.org/repos/asf/beam/blob/c9ed8f9a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 5016d88..9396169 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -772,6 +772,100 @@ public class DataflowPipelineTranslatorTest implements Serializable { } /** + * Test that in translation the name for a collection (in this case just a Create output) is + * overridden to be what the Dataflow service expects. 
+ */ + @Test + public void testNamesOverridden() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + DataflowRunner runner = DataflowRunner.fromOptions(options); + options.setStreaming(false); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + + Pipeline pipeline = Pipeline.create(options); + + pipeline.apply("Jazzy", Create.of(3)).setName("foobizzle"); + + runner.replaceTransforms(pipeline); + + Job job = translator.translate(pipeline, + runner, + Collections.<DataflowPackage>emptyList()).getJob(); + + // The Create step + Step step = job.getSteps().get(0); + + // This is the name that is "set by the user" that the Dataflow translator must override + String userSpecifiedName = + Structs.getString( + Structs.getListOfMaps( + step.getProperties(), + PropertyNames.OUTPUT_INFO, + null).get(0), + PropertyNames.USER_NAME); + + // This is the calculated name that must actually be used + String calculatedName = getString(step.getProperties(), PropertyNames.USER_NAME) + ".out0"; + + assertThat(userSpecifiedName, equalTo(calculatedName)); + } + + /** + * Test that in translation the name for collections of a multi-output ParDo - a special case + * because the user can name tags - are overridden to be what the Dataflow service expects. 
+ */ + @Test + public void testTaggedNamesOverridden() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + DataflowRunner runner = DataflowRunner.fromOptions(options); + options.setStreaming(false); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + + Pipeline pipeline = Pipeline.create(options); + + TupleTag<Integer> tag1 = new TupleTag<Integer>("frazzle") {}; + TupleTag<Integer> tag2 = new TupleTag<Integer>("bazzle") {}; + TupleTag<Integer> tag3 = new TupleTag<Integer>() {}; + + PCollectionTuple outputs = + pipeline + .apply(Create.of(3)) + .apply( + ParDo.of( + new DoFn<Integer, Integer>() { + @ProcessElement + public void drop() {} + }) + .withOutputTags(tag1, TupleTagList.of(tag2).and(tag3))); + + outputs.get(tag1).setName("bizbazzle"); + outputs.get(tag2).setName("gonzaggle"); + outputs.get(tag3).setName("froonazzle"); + + runner.replaceTransforms(pipeline); + + Job job = translator.translate(pipeline, + runner, + Collections.<DataflowPackage>emptyList()).getJob(); + + // The ParDo step + Step step = job.getSteps().get(1); + String stepName = Structs.getString(step.getProperties(), PropertyNames.USER_NAME); + + List<Map<String, Object>> outputInfos = + Structs.getListOfMaps(step.getProperties(), PropertyNames.OUTPUT_INFO, null); + + assertThat(outputInfos.size(), equalTo(3)); + + // The names set by the user _and_ the tags _must_ be ignored, or metrics will not show up. + for (int i = 0; i < outputInfos.size(); ++i) { + assertThat( + Structs.getString(outputInfos.get(i), PropertyNames.USER_NAME), + equalTo(String.format("%s.out%s", stepName, i))); + } + } + + /** * Smoke test to fail fast if translation of a stateful ParDo * in batch breaks. */