Repository: beam Updated Branches: refs/heads/master 88f78fa2f -> aebd3a4c5
Remove the FnOutputT parameter from DoFnOperator Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e8f26085 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e8f26085 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e8f26085 Branch: refs/heads/master Commit: e8f26085e889f8f618c0961a5458cbc42b432c01 Parents: b0601fd Author: JingsongLi <[email protected]> Authored: Tue Jun 6 17:31:09 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Jun 6 14:33:36 2017 +0200 ---------------------------------------------------------------------- .../FlinkStreamingTransformTranslators.java | 10 +++++----- .../wrappers/streaming/DoFnOperator.java | 20 ++++++++++---------- .../streaming/SplittableDoFnOperator.java | 12 ++++++------ .../wrappers/streaming/WindowDoFnOperator.java | 2 +- .../beam/runners/flink/PipelineOptionsTest.java | 6 +++--- .../flink/streaming/DoFnOperatorTest.java | 11 +++++------ 6 files changed, 30 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index d8c3049..2a7c5d6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -332,7 +332,7 @@ class FlinkStreamingTransformTranslators { static class ParDoTranslationHelper { interface DoFnOperatorFactory<InputT, OutputT> { - DoFnOperator<InputT, OutputT, OutputT> createDoFnOperator( + DoFnOperator<InputT, OutputT> createDoFnOperator( DoFn<InputT, OutputT> doFn, String stepName, List<PCollectionView<?>> sideInputs, @@ -395,7 +395,7 @@ class FlinkStreamingTransformTranslators { context.getCoder((PCollection<OutputT>) outputs.get(mainOutputTag))); if (sideInputs.isEmpty()) { - DoFnOperator<InputT, OutputT, OutputT> doFnOperator = + DoFnOperator<InputT, OutputT> doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, context.getCurrentTransform().getFullName(), @@ -416,7 +416,7 @@ class FlinkStreamingTransformTranslators { Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs = transformSideInputs(sideInputs, context); - DoFnOperator<InputT, OutputT, OutputT> doFnOperator = + DoFnOperator<InputT, OutputT> doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, context.getCurrentTransform().getFullName(), @@ -493,7 +493,7 @@ class FlinkStreamingTransformTranslators { context, new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() { @Override - public DoFnOperator<InputT, OutputT, OutputT> createDoFnOperator( + public DoFnOperator<InputT, OutputT> createDoFnOperator( DoFn<InputT, OutputT> doFn, String stepName, List<PCollectionView<?>> sideInputs, @@ -547,7 +547,7 @@ class FlinkStreamingTransformTranslators { @Override public DoFnOperator< KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, - OutputT, OutputT> createDoFnOperator( + OutputT> createDoFnOperator( DoFn< KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> doFn, http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 8c27ed9..350f323 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -94,21 +94,21 @@ import org.joda.time.Instant; * Flink operator for executing {@link DoFn DoFns}. * * @param <InputT> the input type of the {@link DoFn} - * @param <FnOutputT> the output type of the {@link DoFn} + * @param <OutputT> the output type of the {@link DoFn} * @param <OutputT> the output type of the operator, this can be different from the fn output * type when we have additional tagged outputs */ -public class DoFnOperator<InputT, FnOutputT, OutputT> +public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<WindowedValue<OutputT>> implements OneInputStreamOperator<WindowedValue<InputT>, WindowedValue<OutputT>>, TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, WindowedValue<OutputT>>, KeyGroupCheckpointedOperator, Triggerable<Object, TimerData> { - protected DoFn<InputT, FnOutputT> doFn; + protected DoFn<InputT, OutputT> doFn; protected final SerializedPipelineOptions serializedOptions; - protected final TupleTag<FnOutputT> mainOutputTag; + protected final TupleTag<OutputT> mainOutputTag; protected final List<TupleTag<?>> additionalOutputTags; protected final Collection<PCollectionView<?>> sideInputs; @@ -118,8 +118,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> protected final OutputManagerFactory<OutputT> outputManagerFactory; - protected transient DoFnRunner<InputT, FnOutputT> doFnRunner; - protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner; + protected transient DoFnRunner<InputT, OutputT> doFnRunner; + protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner; protected transient SideInputHandler sideInputHandler; @@ -127,7 +127,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> protected transient DoFnRunners.OutputManager outputManager; - private transient DoFnInvoker<InputT, FnOutputT> doFnInvoker; + private transient DoFnInvoker<InputT, OutputT> doFnInvoker; protected transient long currentInputWatermark; @@ -156,10 +156,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> private transient Optional<Long> pushedBackWatermark; public DoFnOperator( - DoFn<InputT, FnOutputT> doFn, + DoFn<InputT, OutputT> doFn, String stepName, Coder<WindowedValue<InputT>> inputCoder, - TupleTag<FnOutputT> mainOutputTag, + TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, OutputManagerFactory<OutputT> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, @@ -192,7 +192,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> // allow overriding this in WindowDoFnOperator because this one dynamically creates // the DoFn - protected DoFn<InputT, FnOutputT> getDoFn() { + protected DoFn<InputT, OutputT> getDoFn() { return doFn; } http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 4ac2ff5..5d08eba 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -55,19 +55,19 @@ import org.joda.time.Instant; * the {@code @ProcessElement} method of a splittable {@link DoFn}. */ public class SplittableDoFnOperator< - InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> extends DoFnOperator< - KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT, OutputT> { + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> { private transient ScheduledExecutorService executorService; public SplittableDoFnOperator( - DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn, + DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> doFn, String stepName, Coder< WindowedValue< KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder, - TupleTag<FnOutputT> mainOutputTag, + TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, OutputManagerFactory<OutputT> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, @@ -120,10 +120,10 @@ public class SplittableDoFnOperator< new OutputAndTimeBoundedSplittableProcessElementInvoker<>( doFn, serializedOptions.getPipelineOptions(), - new OutputWindowedValue<FnOutputT>() { + new OutputWindowedValue<OutputT>() { @Override public void outputWindowedValue( - FnOutputT output, + OutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index ea578b9..78d585e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -46,7 +46,7 @@ import org.apache.flink.streaming.api.operators.InternalTimer; * Flink operator for executing window {@link DoFn DoFns}. */ public class WindowDoFnOperator<K, InputT, OutputT> - extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, KV<K, OutputT>> { + extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>> { private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn; http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index bc0b1c2..d0281ec 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -136,7 +136,7 @@ public class PipelineOptionsTest { @Test(expected = Exception.class) public void parDoBaseClassPipelineOptionsNullTest() { - DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( + DoFnOperator<String, String> doFnOperator = new DoFnOperator<>( new TestDoFn(), "stepName", WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), @@ -157,7 +157,7 @@ public class PipelineOptionsTest { @Test public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception { - DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( + DoFnOperator<String, String> doFnOperator = new DoFnOperator<>( new TestDoFn(), "stepName", WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), @@ -173,7 +173,7 @@ public class PipelineOptionsTest { final byte[] serialized = SerializationUtils.serialize(doFnOperator); @SuppressWarnings("unchecked") - DoFnOperator<Object, Object, Object> deserialized = SerializationUtils.deserialize(serialized); + DoFnOperator<Object, Object> deserialized = SerializationUtils.deserialize(serialized); TypeInformation<WindowedValue<Object>> typeInformation = TypeInformation.of( new TypeHint<WindowedValue<Object>>() {}); http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 132242e..ad9d236 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -111,7 +111,7 @@ public class DoFnOperatorTest { TupleTag<String> outputTag = new TupleTag<>("main-output"); - DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( + DoFnOperator<String, String> doFnOperator = new DoFnOperator<>( new IdentityDoFn<String>(), "stepName", windowedValueCoder, @@ -155,7 +155,7 @@ public class DoFnOperatorTest { .put(additionalOutput2, new OutputTag<String>(additionalOutput2.getId()){}) .build(); - DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( + DoFnOperator<String, String> doFnOperator = new DoFnOperator<>( new MultiOutputDoFn(additionalOutput1, additionalOutput2), "stepName", windowedValueCoder, @@ -223,7 +223,7 @@ public class DoFnOperatorTest { TupleTag<String> outputTag = new TupleTag<>("main-output"); - DoFnOperator<Integer, String, String> doFnOperator = new DoFnOperator<>( + DoFnOperator<Integer, String> doFnOperator = new DoFnOperator<>( fn, "stepName", windowedValueCoder, @@ -335,8 +335,7 @@ public class DoFnOperatorTest { TupleTag<KV<String, Integer>> outputTag = new TupleTag<>("main-output"); - DoFnOperator< - KV<String, Integer>, KV<String, Integer>, KV<String, Integer>> doFnOperator = + DoFnOperator<KV<String, Integer>, KV<String, Integer>> doFnOperator = new DoFnOperator<>( fn, "stepName", @@ -433,7 +432,7 @@ public class DoFnOperatorTest { keyCoder = StringUtf8Coder.of(); } - DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( + DoFnOperator<String, String> doFnOperator = new DoFnOperator<>( new IdentityDoFn<String>(), "stepName", windowedValueCoder,
