Move some pieces of Dataflow translator to top level
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d2cb3e2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d2cb3e2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d2cb3e2 Branch: refs/heads/master Commit: 5d2cb3e2310dbf7046785e9e8f6403b854b2dd03 Parents: f04537c Author: Kenneth Knowles <[email protected]> Authored: Thu Jan 5 16:51:23 2017 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Fri Jan 6 11:36:51 2017 -0800 ---------------------------------------------------------------------- .../dataflow/DataflowPipelineTranslator.java | 134 +------------------ .../beam/runners/dataflow/DataflowRunner.java | 8 +- .../runners/dataflow/TransformTranslator.java | 123 +++++++++++++++++ .../dataflow/internal/ReadTranslator.java | 7 +- .../DataflowPipelineTranslatorTest.java | 3 +- .../runners/dataflow/DataflowRunnerTest.java | 5 +- 6 files changed, 137 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 2385fa1..e9cf6f4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -56,6 +56,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly; +import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext; +import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext; import org.apache.beam.runners.dataflow.internal.ReadTranslator; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.DoFnInfo; @@ -69,6 +71,7 @@ import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.GroupedValues; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; @@ -80,6 +83,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.Window.Bound; import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.PropertyNames; @@ -212,130 +216,6 @@ public class DataflowPipelineTranslator { return transformTranslators.get(transformClass); } - /** - * A {@link TransformTranslator} knows how to translate a particular subclass of {@link - * PTransform} for the Cloud Dataflow service. It does so by mutating the {@link - * TranslationContext}. - */ - public interface TransformTranslator<TransformT extends PTransform> { - void translate(TransformT transform, TranslationContext context); - } - - /** - * The interface provided to registered callbacks for interacting - * with the {@link DataflowRunner}, including reading and writing the - * values of {@link PCollection}s and side inputs ({@link PCollectionView}s). - */ - public interface TranslationContext { - /** - * Returns the configured pipeline options. - */ - DataflowPipelineOptions getPipelineOptions(); - - /** - * Returns the input of the currently being translated transform. - */ - <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform); - - /** - * Returns the output of the currently being translated transform. - */ - <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform); - - /** - * Returns the full name of the currently being translated transform. - */ - String getFullName(PTransform<?, ?> transform); - - /** - * Adds a step to the Dataflow workflow for the given transform, with - * the given Dataflow step type. - */ - StepTranslationContext addStep(PTransform<?, ?> transform, String type); - - /** - * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be - * consistent with the Step, in terms of input, output and coder types. - * - * <p>This is a low-level operation, when using this method it is up to - * the caller to ensure that names do not collide. - */ - Step addStep(PTransform<?, ? extends PValue> transform, Step step); - /** - * Encode a PValue reference as an output reference. - */ - OutputReference asOutputReference(PValue value); - } - - public interface StepTranslationContext { - /** - * Sets the encoding for the current Dataflow step. - */ - void addEncodingInput(Coder<?> value); - - /** - * Adds an input with the given name and value to the current - * Dataflow step. - */ - void addInput(String name, Boolean value); - - /** - * Adds an input with the given name and value to the current - * Dataflow step. - */ - void addInput(String name, String value); - - /** - * Adds an input with the given name and value to the current - * Dataflow step. - */ - void addInput(String name, Long value); - - /** - * Adds an input with the given name to the previously added Dataflow - * step, coming from the specified input PValue. - */ - void addInput(String name, PInput value); - - /** - * Adds an input that is a dictionary of strings to objects. - */ - void addInput(String name, Map<String, Object> elements); - - /** - * Adds an input that is a list of objects. - */ - void addInput(String name, List<? extends Map<String, Object>> elements); - - /** - * Adds an output to the previously added Dataflow step, - * producing the specified output {@code PValue}, - * including its {@code Coder} if a {@code TypedPValue}. If the - * {@code PValue} is a {@code PCollection}, wraps its coder inside - * a {@code WindowedValueCoder}. Returns a pipeline level unique id. - */ - long addOutput(PValue value); - - /** - * Adds an output to the previously added Dataflow step, - * producing the specified output {@code PValue}, - * including its {@code Coder} if a {@code TypedPValue}. If the - * {@code PValue} is a {@code PCollection}, wraps its coder inside - * a {@code ValueOnlyCoder}. Returns a pipeline level unique id. - */ - long addValueOnlyOutput(PValue value); - - /** - * Adds an output to the previously added CollectionToSingleton Dataflow step, - * consuming the specified input {@code PValue} and producing the specified output - * {@code PValue}. This step requires special treatment for its - * output encoding. Returns a pipeline level unique id. - */ - long addCollectionToSingletonOutput(PValue inputValue, - PValue outputValue); - } - - ///////////////////////////////////////////////////////////////////////////// /** @@ -838,11 +718,11 @@ public class DataflowPipelineTranslator { DataflowPipelineTranslator.registerTransformTranslator( Combine.GroupedValues.class, - new DataflowPipelineTranslator.TransformTranslator<Combine.GroupedValues>() { + new TransformTranslator<GroupedValues>() { @Override public void translate( Combine.GroupedValues transform, - DataflowPipelineTranslator.TranslationContext context) { + TranslationContext context) { translateHelper(transform, context); } @@ -1007,7 +887,7 @@ public class DataflowPipelineTranslator { registerTransformTranslator( Window.Bound.class, - new DataflowPipelineTranslator.TransformTranslator<Window.Bound>() { + new TransformTranslator<Bound>() { @Override public void translate( Window.Bound transform, TranslationContext context) { http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index d2c1e66..9da7d24 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -33,7 +33,6 @@ import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; -import com.google.api.services.dataflow.model.Step; import com.google.api.services.dataflow.model.WorkerPool; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; @@ -73,9 +72,6 @@ import java.util.SortedSet; import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; import org.apache.beam.runners.dataflow.internal.AssignWindows; import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource; @@ -2315,10 +2311,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } private static class ReadWithIdsTranslator - implements DataflowPipelineTranslator.TransformTranslator<ReadWithIds<?>> { + implements TransformTranslator<ReadWithIds<?>> { @Override public void translate(ReadWithIds<?> transform, - DataflowPipelineTranslator.TranslationContext context) { + TranslationContext context) { ReadTranslator.translateReadHelper(transform.getSource(), transform, context); } } http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java new file mode 100644 index 0000000..2aa8327 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import com.google.api.services.dataflow.model.Step; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.util.OutputReference; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; + +/** + * A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform} + * for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}. + */ +public interface TransformTranslator<TransformT extends PTransform> { + void translate(TransformT transform, TranslationContext context); + + /** + * The interface provided to registered callbacks for interacting with the {@link DataflowRunner}, + * including reading and writing the values of {@link PCollection}s and side inputs. + */ + interface TranslationContext { + /** Returns the configured pipeline options. */ + DataflowPipelineOptions getPipelineOptions(); + + /** Returns the input of the currently being translated transform. */ + <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform); + + /** Returns the output of the currently being translated transform. */ + <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform); + + /** Returns the full name of the currently being translated transform. */ + String getFullName(PTransform<?, ?> transform); + + /** + * Adds a step to the Dataflow workflow for the given transform, with the given Dataflow step + * type. + */ + StepTranslationContext addStep(PTransform<?, ?> transform, String type); + + /** + * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be consistent + * with the Step, in terms of input, output and coder types. + * + * <p>This is a low-level operation, when using this method it is up to the caller to ensure + * that names do not collide. + */ + Step addStep(PTransform<?, ? extends PValue> transform, Step step); + /** Encode a PValue reference as an output reference. */ + OutputReference asOutputReference(PValue value); + } + + /** The interface for a {@link TransformTranslator} to build a Dataflow step. */ + interface StepTranslationContext { + /** Sets the encoding for this Dataflow step. */ + void addEncodingInput(Coder<?> value); + + /** Adds an input with the given name and value to this Dataflow step. */ + void addInput(String name, Boolean value); + + /** Adds an input with the given name and value to this Dataflow step. */ + void addInput(String name, String value); + + /** Adds an input with the given name and value to this Dataflow step. */ + void addInput(String name, Long value); + + /** + * Adds an input with the given name to this Dataflow step, coming from the specified input + * PValue. + */ + void addInput(String name, PInput value); + + /** Adds an input that is a dictionary of strings to objects. */ + void addInput(String name, Map<String, Object> elements); + + /** Adds an input that is a list of objects. */ + void addInput(String name, List<? extends Map<String, Object>> elements); + + /** + * Adds an output to this Dataflow step, producing the specified output {@code PValue}, + * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code + * PCollection}, wraps its coder inside a {@code WindowedValueCoder}. Returns a pipeline level + * unique id. + */ + long addOutput(PValue value); + + /** + * Adds an output to this Dataflow step, producing the specified output {@code PValue}, + * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code + * PCollection}, wraps its coder inside a {@code ValueOnlyCoder}. Returns a pipeline level + * unique id. + */ + long addValueOnlyOutput(PValue value); + + /** + * Adds an output to this {@code CollectionToSingleton} Dataflow step, consuming the specified + * input {@code PValue} and producing the specified output {@code PValue}. This step requires + * special treatment for its output encoding. Returns a pipeline level unique id. + */ + long addCollectionToSingletonOutput(PValue inputValue, PValue outputValue); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java index 1a5a9a5..a15a2a3 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java @@ -24,10 +24,7 @@ import static org.apache.beam.sdk.util.Structs.addLong; import com.google.api.services.dataflow.model.SourceMetadata; import java.util.HashMap; import java.util.Map; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; +import org.apache.beam.runners.dataflow.TransformTranslator; import org.apache.beam.sdk.io.FileBasedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Source; @@ -47,7 +44,7 @@ public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> { public static <T> void translateReadHelper(Source<T> source, PTransform<?, ? extends PValue> transform, - DataflowPipelineTranslator.TranslationContext context) { + TranslationContext context) { try { // TODO: Move this validation out of translation once IOChannelUtils is portable // and can be reconstructed on the worker. http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index ab82941..84b585a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -56,7 +56,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; import org.apache.beam.runners.dataflow.util.OutputReference; @@ -566,7 +565,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { * {@link TranslationContext#addStep} and remaps the input port reference. */ private static class EmbeddedTranslator - implements DataflowPipelineTranslator.TransformTranslator<EmbeddedTransform> { + implements TransformTranslator<EmbeddedTransform> { @Override public void translate(EmbeddedTransform transform, TranslationContext context) { addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT, context.asOutputReference(context.getInput(transform))); http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index a19fd8c..4fff1c6 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -61,7 +61,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.regex.Pattern; -import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsList; import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMap; import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMultimap; @@ -989,12 +988,12 @@ public class DataflowRunnerTest { DataflowPipelineTranslator.registerTransformTranslator( TestTransform.class, - new DataflowPipelineTranslator.TransformTranslator<TestTransform>() { + new TransformTranslator<TestTransform>() { @SuppressWarnings("unchecked") @Override public void translate( TestTransform transform, - DataflowPipelineTranslator.TranslationContext context) { + TranslationContext context) { transform.translated = true; // Note: This is about the minimum needed to fake out a
