Pushes uses of OldDoFn deeper inside the Flink runner. In particular, the various DoFnOperators now take a regular DoFn rather than an OldDoFn, and convert it to an OldDoFn internally.
This makes it possible to remove uses of ParDo.getFn() returning OldDoFn. The only case where the OldDoFn inside a DoFnOperator is actually an OldDoFn rather than a DoFn in disguise is now WindowDoFnOperator, which overrides getOldDoFn (formerly getDoFn) to return an actual GABW (GroupAlsoByWindow) OldDoFn. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8330bfa7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8330bfa7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8330bfa7 Branch: refs/heads/master Commit: 8330bfa74cd72e51a29649745e87a4f1a6e5ffa1 Parents: af616d9 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Dec 9 16:47:01 2016 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Thu Dec 15 13:55:24 2016 -0800 ---------------------------------------------------------------------- .../FlinkBatchTransformTranslators.java | 9 +--- .../FlinkStreamingTransformTranslators.java | 8 ++-- .../functions/FlinkDoFnFunction.java | 10 +++-- .../functions/FlinkMultiOutputDoFnFunction.java | 10 +++-- .../wrappers/streaming/DoFnOperator.java | 43 ++++++++++++++++---- .../wrappers/streaming/WindowDoFnOperator.java | 8 ++-- .../beam/runners/flink/PipelineOptionsTest.java | 5 +-- .../flink/streaming/DoFnOperatorTest.java | 8 ++-- 8 files changed, 63 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java index 9ac907f..497b293 100644 --- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java @@ -50,7 +50,6 @@ import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -523,8 +522,6 @@ class FlinkBatchTransformTranslators { DataSet<WindowedValue<InputT>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn(); - TypeInformation<WindowedValue<OutputT>> typeInformation = context.getTypeInfo(context.getOutput(transform)); @@ -539,7 +536,7 @@ class FlinkBatchTransformTranslators { FlinkDoFnFunction<InputT, OutputT> doFnWrapper = new FlinkDoFnFunction<>( - oldDoFn, + doFn, context.getOutput(transform).getWindowingStrategy(), sideInputStrategies, context.getPipelineOptions()); @@ -570,8 +567,6 @@ class FlinkBatchTransformTranslators { DataSet<WindowedValue<InputT>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn(); - Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll(); Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap(); @@ -618,7 +613,7 @@ class FlinkBatchTransformTranslators { @SuppressWarnings("unchecked") FlinkMultiOutputDoFnFunction<InputT, OutputT> doFnWrapper = new FlinkMultiOutputDoFnFunction( - oldDoFn, + doFn, windowingStrategy, sideInputStrategies, context.getPipelineOptions(), 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 042f8df..42ef630 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -358,7 +358,7 @@ public class FlinkStreamingTransformTranslators { if (sideInputs.isEmpty()) { DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = new DoFnOperator<>( - transform.getFn(), + transform.getNewFn(), inputTypeInfo, new TupleTag<OutputT>("main output"), Collections.<TupleTag<?>>emptyList(), @@ -381,7 +381,7 @@ public class FlinkStreamingTransformTranslators { DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = new DoFnOperator<>( - transform.getFn(), + transform.getNewFn(), inputTypeInfo, new TupleTag<OutputT>("main output"), Collections.<TupleTag<?>>emptyList(), @@ -515,7 +515,7 @@ public class FlinkStreamingTransformTranslators { if (sideInputs.isEmpty()) { DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = new DoFnOperator<>( - transform.getFn(), + transform.getNewFn(), inputTypeInfo, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(), @@ -542,7 +542,7 @@ public class FlinkStreamingTransformTranslators { DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = new DoFnOperator<>( - transform.getFn(), + transform.getNewFn(), inputTypeInfo, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(), 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index db045f5..ed200d5 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -20,7 +20,10 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; @@ -46,16 +49,17 @@ public class FlinkDoFnFunction<InputT, OutputT> private final WindowingStrategy<?, ?> windowingStrategy; public FlinkDoFnFunction( - OldDoFn<InputT, OutputT> doFn, + DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions options) { - this.doFn = doFn; + this.doFn = DoFnAdapters.toOldDoFn(doFn); this.sideInputs = sideInputs; this.serializedOptions = new SerializedPipelineOptions(options); this.windowingStrategy = windowingStrategy; - this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess; + this.requiresWindowAccess = + 
DoFnSignatures.signatureForDoFn(doFn).processElement().observesWindow(); this.hasSideInputs = !sideInputs.isEmpty(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java index 7be4bb4..7f6a436 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -20,8 +20,11 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; @@ -54,16 +57,17 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT> private final WindowingStrategy<?, ?> windowingStrategy; public FlinkMultiOutputDoFnFunction( - OldDoFn<InputT, OutputT> doFn, + DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions options, Map<TupleTag<?>, Integer> outputMap) { - this.doFn = doFn; + 
this.doFn = DoFnAdapters.toOldDoFn(doFn); this.serializedOptions = new SerializedPipelineOptions(options); this.outputMap = outputMap; - this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess; + this.requiresWindowAccess = + DoFnSignatures.signatureForDoFn(doFn).processElement().observesWindow(); this.hasSideInputs = !sideInputs.isEmpty(); this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index a29664b..6729aaa 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -88,7 +89,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>, TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT> { - protected OldDoFn<InputT, FnOutputT> doFn; + protected OldDoFn<InputT, FnOutputT> oldDoFn; + protected final SerializedPipelineOptions serializedOptions; 
protected final TupleTag<FnOutputT> mainOutputTag; @@ -117,8 +119,9 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> private transient Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> restoredSideInputState; + @Deprecated public DoFnOperator( - OldDoFn<InputT, FnOutputT> doFn, + OldDoFn<InputT, FnOutputT> oldDoFn, TypeInformation<WindowedValue<InputT>> inputType, TupleTag<FnOutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, @@ -127,7 +130,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> Map<Integer, PCollectionView<?>> sideInputTagMapping, Collection<PCollectionView<?>> sideInputs, PipelineOptions options) { - this.doFn = doFn; + this.oldDoFn = oldDoFn; this.mainOutputTag = mainOutputTag; this.sideOutputTags = sideOutputTags; this.sideInputTagMapping = sideInputTagMapping; @@ -148,21 +151,43 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> setChainingStrategy(ChainingStrategy.ALWAYS); } + public DoFnOperator( + DoFn<InputT, FnOutputT> doFn, + TypeInformation<WindowedValue<InputT>> inputType, + TupleTag<FnOutputT> mainOutputTag, + List<TupleTag<?>> sideOutputTags, + OutputManagerFactory<OutputT> outputManagerFactory, + WindowingStrategy<?, ?> windowingStrategy, + Map<Integer, PCollectionView<?>> sideInputTagMapping, + Collection<PCollectionView<?>> sideInputs, + PipelineOptions options) { + this( + DoFnAdapters.toOldDoFn(doFn), + inputType, + mainOutputTag, + sideOutputTags, + outputManagerFactory, + windowingStrategy, + sideInputTagMapping, + sideInputs, + options); + } + protected ExecutionContext.StepContext createStepContext() { return new StepContext(); } // allow overriding this in WindowDoFnOperator because this one dynamically creates // the DoFn - protected OldDoFn<InputT, FnOutputT> getDoFn() { - return doFn; + protected OldDoFn<InputT, FnOutputT> getOldDoFn() { + return oldDoFn; } @Override public void open() throws Exception { super.open(); - this.doFn = getDoFn(); + this.oldDoFn = getOldDoFn(); currentInputWatermark = 
Long.MIN_VALUE; currentOutputWatermark = currentInputWatermark; @@ -220,7 +245,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.createDefault( serializedOptions.getPipelineOptions(), - doFn, + oldDoFn, sideInputReader, outputManagerFactory.create(output), mainOutputTag, @@ -232,13 +257,13 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> pushbackDoFnRunner = PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); - doFn.setup(); + oldDoFn.setup(); } @Override public void close() throws Exception { super.close(); - doFn.teardown(); + oldDoFn.teardown(); } protected final long getPushbackWatermarkHold() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index f2d7f1c..9cea529 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -106,7 +106,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> PipelineOptions options, Coder<K> keyCoder) { super( - null, + (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) null, inputType, mainOutputTag, sideOutputTags, @@ -124,7 +124,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> } @Override - protected OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() { + protected OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getOldDoFn() { StateInternalsFactory<K> 
stateInternalsFactory = new StateInternalsFactory<K>() { @Override public StateInternals<K> stateInternalsForKey(K key) { @@ -138,10 +138,10 @@ public class WindowDoFnOperator<K, InputT, OutputT> // has the window type as generic parameter while WindowingStrategy is almost always // untyped. @SuppressWarnings("unchecked") - OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn = + OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> oldDoFn = GroupAlsoByWindowViaWindowSetDoFn.create( windowingStrategy, stateInternalsFactory, (SystemReduceFn) systemReduceFn); - return doFn; + return oldDoFn; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index e44a705..4c97cc7 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -30,7 +30,6 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -99,7 +98,7 @@ public class PipelineOptionsTest { @Test(expected = Exception.class) public void parDoBaseClassPipelineOptionsNullTest() { DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>( - DoFnAdapters.toOldDoFn(new TestDoFn()), + new TestDoFn(), TypeInformation.of(new 
TypeHint<WindowedValue<Object>>() {}), new TupleTag<>("main-output"), Collections.<TupleTag<?>>emptyList(), @@ -118,7 +117,7 @@ public class PipelineOptionsTest { public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception { DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>( - DoFnAdapters.toOldDoFn(new TestDoFn()), + new TestDoFn(), TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}), new TupleTag<>("main-output"), Collections.<TupleTag<?>>emptyList(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 65e244a..113802d 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -35,8 +35,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PCollectionViewTesting; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -97,7 +95,7 @@ public class DoFnOperatorTest { TupleTag<String> outputTag = new TupleTag<>("main-output"); DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( - DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()), + new IdentityDoFn<String>(), coderTypeInfo, outputTag, 
Collections.<TupleTag<?>>emptyList(), @@ -141,7 +139,7 @@ public class DoFnOperatorTest { .build(); DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>( - DoFnAdapters.toOldDoFn(new MultiOutputDoFn(sideOutput1, sideOutput2)), + new MultiOutputDoFn(sideOutput1, sideOutput2), coderTypeInfo, mainOutput, ImmutableList.<TupleTag<?>>of(sideOutput1, sideOutput2), @@ -201,7 +199,7 @@ public class DoFnOperatorTest { .build(); DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( - DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()), + new IdentityDoFn<String>(), coderTypeInfo, outputTag, Collections.<TupleTag<?>>emptyList(),