Removes some OldDoFn code from DoFnRunners.

DoFnRunners.createDefault() can be replaced with simpleRunner() at the existing call sites, since it is never called with a ReduceFnExecutor at those call sites. To support this, simpleRunner() is made public and all createDefault() overloads are removed.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b26ec89 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b26ec89 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b26ec89 Branch: refs/heads/master Commit: 2b26ec8934725a600954ced9c4063766a582396a Parents: 149d52b Author: Eugene Kirpichov <[email protected]> Authored: Thu Jan 12 13:10:40 2017 -0800 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Jan 13 14:34:23 2017 -0800 ---------------------------------------------------------------------- .../operators/ApexParDoOperator.java | 2 +- .../apache/beam/runners/core/DoFnRunners.java | 137 +------------------ .../beam/runners/direct/ParDoEvaluator.java | 9 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../beam/runners/dataflow/util/DoFnInfo.java | 62 ++++----- .../runners/spark/translation/DoFnFunction.java | 11 +- .../spark/translation/MultiDoFnFunction.java | 9 +- .../sdk/transforms/reflect/DoFnInvokers.java | 17 +-- 8 files changed, 55 insertions(+), 194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 1a3387c..de4c15d 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -305,7 +305,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements sideOutputPortMapping.put(sideOutputTags.get(i), port); 
} - DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault( + DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner( pipelineOptions.get(), doFn, sideInputReader, http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index 820bfcd..2f3e93c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -18,9 +18,7 @@ package org.apache.beam.runners.core; import java.util.List; -import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor; import org.apache.beam.runners.core.ExecutionContext.StepContext; -import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; @@ -53,7 +51,7 @@ public class DoFnRunners { * compressed {@link WindowedValue}. It is the responsibility of the runner to perform any key * partitioning needed, etc. */ - static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner( + public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner( PipelineOptions options, DoFn<InputT, OutputT> fn, SideInputReader sideInputReader, @@ -119,137 +117,4 @@ public class DoFnRunners { stepContext.timerInternals(), droppedDueToLatenessAggregator); } - - /** - * Creates a {@link DoFnRunner} for the provided {@link DoFn}. 
- */ - public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault( - PipelineOptions options, - DoFn<InputT, OutputT> doFn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - StepContext stepContext, - AggregatorFactory aggregatorFactory, - WindowingStrategy<?, ?> windowingStrategy) { - - // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, - // and window-exploded processing is achieved within the simple runner - return simpleRunner( - options, - doFn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy); - } - - /** - * Creates a {@link DoFnRunner} for the provided {@link OldDoFn}. - * - * <p>In particular, if the {@link OldDoFn} is a {@link ReduceFnExecutor}, a specialized - * implementation detail of streaming {@link GroupAlsoByWindow}, then it will create a special - * runner that operates on {@link KeyedWorkItem KeyedWorkItems}, drops late data and counts - * dropped elements. 
- * - * @deprecated please port uses of {@link OldDoFn} to use {@link DoFn} - */ - @Deprecated - public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault( - PipelineOptions options, - OldDoFn<InputT, OutputT> doFn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - StepContext stepContext, - AggregatorFactory aggregatorFactory, - WindowingStrategy<?, ?> windowingStrategy) { - - DoFnRunner<InputT, OutputT> doFnRunner = simpleRunner( - options, - doFn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy); - - if (!(doFn instanceof ReduceFnExecutor)) { - return doFnRunner; - } else { - // When a DoFn is a ReduceFnExecutor, we know it has to have an aggregator for dropped - // elements and we also learn that for some K and V, - // InputT = KeyedWorkItem<K, V> - // OutputT = KV<K, V> - - Aggregator<Long, Long> droppedDueToLatenessAggregator = - ((ReduceFnExecutor<?, ?, ?, ?>) doFn).getDroppedDueToLatenessAggregator(); - - @SuppressWarnings({"unchecked", "cast", "rawtypes"}) - DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner( - (DoFnRunner) doFnRunner, - stepContext, - (WindowingStrategy) windowingStrategy, - droppedDueToLatenessAggregator); - - return runner; - } - } - - /** - * Creates the right kind of {@link DoFnRunner} for an object that can be either a {@link DoFn} or - * {@link OldDoFn}. This can be used so that the client need not explicitly reference either such - * class, but merely deserialize a payload and pass it to this method. - * - * @deprecated for migration purposes only for services where users may still submit either {@link - * OldDoFn} or {@link DoFn}. If you know that you have a {@link DoFn} then you should use the - * variant for that instead. 
- */ - @Deprecated - public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault( - PipelineOptions options, - Object deserializedFn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - StepContext stepContext, - AggregatorFactory aggregatorFactory, - WindowingStrategy<?, ?> windowingStrategy) { - if (deserializedFn instanceof DoFn) { - return createDefault( - options, - (DoFn) deserializedFn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy); - } else if (deserializedFn instanceof OldDoFn) { - return createDefault( - options, - (OldDoFn) deserializedFn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy); - } else { - throw new IllegalArgumentException(String.format("Cannot create %s for %s of class %s", - DoFnRunner.class.getSimpleName(), - deserializedFn, - deserializedFn.getClass())); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index e146470..97d5360 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.direct; import com.google.common.collect.ImmutableList; -import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -30,6 +29,7 @@ import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import 
org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.TimerInternals.TimerData; @@ -47,7 +47,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { DirectStepContext stepContext, AppliedPTransform<?, ?, ?> application, WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy, - Serializable fn, // may be OldDoFn or DoFn + DoFn<InputT, OutputT> fn, StructuralKey<?> key, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, @@ -72,8 +72,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { ReadyCheckingSideInputReader sideInputReader = evaluationContext.createSideInputReader(sideInputs); + + // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, + // and window-exploded processing is achieved within the simple runner DoFnRunner<InputT, OutputT> underlying = - DoFnRunners.createDefault( + DoFnRunners.simpleRunner( evaluationContext.getPipelineOptions(), fn, sideInputReader, http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 95f2bfd..90cdf4c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -244,7 +244,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> sideInputReader = sideInputHandler; } - DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.createDefault( + DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), oldDoFn, sideInputReader, http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index b84def8..0c5be90 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.dataflow.util; -import static com.google.common.base.Preconditions.checkState; - import java.io.Serializable; import java.util.Map; import org.apache.beam.sdk.coders.Coder; @@ -29,14 +27,13 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; /** - * Wrapper class holding the necessary information to serialize a {@link OldDoFn} - * or {@link DoFn}. + * Wrapper class holding the necessary information to serialize a {@link DoFn}. 
* - * @param <InputT> the type of the (main) input elements of the {@link OldDoFn} - * @param <OutputT> the type of the (main) output elements of the {@link OldDoFn} + * @param <InputT> the type of the (main) input elements of the {@link DoFn} + * @param <OutputT> the type of the (main) output elements of the {@link DoFn} */ public class DoFnInfo<InputT, OutputT> implements Serializable { - private final Serializable doFn; + private final DoFn<InputT, OutputT> doFn; private final WindowingStrategy<?, ?> windowingStrategy; private final Iterable<PCollectionView<?>> sideInputViews; private final Coder<InputT> inputCoder; @@ -48,17 +45,37 @@ public class DoFnInfo<InputT, OutputT> implements Serializable { * {@link DoFn} or {@link OldDoFn} or other context-appropriate UDF blob. */ public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn( + DoFn<InputT, OutputT> doFn, + WindowingStrategy<?, ?> windowingStrategy, + Iterable<PCollectionView<?>> sideInputViews, + Coder<InputT> inputCoder, + long mainOutput, + Map<Long, TupleTag<?>> outputMap) { + return new DoFnInfo<>( + doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); + } + + /** TODO: remove this when Dataflow worker uses the DoFn overload. 
*/ + @Deprecated + @SuppressWarnings("unchecked") + public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn( Serializable doFn, WindowingStrategy<?, ?> windowingStrategy, Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder, long mainOutput, Map<Long, TupleTag<?>> outputMap) { - return new DoFnInfo(doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); + return forFn( + (DoFn<InputT, OutputT>) doFn, + windowingStrategy, + sideInputViews, + inputCoder, + mainOutput, + outputMap); } private DoFnInfo( - Serializable doFn, + DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder, @@ -72,34 +89,15 @@ public class DoFnInfo<InputT, OutputT> implements Serializable { this.outputMap = outputMap; } - /** - * @deprecated use {@link #forFn}. - */ + /** TODO: remove this when Dataflow worker uses {@link #getDoFn}. */ @Deprecated - public DoFnInfo( - OldDoFn doFn, - WindowingStrategy<?, ?> windowingStrategy, - Iterable<PCollectionView<?>> sideInputViews, - Coder<InputT> inputCoder, - long mainOutput, - Map<Long, TupleTag<?>> outputMap) { - this((Serializable) doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); - } - - /** Returns the embedded serialized function. It may be a {@code DoFn} or {@code OldDoFn}. */ public Serializable getFn() { return doFn; } - /** @deprecated use {@link #getFn()} */ - @Deprecated - public OldDoFn getDoFn() { - checkState( - doFn instanceof OldDoFn, - "Deprecated %s.getDoFn() called when the payload was actually a new %s", - DoFnInfo.class.getSimpleName(), - DoFn.class.getSimpleName()); - return (OldDoFn) doFn; + /** Returns the embedded function. 
*/ + public DoFn<InputT, OutputT> getDoFn() { + return doFn; } public WindowingStrategy<?, ?> getWindowingStrategy() { http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java index af8e089..bd6cfbe 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java @@ -80,18 +80,21 @@ public class DoFnFunction<InputT, OutputT> Iterator<WindowedValue<InputT>> iter) throws Exception { DoFnOutputManager outputManager = new DoFnOutputManager(); + + // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, + // and window-exploded processing is achieved within the simple runner DoFnRunner<InputT, OutputT> doFnRunner = - DoFnRunners.createDefault( + DoFnRunners.simpleRunner( runtimeContext.getPipelineOptions(), doFn, new SparkSideInputReader(sideInputs), outputManager, - new TupleTag<OutputT>() {}, + new TupleTag<OutputT>() { + }, Collections.<TupleTag<?>>emptyList(), new SparkProcessContext.NoOpStepContext(), new SparkAggregators.Factory(runtimeContext, accumulator), - windowingStrategy - ); + windowingStrategy); return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter); } http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index 0f9417a..cceffc8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.spark.Accumulator; import org.apache.spark.api.java.function.PairFlatMapFunction; - import scala.Tuple2; @@ -88,8 +87,11 @@ public class MultiDoFnFunction<InputT, OutputT> Iterator<WindowedValue<InputT>> iter) throws Exception { DoFnOutputManager outputManager = new DoFnOutputManager(); + + // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, + // and window-exploded processing is achieved within the simple runner DoFnRunner<InputT, OutputT> doFnRunner = - DoFnRunners.createDefault( + DoFnRunners.simpleRunner( runtimeContext.getPipelineOptions(), doFn, new SparkSideInputReader(sideInputs), @@ -98,8 +100,7 @@ public class MultiDoFnFunction<InputT, OutputT> Collections.<TupleTag<?>>emptyList(), new SparkProcessContext.NoOpStepContext(), new SparkAggregators.Factory(runtimeContext, accumulator), - windowingStrategy - ); + windowingStrategy); return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter); } http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index b141d51..33c5a6a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -36,21 +36,12 @@ public class DoFnInvokers { return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn); } - /** - * Temporarily retained for compatibility with Dataflow worker. - * TODO: delete this when Dataflow worker is fixed to call {@link #invokerFor(DoFn)}. - * - * @deprecated Use {@link #invokerFor(DoFn)}. - */ - @SuppressWarnings("unchecked") + /** TODO: remove this when Dataflow worker uses the DoFn overload. */ @Deprecated + @SuppressWarnings({"unchecked"}) public static <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor( - Serializable deserializedFn) { - if (deserializedFn instanceof DoFn) { - return invokerFor((DoFn<InputT, OutputT>) deserializedFn); - } - throw new UnsupportedOperationException( - "Only DoFn supported, was: " + deserializedFn.getClass()); + Serializable fn) { + return invokerFor((DoFn) fn); } private DoFnInvokers() {}
