Remove PValue.getProducingTransformInternal. This exposes too much information about the structure of the Graph to be appropriate for non-graph-related APIs. It is also not stable, and is subject to change at any point due to Pipeline Surgery APIs.
Update DataflowRunner to use the visited TransformHierarchy to obtain the producing transforms for PValues, and store these values to construct the graph. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b6863b1d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b6863b1d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b6863b1d Branch: refs/heads/master Commit: b6863b1ddba1fd5ccf5ac61da5aca74116ffa77a Parents: 154c543 Author: Thomas Groh <[email protected]> Authored: Tue Feb 21 13:55:23 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Tue Feb 21 15:21:41 2017 -0800 ---------------------------------------------------------------------- .../dataflow/DataflowPipelineTranslator.java | 97 ++++++++++---------- .../beam/runners/dataflow/DataflowRunner.java | 3 +- .../runners/dataflow/TransformTranslator.java | 13 ++- .../DataflowPipelineTranslatorTest.java | 7 +- .../java/org/apache/beam/sdk/values/PValue.java | 9 -- 5 files changed, 67 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b6863b1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 55f3e5e..6eec603 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -92,6 +92,7 @@ import org.apache.beam.sdk.util.PropertyNames; import 
org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; @@ -255,6 +256,11 @@ public class DataflowPipelineTranslator { private final Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>(); /** + * A Map from {@link PValue} to the {@link AppliedPTransform} that produces that {@link PValue}. + */ + private final Map<PValue, AppliedPTransform<?, ?, ?>> producers = new HashMap<>(); + + /** * A Map from PValues to their output names used by their producer * Dataflow steps. */ @@ -421,15 +427,11 @@ public class DataflowPipelineTranslator { @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { + producers.put(value, producer.toAppliedPTransform()); LOG.debug("Checking translation of {}", value); - if (value.getProducingTransformInternal() == null) { - throw new RuntimeException( - "internal error: expecting a PValue " - + "to have a producingTransform"); - } if (!producer.isCompositeNode()) { // Primitive transforms are the only ones assigned step names. 
- asOutputReference(value); + asOutputReference(value, producer.toAppliedPTransform()); } } @@ -500,24 +502,25 @@ public class DataflowPipelineTranslator { return step; } - @Override - public OutputReference asOutputReference(PValue value) { - AppliedPTransform<?, ?, ?> transform = - value.getProducingTransformInternal(); - String stepName = stepNames.get(transform); - if (stepName == null) { - throw new IllegalArgumentException(transform + " doesn't have a name specified"); - } + public OutputReference asOutputReference(PValue value, AppliedPTransform<?, ?, ?> producer) { + String stepName = stepNames.get(producer); + checkArgument(stepName != null, "%s doesn't have a name specified", producer); String outputName = outputNames.get(value); - if (outputName == null) { - throw new IllegalArgumentException( - "output " + value + " doesn't have a name specified"); - } + checkArgument(outputName != null, "output %s doesn't have a name specified", value); return new OutputReference(stepName, outputName); } + @Override + public AppliedPTransform<?, ?, ?> getProducer(PValue value) { + return checkNotNull( + producers.get(value), + "Unknown producer for value %s while translating step %s", + value, + currentTransform.getFullName()); + } + /** * Returns a fresh Dataflow step name. 
*/ @@ -585,7 +588,8 @@ public class DataflowPipelineTranslator { @Override public void addInput(String name, PInput value) { if (value instanceof PValue) { - addInput(name, translator.asOutputReference((PValue) value)); + PValue pvalue = (PValue) value; + addInput(name, translator.asOutputReference(pvalue, translator.getProducer(pvalue))); } else { throw new IllegalStateException("Input must be a PValue"); } @@ -707,9 +711,9 @@ public class DataflowPipelineTranslator { View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) { StepTranslationContext stepContext = context.addStep(transform, "CollectionToSingleton"); - stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - stepContext.addCollectionToSingletonOutput( - context.getInput(transform), context.getOutput(transform)); + PCollection<ElemT> input = context.getInput(transform); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, input); + stepContext.addCollectionToSingletonOutput(input, context.getOutput(transform)); } }); @@ -748,19 +752,19 @@ public class DataflowPipelineTranslator { new TransformTranslator<Flatten.FlattenPCollectionList>() { @Override public void translate( - Flatten.FlattenPCollectionList transform, - TranslationContext context) { + Flatten.FlattenPCollectionList transform, TranslationContext context) { flattenHelper(transform, context); } private <T> void flattenHelper( - Flatten.FlattenPCollectionList<T> transform, - TranslationContext context) { + Flatten.FlattenPCollectionList<T> transform, TranslationContext context) { StepTranslationContext stepContext = context.addStep(transform, "Flatten"); List<OutputReference> inputs = new LinkedList<>(); for (TaggedPValue input : context.getInputs(transform)) { - inputs.add(context.asOutputReference(input.getValue())); + inputs.add( + context.asOutputReference( + input.getValue(), context.getProducer(input.getValue()))); } stepContext.addInput(PropertyNames.INPUTS, inputs); 
stepContext.addOutput(context.getOutput(transform)); @@ -778,7 +782,8 @@ public class DataflowPipelineTranslator { private <K1, K2, V> void groupByKeyAndSortValuesHelper( GroupByKeyAndSortValuesOnly<K1, K2, V> transform, TranslationContext context) { StepTranslationContext stepContext = context.addStep(transform, "GroupByKey"); - stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + PCollection<KV<K1, KV<K2, V>>> input = context.getInput(transform); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, input); stepContext.addOutput(context.getOutput(transform)); stepContext.addInput(PropertyNames.SORT_VALUES, true); @@ -791,30 +796,26 @@ public class DataflowPipelineTranslator { GroupByKey.class, new TransformTranslator<GroupByKey>() { @Override - public void translate( - GroupByKey transform, - TranslationContext context) { + public void translate(GroupByKey transform, TranslationContext context) { groupByKeyHelper(transform, context); } private <K, V> void groupByKeyHelper( - GroupByKey<K, V> transform, - TranslationContext context) { + GroupByKey<K, V> transform, TranslationContext context) { StepTranslationContext stepContext = context.addStep(transform, "GroupByKey"); - stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + PCollection<KV<K, V>> input = context.getInput(transform); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, input); stepContext.addOutput(context.getOutput(transform)); - WindowingStrategy<?, ?> windowingStrategy = - context.getInput(transform).getWindowingStrategy(); + WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); boolean isStreaming = context.getPipelineOptions().as(StreamingOptions.class).isStreaming(); boolean disallowCombinerLifting = !windowingStrategy.getWindowFn().isNonMerging() - || (isStreaming && !transform.fewKeys()) - // TODO: Allow combiner lifting on the non-default trigger, as appropriate. 
- || !(windowingStrategy.getTrigger() instanceof DefaultTrigger); - stepContext.addInput( - PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting); + || (isStreaming && !transform.fewKeys()) + // TODO: Allow combiner lifting on the non-default trigger, as appropriate. + || !(windowingStrategy.getTrigger() instanceof DefaultTrigger); + stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting); stepContext.addInput( PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(windowingStrategy))); @@ -888,18 +889,16 @@ public class DataflowPipelineTranslator { translateHelper(transform, context); } - private <T> void translateHelper( - Window.Bound<T> transform, TranslationContext context) { + private <T> void translateHelper(Window.Bound<T> transform, TranslationContext context) { StepTranslationContext stepContext = context.addStep(transform, "Bucket"); - stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + PCollection<T> input = context.getInput(transform); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, input); stepContext.addOutput(context.getOutput(transform)); - WindowingStrategy<?, ?> strategy = - context.getOutput(transform).getWindowingStrategy(); + WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy(); byte[] serializedBytes = serializeToByteArray(strategy); String serializedJson = byteArrayToJsonString(serializedBytes); - assert Arrays.equals(serializedBytes, - jsonStringToByteArray(serializedJson)); + assert Arrays.equals(serializedBytes, jsonStringToByteArray(serializedJson)); stepContext.addInput(PropertyNames.SERIALIZED_FN, serializedJson); } }); @@ -929,7 +928,7 @@ public class DataflowPipelineTranslator { for (PCollectionView<?> view : sideInputs) { nonParInputs.put( view.getTagInternal().getId(), - context.asOutputReference(view)); + context.asOutputReference(view, context.getProducer(view))); } 
stepContext.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs); http://git-wip-us.apache.org/repos/asf/beam/blob/b6863b1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index fcba9be..e5ed933 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -874,7 +874,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { stepContext.addEncodingInput(WindowedValue.getValueOnlyCoder( overriddenTransform.getElementCoder())); } - stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + PCollection<T> input = context.getInput(transform); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, input); } } http://git-wip-us.apache.org/repos/asf/beam/blob/b6863b1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java index 6a82672..e020e83 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java @@ -23,6 +23,7 @@ import java.util.Map; import 
org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.OutputReference; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput; @@ -73,7 +74,12 @@ interface TransformTranslator<TransformT extends PTransform> { */ Step addStep(PTransform<?, ? extends PValue> transform, Step step); /** Encode a PValue reference as an output reference. */ - OutputReference asOutputReference(PValue value); + OutputReference asOutputReference(PValue value, AppliedPTransform<?, ?, ?> producer); + + /** + * Get the {@link AppliedPTransform} that produced the provided {@link PValue}. + */ + AppliedPTransform<?, ?, ?> getProducer(PValue value); } /** The interface for a {@link TransformTranslator} to build a Dataflow step. */ @@ -93,6 +99,11 @@ interface TransformTranslator<TransformT extends PTransform> { /** * Adds an input with the given name to this Dataflow step, coming from the specified input * PValue. + * + * <p>The input {@link PValue} must have already been produced by a step earlier in this {@link + * Pipeline}. If the input value has not yet been produced (either by a call to {@link + * StepTranslationContext#addOutput(PValue)} or within a call to {@link + * TranslationContext#addStep(PTransform, Step)}), this method will throw an exception. 
*/ void addInput(String name, PInput value); http://git-wip-us.apache.org/repos/asf/beam/blob/b6863b1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 2ff1032..5d13c3e 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -565,8 +565,11 @@ public class DataflowPipelineTranslatorTest implements Serializable { private static class EmbeddedTranslator implements TransformTranslator<EmbeddedTransform> { @Override public void translate(EmbeddedTransform transform, TranslationContext context) { - addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT, - context.asOutputReference(context.getInput(transform))); + PCollection<String> input = context.getInput(transform); + addObject( + transform.step.getProperties(), + PropertyNames.PARALLEL_INPUT, + context.asOutputReference(input, context.getProducer(input))); context.addStep(transform, transform.step); } } http://git-wip-us.apache.org/repos/asf/beam/blob/b6863b1d/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java index 052a1f3..4c62972 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.values; import java.util.List; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; /** @@ -32,14 +31,6 @@ public interface PValue extends POutput, PInput { String getName(); /** - * Returns the {@link AppliedPTransform} that this {@link PValue} is an output of. - * - * <p>For internal use only. - */ - @Deprecated - AppliedPTransform<?, ?, ?> getProducingTransformInternal(); - - /** * {@inheritDoc}. * * <p>A {@link PValue} always expands into itself. Calling {@link #expand()} on a PValue is almost
