Repository: beam Updated Branches: refs/heads/master b4d870272 -> a6caa82a6
Add explicit translation builder for a Step in the Dataflow translator Previously, there was always a "current" step that was the most recent step created. This makes it cumbersome or impossible to do things like translate one primitive transform into a small subgraph of steps. Thus we added hacks like CreatePCollectionView which are not actually part of the model at all - in fact, we should be able to add the needed CollectionToSingleton steps simply by looking at the side inputs of a ParDo node. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f04537cc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f04537cc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f04537cc Branch: refs/heads/master Commit: f04537ccbc2897ea4337941d5ca8121432daef43 Parents: b4d8702 Author: Kenneth Knowles <[email protected]> Authored: Wed Dec 21 14:34:27 2016 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Fri Jan 6 11:24:49 2017 -0800 ---------------------------------------------------------------------- .../dataflow/DataflowPipelineTranslator.java | 313 ++++++++++--------- .../beam/runners/dataflow/DataflowRunner.java | 60 ++-- .../dataflow/internal/ReadTranslator.java | 9 +- .../runners/dataflow/DataflowRunnerTest.java | 5 +- 4 files changed, 196 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 8d2b076..2385fa1 100644 --- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -213,14 +213,12 @@ public class DataflowPipelineTranslator { } /** - * A {@link TransformTranslator} knows how to translate - * a particular subclass of {@link PTransform} for the - * Cloud Dataflow service. It does so by - * mutating the {@link TranslationContext}. + * A {@link TransformTranslator} knows how to translate a particular subclass of {@link + * PTransform} for the Cloud Dataflow service. It does so by mutating the {@link + * TranslationContext}. */ public interface TransformTranslator<TransformT extends PTransform> { - void translate(TransformT transform, - TranslationContext context); + void translate(TransformT transform, TranslationContext context); } /** @@ -252,10 +250,8 @@ public class DataflowPipelineTranslator { /** * Adds a step to the Dataflow workflow for the given transform, with * the given Dataflow step type. - * This step becomes "current" for the purpose of {@link #addInput} and - * {@link #addOutput}. */ - void addStep(PTransform<?, ?> transform, String type); + StepTranslationContext addStep(PTransform<?, ?> transform, String type); /** * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be @@ -264,8 +260,14 @@ public class DataflowPipelineTranslator { * <p>This is a low-level operation, when using this method it is up to * the caller to ensure that names do not collide. */ - void addStep(PTransform<?, ? extends PValue> transform, Step step); + Step addStep(PTransform<?, ? extends PValue> transform, Step step); + /** + * Encode a PValue reference as an output reference. + */ + OutputReference asOutputReference(PValue value); + } + public interface StepTranslationContext { /** * Sets the encoding for the current Dataflow step. 
*/ @@ -330,12 +332,7 @@ public class DataflowPipelineTranslator { * output encoding. Returns a pipeline level unique id. */ long addCollectionToSingletonOutput(PValue inputValue, - PValue outputValue); - - /** - * Encode a PValue reference as an output reference. - */ - OutputReference asOutputReference(PValue value); + PValue outputValue); } @@ -343,6 +340,8 @@ public class DataflowPipelineTranslator { /** * Translates a Pipeline into the Dataflow representation. + * + * <p>For internal use only. */ class Translator extends PipelineVisitor.Defaults implements TranslationContext { /** @@ -368,11 +367,6 @@ public class DataflowPipelineTranslator { private final Job job = new Job(); /** - * Translator is stateful, as addProperty calls refer to the current step. - */ - private Step currentStep; - - /** * A Map from AppliedPTransform to their unique Dataflow step names. */ private final Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>(); @@ -546,7 +540,7 @@ public class DataflowPipelineTranslator { } @Override - public void addStep(PTransform<?, ?> transform, String type) { + public StepTranslator addStep(PTransform<?, ?> transform, String type) { String stepName = genStepName(); if (stepNames.put(getCurrentTransform(transform), stepName) != null) { throw new IllegalArgumentException( @@ -559,16 +553,19 @@ public class DataflowPipelineTranslator { job.setSteps(steps); } - currentStep = new Step(); - currentStep.setName(stepName); - currentStep.setKind(type); - steps.add(currentStep); - addInput(PropertyNames.USER_NAME, getFullName(transform)); - addDisplayData(stepName, transform); + Step step = new Step(); + step.setName(stepName); + step.setKind(type); + steps.add(step); + + StepTranslator stepContext = new StepTranslator(this, step); + stepContext.addInput(PropertyNames.USER_NAME, getFullName(transform)); + stepContext.addDisplayData(step, stepName, transform); + return stepContext; } @Override - public void addStep(PTransform<?, ? 
extends PValue> transform, Step original) { + public Step addStep(PTransform<?, ? extends PValue> transform, Step original) { Step step = original.clone(); String stepName = step.getName(); if (stepNames.put(getCurrentTransform(transform), stepName) != null) { @@ -605,8 +602,59 @@ public class DataflowPipelineTranslator { steps = new LinkedList<>(); job.setSteps(steps); } - currentStep = step; steps.add(step); + return step; + } + + @Override + public OutputReference asOutputReference(PValue value) { + AppliedPTransform<?, ?, ?> transform = + value.getProducingTransformInternal(); + String stepName = stepNames.get(transform); + if (stepName == null) { + throw new IllegalArgumentException(transform + " doesn't have a name specified"); + } + + String outputName = outputNames.get(value); + if (outputName == null) { + throw new IllegalArgumentException( + "output " + value + " doesn't have a name specified"); + } + + return new OutputReference(stepName, outputName); + } + + /** + * Returns a fresh Dataflow step name. + */ + private String genStepName() { + return "s" + (stepNames.size() + 1); + } + + /** + * Records the name of the given output PValue, + * within its producing transform. 
+ */ + private void registerOutputName(POutput value, String name) { + if (outputNames.put(value, name) != null) { + throw new IllegalArgumentException( + "output " + value + " already has a name specified"); + } + } + } + + static class StepTranslator implements StepTranslationContext { + + private final Translator translator; + private final Step step; + + private StepTranslator(Translator translator, Step step) { + this.translator = translator; + this.step = step; + } + + private Map<String, Object> getProperties() { + return DataflowPipelineTranslator.getProperties(step); } @Override @@ -643,7 +691,7 @@ public class DataflowPipelineTranslator { @Override public void addInput(String name, PInput value) { if (value instanceof PValue) { - addInput(name, asOutputReference((PValue) value)); + addInput(name, translator.asOutputReference((PValue) value)); } else { throw new IllegalStateException("Input must be a PValue"); } @@ -685,10 +733,10 @@ public class DataflowPipelineTranslator { } @Override - public long addCollectionToSingletonOutput(PValue inputValue, - PValue outputValue) { + public long addCollectionToSingletonOutput( + PValue inputValue, PValue outputValue) { Coder<?> inputValueCoder = - checkNotNull(outputCoders.get(inputValue)); + checkNotNull(translator.outputCoders.get(inputValue)); // The inputValueCoder for the input PCollection should be some // WindowedValueCoder of the input PCollection's element // coder. @@ -707,8 +755,8 @@ public class DataflowPipelineTranslator { * with the given {@code Coder} (if not {@code null}). 
*/ private long addOutput(PValue value, Coder<?> valueCoder) { - long id = idGenerator.get(); - registerOutputName(value, Long.toString(id)); + long id = translator.idGenerator.get(); + translator.registerOutputName(value, Long.toString(id)); Map<String, Object> properties = getProperties(); @Nullable List<Map<String, Object>> outputInfoList = null; @@ -728,7 +776,7 @@ public class DataflowPipelineTranslator { addString(outputInfo, PropertyNames.OUTPUT_NAME, Long.toString(id)); addString(outputInfo, PropertyNames.USER_NAME, value.getName()); if (value instanceof PCollection - && runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) { + && translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) { addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true); } if (valueCoder != null) { @@ -736,63 +784,19 @@ public class DataflowPipelineTranslator { // failures as early as possible. CloudObject encoding = SerializableUtils.ensureSerializable(valueCoder); addObject(outputInfo, PropertyNames.ENCODING, encoding); - outputCoders.put(value, valueCoder); + translator.outputCoders.put(value, valueCoder); } outputInfoList.add(outputInfo); return id; } - private void addDisplayData(String stepName, HasDisplayData hasDisplayData) { + private void addDisplayData(Step step, String stepName, HasDisplayData hasDisplayData) { DisplayData displayData = DisplayData.from(hasDisplayData); List<Map<String, Object>> list = MAPPER.convertValue(displayData, List.class); addList(getProperties(), PropertyNames.DISPLAY_DATA, list); } - @Override - public OutputReference asOutputReference(PValue value) { - AppliedPTransform<?, ?, ?> transform = - value.getProducingTransformInternal(); - String stepName = stepNames.get(transform); - if (stepName == null) { - throw new IllegalArgumentException(transform + " doesn't have a name specified"); - } - - String outputName = outputNames.get(value); - if (outputName == null) { - throw new IllegalArgumentException( - 
"output " + value + " doesn't have a name specified"); - } - - return new OutputReference(stepName, outputName); - } - - private Map<String, Object> getProperties() { - Map<String, Object> properties = currentStep.getProperties(); - if (properties == null) { - properties = new HashMap<>(); - currentStep.setProperties(properties); - } - return properties; - } - - /** - * Returns a fresh Dataflow step name. - */ - private String genStepName() { - return "s" + (stepNames.size() + 1); - } - - /** - * Records the name of the given output PValue, - * within its producing transform. - */ - private void registerOutputName(POutput value, String name) { - if (outputNames.put(value, name) != null) { - throw new IllegalArgumentException( - "output " + value + " already has a name specified"); - } - } } ///////////////////////////////////////////////////////////////////////////// @@ -802,6 +806,14 @@ public class DataflowPipelineTranslator { return "DataflowPipelineTranslator#" + hashCode(); } + private static Map<String, Object> getProperties(Step step) { + Map<String, Object> properties = step.getProperties(); + if (properties == null) { + properties = new HashMap<>(); + step.setProperties(properties); + } + return properties; + } /////////////////////////////////////////////////////////////////////////// @@ -810,20 +822,17 @@ public class DataflowPipelineTranslator { View.CreatePCollectionView.class, new TransformTranslator<View.CreatePCollectionView>() { @Override - public void translate( - View.CreatePCollectionView transform, - TranslationContext context) { + public void translate(View.CreatePCollectionView transform, TranslationContext context) { translateTyped(transform, context); } private <ElemT, ViewT> void translateTyped( - View.CreatePCollectionView<ElemT, ViewT> transform, - TranslationContext context) { - context.addStep(transform, "CollectionToSingleton"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - 
context.addCollectionToSingletonOutput( - context.getInput(transform), - context.getOutput(transform)); + View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) { + StepTranslationContext stepContext = + context.addStep(transform, "CollectionToSingleton"); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + stepContext.addCollectionToSingletonOutput( + context.getInput(transform), context.getOutput(transform)); } }); @@ -839,21 +848,21 @@ public class DataflowPipelineTranslator { private <K, InputT, OutputT> void translateHelper( final Combine.GroupedValues<K, InputT, OutputT> transform, - DataflowPipelineTranslator.TranslationContext context) { - context.addStep(transform, "CombineValues"); - translateInputs(context.getInput(transform), transform.getSideInputs(), context); + TranslationContext context) { + StepTranslationContext stepContext = context.addStep(transform, "CombineValues"); + translateInputs( + stepContext, context.getInput(transform), transform.getSideInputs(), context); AppliedCombineFn<? super K, ? 
super InputT, ?, OutputT> fn = transform.getAppliedFn( context.getInput(transform).getPipeline().getCoderRegistry(), - context.getInput(transform).getCoder(), - context.getInput(transform).getWindowingStrategy()); + context.getInput(transform).getCoder(), + context.getInput(transform).getWindowingStrategy()); - context.addEncodingInput(fn.getAccumulatorCoder()); - context.addInput( - PropertyNames.SERIALIZED_FN, - byteArrayToJsonString(serializeToByteArray(fn))); - context.addOutput(context.getOutput(transform)); + stepContext.addEncodingInput(fn.getAccumulatorCoder()); + stepContext.addInput( + PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(fn))); + stepContext.addOutput(context.getOutput(transform)); } }); @@ -870,14 +879,14 @@ public class DataflowPipelineTranslator { private <T> void flattenHelper( Flatten.FlattenPCollectionList<T> transform, TranslationContext context) { - context.addStep(transform, "Flatten"); + StepTranslationContext stepContext = context.addStep(transform, "Flatten"); List<OutputReference> inputs = new LinkedList<>(); for (PCollection<T> input : context.getInput(transform).getAll()) { inputs.add(context.asOutputReference(input)); } - context.addInput(PropertyNames.INPUTS, inputs); - context.addOutput(context.getOutput(transform)); + stepContext.addInput(PropertyNames.INPUTS, inputs); + stepContext.addOutput(context.getOutput(transform)); } }); @@ -885,23 +894,19 @@ public class DataflowPipelineTranslator { GroupByKeyAndSortValuesOnly.class, new TransformTranslator<GroupByKeyAndSortValuesOnly>() { @Override - public void translate( - GroupByKeyAndSortValuesOnly transform, - TranslationContext context) { + public void translate(GroupByKeyAndSortValuesOnly transform, TranslationContext context) { groupByKeyAndSortValuesHelper(transform, context); } private <K1, K2, V> void groupByKeyAndSortValuesHelper( - GroupByKeyAndSortValuesOnly<K1, K2, V> transform, - TranslationContext context) { - context.addStep(transform, 
"GroupByKey"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addOutput(context.getOutput(transform)); - context.addInput(PropertyNames.SORT_VALUES, true); + GroupByKeyAndSortValuesOnly<K1, K2, V> transform, TranslationContext context) { + StepTranslationContext stepContext = context.addStep(transform, "GroupByKey"); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + stepContext.addOutput(context.getOutput(transform)); + stepContext.addInput(PropertyNames.SORT_VALUES, true); // TODO: Add support for combiner lifting once the need arises. - context.addInput( - PropertyNames.DISALLOW_COMBINER_LIFTING, true); + stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true); } }); @@ -918,9 +923,9 @@ public class DataflowPipelineTranslator { private <K, V> void groupByKeyHelper( GroupByKey<K, V> transform, TranslationContext context) { - context.addStep(transform, "GroupByKey"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addOutput(context.getOutput(transform)); + StepTranslationContext stepContext = context.addStep(transform, "GroupByKey"); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + stepContext.addOutput(context.getOutput(transform)); WindowingStrategy<?, ?> windowingStrategy = context.getInput(transform).getWindowingStrategy(); @@ -931,12 +936,12 @@ public class DataflowPipelineTranslator { || (isStreaming && !transform.fewKeys()) // TODO: Allow combiner lifting on the non-default trigger, as appropriate. 
|| !(windowingStrategy.getTrigger() instanceof DefaultTrigger); - context.addInput( + stepContext.addInput( PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting); - context.addInput( + stepContext.addInput( PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(windowingStrategy))); - context.addInput( + stepContext.addInput( PropertyNames.IS_MERGING_WINDOW_FN, !windowingStrategy.getWindowFn().isNonMerging()); } @@ -946,22 +951,21 @@ public class DataflowPipelineTranslator { ParDo.BoundMulti.class, new TransformTranslator<ParDo.BoundMulti>() { @Override - public void translate( - ParDo.BoundMulti transform, - TranslationContext context) { + public void translate(ParDo.BoundMulti transform, TranslationContext context) { translateMultiHelper(transform, context); } private <InputT, OutputT> void translateMultiHelper( - ParDo.BoundMulti<InputT, OutputT> transform, - TranslationContext context) { - rejectStatefulDoFn(transform.getFn()); + ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) { + DataflowPipelineTranslator.rejectStatefulDoFn(transform.getFn()); - context.addStep(transform, "ParallelDo"); - translateInputs(context.getInput(transform), transform.getSideInputs(), context); + StepTranslationContext stepContext = context.addStep(transform, "ParallelDo"); + translateInputs( + stepContext, context.getInput(transform), transform.getSideInputs(), context); BiMap<Long, TupleTag<?>> outputMap = - translateOutputs(context.getOutput(transform), context); + translateOutputs(context.getOutput(transform), stepContext); translateFn( + stepContext, transform.getFn(), context.getInput(transform).getWindowingStrategy(), transform.getSideInputs(), @@ -976,30 +980,28 @@ public class DataflowPipelineTranslator { ParDo.Bound.class, new TransformTranslator<ParDo.Bound>() { @Override - public void translate( - ParDo.Bound transform, - TranslationContext context) { + public void translate(ParDo.Bound transform, TranslationContext 
context) { translateSingleHelper(transform, context); } private <InputT, OutputT> void translateSingleHelper( - ParDo.Bound<InputT, OutputT> transform, - TranslationContext context) { + ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { rejectStatefulDoFn(transform.getFn()); - context.addStep(transform, "ParallelDo"); - translateInputs(context.getInput(transform), transform.getSideInputs(), context); - long mainOutput = context.addOutput(context.getOutput(transform)); + StepTranslationContext stepContext = context.addStep(transform, "ParallelDo"); + translateInputs( + stepContext, context.getInput(transform), transform.getSideInputs(), context); + long mainOutput = stepContext.addOutput(context.getOutput(transform)); translateFn( + stepContext, transform.getFn(), context.getInput(transform).getWindowingStrategy(), transform.getSideInputs(), context.getInput(transform).getCoder(), context, mainOutput, - ImmutableMap.<Long, TupleTag<?>>of(mainOutput, - new TupleTag<>(PropertyNames.OUTPUT))); - + ImmutableMap.<Long, TupleTag<?>>of( + mainOutput, new TupleTag<>(PropertyNames.OUTPUT))); } }); @@ -1014,16 +1016,16 @@ public class DataflowPipelineTranslator { private <T> void translateHelper( Window.Bound<T> transform, TranslationContext context) { - context.addStep(transform, "Bucket"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - context.addOutput(context.getOutput(transform)); + StepTranslationContext stepContext = context.addStep(transform, "Bucket"); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + stepContext.addOutput(context.getOutput(transform)); WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy(); byte[] serializedBytes = serializeToByteArray(strategy); String serializedJson = byteArrayToJsonString(serializedBytes); assert Arrays.equals(serializedBytes, jsonStringToByteArray(serializedJson)); - 
context.addInput(PropertyNames.SERIALIZED_FN, serializedJson); + stepContext.addInput(PropertyNames.SERIALIZED_FN, serializedJson); } }); @@ -1046,15 +1048,17 @@ public class DataflowPipelineTranslator { } private static void translateInputs( + StepTranslationContext stepContext, PCollection<?> input, List<PCollectionView<?>> sideInputs, TranslationContext context) { - context.addInput(PropertyNames.PARALLEL_INPUT, input); - translateSideInputs(sideInputs, context); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, input); + translateSideInputs(stepContext, sideInputs, context); } // Used for ParDo private static void translateSideInputs( + StepTranslationContext stepContext, List<PCollectionView<?>> sideInputs, TranslationContext context) { Map<String, Object> nonParInputs = new HashMap<>(); @@ -1065,10 +1069,11 @@ public class DataflowPipelineTranslator { context.asOutputReference(view)); } - context.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs); + stepContext.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs); } private static void translateFn( + StepTranslationContext stepContext, DoFn fn, WindowingStrategy windowingStrategy, Iterable<PCollectionView<?>> sideInputs, @@ -1076,8 +1081,8 @@ public class DataflowPipelineTranslator { TranslationContext context, long mainOutput, Map<Long, TupleTag<?>> outputMap) { - context.addInput(PropertyNames.USER_FN, fn.getClass().getName()); - context.addInput( + stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName()); + stepContext.addInput( PropertyNames.SERIALIZED_FN, byteArrayToJsonString( serializeToByteArray( @@ -1087,13 +1092,13 @@ public class DataflowPipelineTranslator { private static BiMap<Long, TupleTag<?>> translateOutputs( PCollectionTuple outputs, - TranslationContext context) { + StepTranslationContext stepContext) { ImmutableBiMap.Builder<Long, TupleTag<?>> mapBuilder = ImmutableBiMap.builder(); for (Map.Entry<TupleTag<?>, PCollection<?>> entry : outputs.getAll().entrySet()) { 
TupleTag<?> tag = entry.getKey(); PCollection<?> output = entry.getValue(); - mapBuilder.put(context.addOutput(output), tag); + mapBuilder.put(stepContext.addOutput(output), tag); } return mapBuilder.build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 03e5dfc..d2c1e66 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -33,6 +33,7 @@ import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; +import com.google.api.services.dataflow.model.Step; import com.google.api.services.dataflow.model.WorkerPool; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; @@ -72,6 +73,7 @@ import java.util.SortedSet; import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; +import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; import org.apache.beam.runners.dataflow.internal.AssignWindows; @@ -2116,50 +2118,46 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> { } } - /** - * Rewrite {@link StreamingPubsubIORead} to the appropriate internal node. - */ - private static class StreamingPubsubIOReadTranslator<T> implements - TransformTranslator<StreamingPubsubIORead<T>> { + /** Rewrite {@link StreamingPubsubIORead} to the appropriate internal node. */ + private static class StreamingPubsubIOReadTranslator<T> + implements TransformTranslator<StreamingPubsubIORead<T>> { @Override - public void translate( - StreamingPubsubIORead<T> transform, - TranslationContext context) { - checkArgument(context.getPipelineOptions().isStreaming(), - "StreamingPubsubIORead is only for streaming pipelines."); + public void translate(StreamingPubsubIORead<T> transform, TranslationContext context) { + checkArgument( + context.getPipelineOptions().isStreaming(), + "StreamingPubsubIORead is only for streaming pipelines."); PubsubUnboundedSource<T> overriddenTransform = transform.getOverriddenTransform(); - context.addStep(transform, "ParallelRead"); - context.addInput(PropertyNames.FORMAT, "pubsub"); + StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); + stepContext.addInput(PropertyNames.FORMAT, "pubsub"); if (overriddenTransform.getTopicProvider() != null) { if (overriddenTransform.getTopicProvider().isAccessible()) { - context.addInput( + stepContext.addInput( PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path()); } else { - context.addInput( + stepContext.addInput( PropertyNames.PUBSUB_TOPIC_OVERRIDE, ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName()); } } if (overriddenTransform.getSubscriptionProvider() != null) { if (overriddenTransform.getSubscriptionProvider().isAccessible()) { - context.addInput( + stepContext.addInput( PropertyNames.PUBSUB_SUBSCRIPTION, overriddenTransform.getSubscription().getV1Beta1Path()); } else { - context.addInput( + stepContext.addInput( PropertyNames.PUBSUB_SUBSCRIPTION_OVERRIDE, - 
((NestedValueProvider) overriddenTransform.getSubscriptionProvider()) - .propertyName()); + ((NestedValueProvider) overriddenTransform.getSubscriptionProvider()).propertyName()); } } if (overriddenTransform.getTimestampLabel() != null) { - context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, - overriddenTransform.getTimestampLabel()); + stepContext.addInput( + PropertyNames.PUBSUB_TIMESTAMP_LABEL, overriddenTransform.getTimestampLabel()); } if (overriddenTransform.getIdLabel() != null) { - context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel()); + stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel()); } - context.addValueOnlyOutput(context.getOutput(transform)); + stepContext.addValueOnlyOutput(context.getOutput(transform)); } } @@ -2211,26 +2209,26 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { checkArgument(context.getPipelineOptions().isStreaming(), "StreamingPubsubIOWrite is only for streaming pipelines."); PubsubUnboundedSink<T> overriddenTransform = transform.getOverriddenTransform(); - context.addStep(transform, "ParallelWrite"); - context.addInput(PropertyNames.FORMAT, "pubsub"); + StepTranslationContext stepContext = context.addStep(transform, "ParallelWrite"); + stepContext.addInput(PropertyNames.FORMAT, "pubsub"); if (overriddenTransform.getTopicProvider().isAccessible()) { - context.addInput( + stepContext.addInput( PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path()); } else { - context.addInput( + stepContext.addInput( PropertyNames.PUBSUB_TOPIC_OVERRIDE, ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName()); } if (overriddenTransform.getTimestampLabel() != null) { - context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, - overriddenTransform.getTimestampLabel()); + stepContext.addInput( + PropertyNames.PUBSUB_TIMESTAMP_LABEL, overriddenTransform.getTimestampLabel()); } if (overriddenTransform.getIdLabel() != null) 
{ - context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel()); + stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel()); } - context.addEncodingInput( + stepContext.addEncodingInput( WindowedValue.getValueOnlyCoder(overriddenTransform.getElementCoder())); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java index 84950f7..1a5a9a5 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java @@ -25,6 +25,7 @@ import com.google.api.services.dataflow.model.SourceMetadata; import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator; +import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; import org.apache.beam.sdk.io.FileBasedSource; @@ -60,13 +61,13 @@ public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> { } } - context.addStep(transform, "ParallelRead"); - context.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT); - context.addInput( + StepTranslationContext 
stepContext = context.addStep(transform, "ParallelRead"); + stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT); + stepContext.addInput( PropertyNames.SOURCE_STEP_INPUT, cloudSourceToDictionary( CustomSources.serializeToCloudSource(source, context.getPipelineOptions()))); - context.addValueOnlyOutput(context.getOutput(transform)); + stepContext.addValueOnlyOutput(context.getOutput(transform)); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 21d575a..a19fd8c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -61,6 +61,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsList; import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMap; import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMultimap; @@ -998,8 +999,8 @@ public class DataflowRunnerTest { // Note: This is about the minimum needed to fake out a // translation. This obviously isn't a real translation. 
- context.addStep(transform, "TestTranslate"); - context.addOutput(context.getOutput(transform)); + StepTranslationContext stepContext = context.addStep(transform, "TestTranslate"); + stepContext.addOutput(context.getOutput(transform)); } });
