This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit c40868fe2fc550ae97c7b2d9308dd8b58b20edab Author: Aljoscha Krettek <[email protected]> AuthorDate: Thu Jan 11 10:52:01 2018 +0100 Allow overriding DoFnRunners in subclasses of Flink DoFnOperator --- .../wrappers/streaming/DoFnOperator.java | 69 +++++++++++----------- .../wrappers/streaming/WindowDoFnOperator.java | 18 ++++++ 2 files changed, 52 insertions(+), 35 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 8ccbd8f..2e7f741 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -218,6 +218,39 @@ public class DoFnOperator<InputT, OutputT> return doFn; } + // allow overriding this, for example SplittableDoFnOperator will not create a + // stateful DoFn runner because ProcessFn, which is used for executing a Splittable DoFn + // doesn't play by the normal DoFn rules and WindowDoFnOperator uses LateDataDroppingDoFnRunner + protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner( + DoFnRunner<InputT, OutputT> wrappedRunner) { + + if (keyCoder != null) { + StatefulDoFnRunner.CleanupTimer cleanupTimer = + new StatefulDoFnRunner.TimeInternalsCleanupTimer( + timerInternals, windowingStrategy); + + // we don't know the window type + @SuppressWarnings({"unchecked", "rawtypes"}) + Coder windowCoder = windowingStrategy.getWindowFn().windowCoder(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + StatefulDoFnRunner.StateCleaner<?> stateCleaner = + new StatefulDoFnRunner.StateInternalsStateCleaner<>( + doFn, keyedStateInternals, windowCoder); + + + return DoFnRunners.defaultStatefulDoFnRunner( + doFn, + wrappedRunner, + windowingStrategy, + cleanupTimer, + stateCleaner); + + } else { + return doFnRunner; + } + } + @Override public void setup( StreamTask<?, ?> containingTask, @@ -304,41 +337,7 @@ public class DoFnOperator<InputT, OutputT> stepContext, windowingStrategy); - if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) { - // When the doFn is this, we know it came from WindowDoFnOperator and - // InputT = KeyedWorkItem<K, V> - // OutputT = KV<K, V> - // - // for some K, V - - - doFnRunner = DoFnRunners.lateDataDroppingRunner( - (DoFnRunner) doFnRunner, - timerInternals, - windowingStrategy); - } else if (keyCoder != null) { - // It is a stateful DoFn - - StatefulDoFnRunner.CleanupTimer cleanupTimer = - new StatefulDoFnRunner.TimeInternalsCleanupTimer( - stepContext.timerInternals(), windowingStrategy); - - // we don't know the window type - @SuppressWarnings({"unchecked", "rawtypes"}) - Coder windowCoder = windowingStrategy.getWindowFn().windowCoder(); - - @SuppressWarnings({"unchecked", "rawtypes"}) - StatefulDoFnRunner.StateCleaner<?> stateCleaner = - new StatefulDoFnRunner.StateInternalsStateCleaner<>( - doFn, stepContext.stateInternals(), windowCoder); - - doFnRunner = DoFnRunners.defaultStatefulDoFnRunner( - doFn, - doFnRunner, - windowingStrategy, - cleanupTimer, - stateCleaner); - } + doFnRunner = createWrappingDoFnRunner(doFnRunner); if (options.getEnableMetrics()) { doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext()); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 7a04238..8447ade 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -23,6 +23,8 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; @@ -79,6 +81,22 @@ public class WindowDoFnOperator<K, InputT, OutputT> } @Override + protected DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> createWrappingDoFnRunner( + DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> wrappedRunner) { + // When the doFn is this, we know it came from WindowDoFnOperator and + // InputT = KeyedWorkItem<K, V> + // OutputT = KV<K, V> + // + // for some K, V + + + return DoFnRunners.lateDataDroppingRunner( + (DoFnRunner) doFnRunner, + timerInternals, + windowingStrategy); + } + + @Override protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() { StateInternalsFactory<K> stateInternalsFactory = key -> { -- To stop receiving notification emails like this one, please contact [email protected].
