Repository: incubator-beam Updated Branches: refs/heads/master b8e6eea69 -> d69b324c4
Move the step output ids to use a flat namespace. Also add a logical mapping from tuple tag to the flat namespace for DoFns. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/17782007 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/17782007 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/17782007 Branch: refs/heads/master Commit: 177820074d20e6ac72949f763f52cfb481904fc5 Parents: b8e6eea Author: Luke Cwik <[email protected]> Authored: Thu Oct 13 15:33:49 2016 -0700 Committer: Luke Cwik <[email protected]> Committed: Mon Oct 17 11:47:42 2016 -0700 ---------------------------------------------------------------------- .../dataflow/DataflowPipelineTranslator.java | 111 ++++++++++++------- .../beam/runners/dataflow/DataflowRunner.java | 12 +- .../dataflow/internal/ReadTranslator.java | 2 +- .../beam/runners/dataflow/util/DoFnInfo.java | 30 +++-- .../DataflowPipelineTranslatorTest.java | 38 ++++++- .../runners/dataflow/DataflowRunnerTest.java | 6 +- 6 files changed, 136 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 0d72881..c0366fc 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -41,6 +41,10 @@ import 
com.google.api.services.dataflow.model.Environment; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.Step; import com.google.api.services.dataflow.model.WorkerPool; +import com.google.common.base.Supplier; +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -49,6 +53,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly; import org.apache.beam.runners.dataflow.internal.ReadTranslator; @@ -300,32 +305,30 @@ public class DataflowPipelineTranslator { public void addInput(String name, List<? extends Map<String, Object>> elements); /** - * Adds an output with the given name to the previously added - * Dataflow step, producing the specified output {@code PValue}, + * Adds an output to the previously added Dataflow step, + * producing the specified output {@code PValue}, * including its {@code Coder} if a {@code TypedPValue}. If the * {@code PValue} is a {@code PCollection}, wraps its coder inside - * a {@code WindowedValueCoder}. + * a {@code WindowedValueCoder}. Returns a pipeline level unique id. */ - public void addOutput(String name, PValue value); + public long addOutput(PValue value); /** - * Adds an output with the given name to the previously added - * Dataflow step, producing the specified output {@code PValue}, + * Adds an output to the previously added Dataflow step, + * producing the specified output {@code PValue}, * including its {@code Coder} if a {@code TypedPValue}. If the * {@code PValue} is a {@code PCollection}, wraps its coder inside - * a {@code ValueOnlyCoder}. + * a {@code ValueOnlyCoder}. 
Returns a pipeline level unique id. */ - public void addValueOnlyOutput(String name, PValue value); + public long addValueOnlyOutput(PValue value); /** - * Adds an output with the given name to the previously added - * CollectionToSingleton Dataflow step, consuming the specified - * input {@code PValue} and producing the specified output + * Adds an output to the previously added CollectionToSingleton Dataflow step, + * consuming the specified input {@code PValue} and producing the specified output * {@code PValue}. This step requires special treatment for its - * output encoding. + * output encoding. Returns a pipeline level unique id. */ - public void addCollectionToSingletonOutput(String name, - PValue inputValue, + public long addCollectionToSingletonOutput(PValue inputValue, PValue outputValue); /** @@ -341,6 +344,19 @@ public class DataflowPipelineTranslator { * Translates a Pipeline into the Dataflow representation. */ class Translator extends PipelineVisitor.Defaults implements TranslationContext { + /** + * An id generator to be used when giving unique ids for pipeline level constructs. + * This is purposely wrapped inside of a {@link Supplier} to prevent the incorrect + * usage of the {@link AtomicLong} that is contained. + */ + private final Supplier<Long> idGenerator = new Supplier<Long>() { + private final AtomicLong generator = new AtomicLong(1L); + @Override + public Long get() { + return generator.getAndIncrement(); + } + }; + /** The Pipeline to translate. */ private final Pipeline pipeline; @@ -634,7 +650,7 @@ public class DataflowPipelineTranslator { } @Override - public void addOutput(String name, PValue value) { + public long addOutput(PValue value) { Coder<?> coder; if (value instanceof TypedPValue) { coder = ((TypedPValue<?>) value).getCoder(); @@ -648,11 +664,11 @@ public class DataflowPipelineTranslator { // No output coder to encode. 
coder = null; } - addOutput(name, value, coder); + return addOutput(value, coder); } @Override - public void addValueOnlyOutput(String name, PValue value) { + public long addValueOnlyOutput(PValue value) { Coder<?> coder; if (value instanceof TypedPValue) { coder = ((TypedPValue<?>) value).getCoder(); @@ -665,12 +681,11 @@ public class DataflowPipelineTranslator { // No output coder to encode. coder = null; } - addOutput(name, value, coder); + return addOutput(value, coder); } @Override - public void addCollectionToSingletonOutput(String name, - PValue inputValue, + public long addCollectionToSingletonOutput(PValue inputValue, PValue outputValue) { Coder<?> inputValueCoder = checkNotNull(outputCoders.get(inputValue)); @@ -683,7 +698,7 @@ public class DataflowPipelineTranslator { // IterableCoder of the inputValueCoder. This is a property // of the backend "CollectionToSingleton" step. Coder<?> outputValueCoder = IterableCoder.of(inputValueCoder); - addOutput(name, outputValue, outputValueCoder); + return addOutput(outputValue, outputValueCoder); } /** @@ -691,8 +706,9 @@ public class DataflowPipelineTranslator { * Dataflow step, producing the specified output {@code PValue} * with the given {@code Coder} (if not {@code null}). 
*/ - private void addOutput(String name, PValue value, Coder<?> valueCoder) { - registerOutputName(value, name); + private long addOutput(PValue value, Coder<?> valueCoder) { + long id = idGenerator.get(); + registerOutputName(value, Long.toString(id)); Map<String, Object> properties = getProperties(); @Nullable List<Map<String, Object>> outputInfoList = null; @@ -709,7 +725,7 @@ public class DataflowPipelineTranslator { } Map<String, Object> outputInfo = new HashMap<>(); - addString(outputInfo, PropertyNames.OUTPUT_NAME, name); + addString(outputInfo, PropertyNames.OUTPUT_NAME, Long.toString(id)); addString(outputInfo, PropertyNames.USER_NAME, value.getName()); if (value instanceof PCollection && runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) { @@ -724,6 +740,7 @@ public class DataflowPipelineTranslator { } outputInfoList.add(outputInfo); + return id; } private void addDisplayData(String stepName, HasDisplayData hasDisplayData) { @@ -805,7 +822,6 @@ public class DataflowPipelineTranslator { context.addStep(transform, "CollectionToSingleton"); context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); context.addCollectionToSingletonOutput( - PropertyNames.OUTPUT, context.getInput(transform), context.getOutput(transform)); } @@ -837,7 +853,7 @@ public class DataflowPipelineTranslator { context.addInput( PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(fn))); - context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); + context.addOutput(context.getOutput(transform)); } }); @@ -861,7 +877,7 @@ public class DataflowPipelineTranslator { inputs.add(context.asOutputReference(input)); } context.addInput(PropertyNames.INPUTS, inputs); - context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); + context.addOutput(context.getOutput(transform)); } }); @@ -880,7 +896,7 @@ public class DataflowPipelineTranslator { TranslationContext context) { context.addStep(transform, "GroupByKey"); 
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); + context.addOutput(context.getOutput(transform)); context.addInput(PropertyNames.SORT_VALUES, true); // TODO: Add support for combiner lifting once the need arises. @@ -904,7 +920,7 @@ public class DataflowPipelineTranslator { TranslationContext context) { context.addStep(transform, "GroupByKey"); context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); + context.addOutput(context.getOutput(transform)); WindowingStrategy<?, ?> windowingStrategy = context.getInput(transform).getWindowingStrategy(); @@ -941,9 +957,16 @@ public class DataflowPipelineTranslator { TranslationContext context) { context.addStep(transform, "ParallelDo"); translateInputs(context.getInput(transform), transform.getSideInputs(), context); - translateFn(transform.getFn(), context.getInput(transform).getWindowingStrategy(), - transform.getSideInputs(), context.getInput(transform).getCoder(), context); - translateOutputs(context.getOutput(transform), context); + BiMap<Long, TupleTag<?>> outputMap = + translateOutputs(context.getOutput(transform), context); + translateFn( + transform.getFn(), + context.getInput(transform).getWindowingStrategy(), + transform.getSideInputs(), + context.getInput(transform).getCoder(), + context, + outputMap.inverse().get(transform.getMainOutputTag()), + outputMap); } }); @@ -962,11 +985,17 @@ public class DataflowPipelineTranslator { TranslationContext context) { context.addStep(transform, "ParallelDo"); translateInputs(context.getInput(transform), transform.getSideInputs(), context); + long mainOutput = context.addOutput(context.getOutput(transform)); translateFn( transform.getFn(), context.getInput(transform).getWindowingStrategy(), - transform.getSideInputs(), context.getInput(transform).getCoder(), context); - 
context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); + transform.getSideInputs(), + context.getInput(transform).getCoder(), + context, + mainOutput, + ImmutableMap.<Long, TupleTag<?>>of(mainOutput, + new TupleTag<>(PropertyNames.OUTPUT))); + } }); @@ -983,7 +1012,7 @@ public class DataflowPipelineTranslator { Window.Bound<T> transform, TranslationContext context) { context.addStep(transform, "Bucket"); context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); + context.addOutput(context.getOutput(transform)); WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy(); byte[] serializedBytes = serializeToByteArray(strategy); @@ -1028,22 +1057,26 @@ public class DataflowPipelineTranslator { WindowingStrategy windowingStrategy, Iterable<PCollectionView<?>> sideInputs, Coder inputCoder, - TranslationContext context) { + TranslationContext context, + long mainOutput, + Map<Long, TupleTag<?>> outputMap) { context.addInput(PropertyNames.USER_FN, fn.getClass().getName()); context.addInput( PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray( - new DoFnInfo(fn, windowingStrategy, sideInputs, inputCoder)))); + new DoFnInfo(fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap)))); } - private static void translateOutputs( + private static BiMap<Long, TupleTag<?>> translateOutputs( PCollectionTuple outputs, TranslationContext context) { + ImmutableBiMap.Builder<Long, TupleTag<?>> mapBuilder = ImmutableBiMap.builder(); for (Map.Entry<TupleTag<?>, PCollection<?>> entry : outputs.getAll().entrySet()) { TupleTag<?> tag = entry.getKey(); PCollection<?> output = entry.getValue(); - context.addOutput(tag.getId(), output); + mapBuilder.put(context.addOutput(output), tag); } + return mapBuilder.build(); } } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 646a145..55a01f7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1824,7 +1824,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // outputting to all the outputs defined above. PCollectionTuple outputTuple = input .apply("GBKaSVForData", new GroupByKeyHashAndSortByKeyAndWindow<K, V, W>(ismCoder)) - .apply(ParDo.of(new ToIsmRecordForMapLikeDoFn<K, V, W>( + .apply(ParDo.of(new ToIsmRecordForMapLikeDoFn<>( outputForSizeTag, outputForEntrySetTag, windowCoder, inputCoder.getKeyCoder(), ismCoder, uniqueKeysExpected)) .withOutputTags(mainOutputTag, @@ -2116,7 +2116,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { if (overriddenTransform.getIdLabel() != null) { context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel()); } - context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform)); + context.addValueOnlyOutput(context.getOutput(transform)); } } @@ -2215,10 +2215,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { source.validate(); if (source.requiresDeduping()) { - return Pipeline.applyTransform(input, new ReadWithIds<T>(source)) + return Pipeline.applyTransform(input, new ReadWithIds<>(source)) .apply(new Deduplicate<T>()); } else { - return Pipeline.applyTransform(input, new ReadWithIds<T>(source)) + return 
Pipeline.applyTransform(input, new ReadWithIds<>(source)) .apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<T>())); } } @@ -2348,7 +2348,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { public static <T> StreamingPCollectionViewWriterFn<T> create( PCollectionView<?> view, Coder<T> dataCoder) { - return new StreamingPCollectionViewWriterFn<T>(view, dataCoder); + return new StreamingPCollectionViewWriterFn<>(view, dataCoder); } private StreamingPCollectionViewWriterFn(PCollectionView<?> view, Coder<T> dataCoder) { @@ -2648,7 +2648,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { private static class Concatenate<T> extends CombineFn<T, List<T>, List<T>> { @Override public List<T> createAccumulator() { - return new ArrayList<T>(); + return new ArrayList<>(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java index 094f405..83836c0 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java @@ -62,7 +62,7 @@ public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> { PropertyNames.SOURCE_STEP_INPUT, cloudSourceToDictionary( CustomSources.serializeToCloudSource(source, context.getPipelineOptions()))); - context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform)); + context.addValueOnlyOutput(context.getOutput(transform)); } catch (Exception e) { 
throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index 949c381..b211c04 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -18,10 +18,12 @@ package org.apache.beam.runners.dataflow.util; import java.io.Serializable; +import java.util.Map; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; /** * Wrapper class holding the necessary information to serialize a {@link OldDoFn}. 
@@ -34,20 +36,21 @@ public class DoFnInfo<InputT, OutputT> implements Serializable { private final WindowingStrategy<?, ?> windowingStrategy; private final Iterable<PCollectionView<?>> sideInputViews; private final Coder<InputT> inputCoder; + private final long mainOutput; + private final Map<Long, TupleTag<?>> outputMap; - public DoFnInfo(OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) { - this.doFn = doFn; - this.windowingStrategy = windowingStrategy; - this.sideInputViews = null; - this.inputCoder = null; - } - - public DoFnInfo(OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, - Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder) { + public DoFnInfo(OldDoFn<InputT, OutputT> doFn, + WindowingStrategy<?, ?> windowingStrategy, + Iterable<PCollectionView<?>> sideInputViews, + Coder<InputT> inputCoder, + long mainOutput, + Map<Long, TupleTag<?>> outputMap) { this.doFn = doFn; this.windowingStrategy = windowingStrategy; this.sideInputViews = sideInputViews; this.inputCoder = inputCoder; + this.mainOutput = mainOutput; + this.outputMap = outputMap; } public OldDoFn<InputT, OutputT> getDoFn() { @@ -65,5 +68,12 @@ public class DoFnInfo<InputT, OutputT> implements Serializable { public Coder<InputT> getInputCoder() { return inputCoder; } -} + public long getMainOutput() { + return mainOutput; + } + + public Map<Long, TupleTag<?>> getOutputMap() { + return outputMap; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 98d2fb0..762844b 100644 
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -46,12 +46,16 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; @@ -465,6 +469,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); + assertAllStepOutputsHaveUniqueIds(job); List<Step> steps = job.getSteps(); assertEquals(4, steps.size()); @@ -523,6 +528,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { assertEquals(13, job.getSteps().size()); Step step = job.getSteps().get(1); assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME)); + assertAllStepOutputsHaveUniqueIds(job); return step; } @@ -637,7 +643,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { } @Test - public void testMultiGraphPipelineSerialization() throws IOException { + public void testMultiGraphPipelineSerialization() throws Exception { Pipeline p = Pipeline.create(buildPipelineOptions()); PCollection<Integer> input = p.begin() @@ -650,8 +656,9 @@ public class DataflowPipelineTranslatorTest implements 
Serializable { PipelineOptionsFactory.as(DataflowPipelineOptions.class)); // Check that translation doesn't fail. - t.translate( + JobSpecification jobSpecification = t.translate( p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()); + assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob()); } @Test @@ -692,10 +699,11 @@ public class DataflowPipelineTranslatorTest implements Serializable { applyRead(pipeline, "gs://bucket/foo[0-9]/baz"); // Check that translation doesn't fail. - t.translate( + JobSpecification jobSpecification = t.translate( pipeline, (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()); + assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob()); } private void applyRead(Pipeline pipeline, String path) { @@ -744,6 +752,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); + assertAllStepOutputsHaveUniqueIds(job); List<Step> steps = job.getSteps(); assertEquals(2, steps.size()); @@ -753,7 +762,6 @@ public class DataflowPipelineTranslatorTest implements Serializable { Step collectionToSingletonStep = steps.get(1); assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); - } @Test @@ -776,6 +784,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); + assertAllStepOutputsHaveUniqueIds(job); List<Step> steps = job.getSteps(); assertEquals(2, steps.size()); @@ -806,6 +815,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); + assertAllStepOutputsHaveUniqueIds(job); List<Step> steps = job.getSteps(); assertEquals(5, steps.size()); @@ -839,6 +849,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { (DataflowRunner) 
pipeline.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); + assertAllStepOutputsHaveUniqueIds(job); List<Step> steps = job.getSteps(); assertEquals(3, steps.size()); @@ -902,6 +913,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { (DataflowRunner) pipeline.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); + assertAllStepOutputsHaveUniqueIds(job); List<Step> steps = job.getSteps(); assertEquals(3, steps.size()); @@ -963,4 +975,22 @@ public class DataflowPipelineTranslatorTest implements Serializable { assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData)); assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData)); } + + private static void assertAllStepOutputsHaveUniqueIds(Job job) + throws Exception { + List<Long> outputIds = new ArrayList<>(); + for (Step step : job.getSteps()) { + List<Map<String, Object>> outputInfoList = + (List<Map<String, Object>>) step.getProperties().get(PropertyNames.OUTPUT_INFO); + if (outputInfoList != null) { + for (Map<String, Object> outputInfo : outputInfoList) { + outputIds.add(Long.parseLong(Structs.getString(outputInfo, PropertyNames.OUTPUT_NAME))); + } + } + } + // A repeated id collapses in the set, so the sizes differ iff duplicates exist. + Set<Long> uniqueOutputIds = new HashSet<>(outputIds); + assertTrue(String.format("Found duplicate output ids in %s", outputIds), + uniqueOutputIds.size() == outputIds.size()); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/17782007/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index b0ee231..ddb7cf8 100644 ---
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -895,7 +895,7 @@ public class DataflowRunnerTest { // Note: This is about the minimum needed to fake out a // translation. This obviously isn't a real translation. context.addStep(transform, "TestTranslate"); - context.addOutput("output", context.getOutput(transform)); + context.addOutput(context.getOutput(transform)); } }); @@ -1098,7 +1098,7 @@ public class DataflowRunnerTest { DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>, IsmRecord<WindowedValue<Long>>> doFnTester = - DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, Long, IntervalWindow>( + DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<>( outputForSizeTag, outputForEntrySetTag, windowCoder, @@ -1198,7 +1198,7 @@ public class DataflowRunnerTest { DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>, IsmRecord<WindowedValue<Long>>> doFnTester = - DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, Long, IntervalWindow>( + DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<>( outputForSizeTag, outputForEntrySetTag, windowCoder,
