This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c40868fe2fc550ae97c7b2d9308dd8b58b20edab
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Thu Jan 11 10:52:01 2018 +0100

    Allow overriding DoFnRunners in subclasses of Flink DoFnOperator
---
 .../wrappers/streaming/DoFnOperator.java           | 69 +++++++++++-----------
 .../wrappers/streaming/WindowDoFnOperator.java     | 18 ++++++
 2 files changed, 52 insertions(+), 35 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 8ccbd8f..2e7f741 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -218,6 +218,39 @@ public class DoFnOperator<InputT, OutputT>
     return doFn;
   }
 
+  // allow overriding: subclasses choose how the basic runner is wrapped. For example,
+  // SplittableDoFnOperator will not create a stateful DoFn runner because ProcessFn, which
+  // is used for executing a Splittable DoFn, doesn't play by the normal DoFn rules, and
+  // WindowDoFnOperator uses LateDataDroppingDoFnRunner.
+  protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(
+      DoFnRunner<InputT, OutputT> wrappedRunner) {
+    if (keyCoder == null) {
+      // no key coder, so no stateful wrapping: return the runner we were given (the
+      // argument, not the doFnRunner field, so the result depends only on the input)
+      return wrappedRunner;
+    }
+
+    StatefulDoFnRunner.CleanupTimer cleanupTimer =
+        new StatefulDoFnRunner.TimeInternalsCleanupTimer(
+            timerInternals, windowingStrategy);
+
+    // we don't know the window type
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    StatefulDoFnRunner.StateCleaner<?> stateCleaner =
+        new StatefulDoFnRunner.StateInternalsStateCleaner<>(
+            doFn, keyedStateInternals, windowCoder);
+
+    return DoFnRunners.defaultStatefulDoFnRunner(
+        doFn,
+        wrappedRunner,
+        windowingStrategy,
+        cleanupTimer,
+        stateCleaner);
+  }
+
   @Override
   public void setup(
       StreamTask<?, ?> containingTask,
@@ -304,41 +337,7 @@ public class DoFnOperator<InputT, OutputT>
         stepContext,
         windowingStrategy);
 
-    if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) {
-      // When the doFn is this, we know it came from WindowDoFnOperator and
-      //   InputT = KeyedWorkItem<K, V>
-      //   OutputT = KV<K, V>
-      //
-      // for some K, V
-
-
-      doFnRunner = DoFnRunners.lateDataDroppingRunner(
-          (DoFnRunner) doFnRunner,
-          timerInternals,
-          windowingStrategy);
-    } else if (keyCoder != null) {
-      // It is a stateful DoFn
-
-      StatefulDoFnRunner.CleanupTimer cleanupTimer =
-          new StatefulDoFnRunner.TimeInternalsCleanupTimer(
-              stepContext.timerInternals(), windowingStrategy);
-
-      // we don't know the window type
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
-
-      @SuppressWarnings({"unchecked", "rawtypes"})
-      StatefulDoFnRunner.StateCleaner<?> stateCleaner =
-          new StatefulDoFnRunner.StateInternalsStateCleaner<>(
-              doFn, stepContext.stateInternals(), windowCoder);
-
-      doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
-          doFn,
-          doFnRunner,
-          windowingStrategy,
-          cleanupTimer,
-          stateCleaner);
-    }
+    doFnRunner = createWrappingDoFnRunner(doFnRunner);
 
     if (options.getEnableMetrics()) {
       doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, 
getRuntimeContext());
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 7a04238..8447ade 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -23,6 +23,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
@@ -79,6 +81,22 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   }
 
   @Override
+  protected DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> createWrappingDoFnRunner(
+      DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> wrappedRunner) {
+    // Overrides the base wrapping: the runner handed in by the caller is
+    // decorated with DoFnRunners.lateDataDroppingRunner(...) and nothing else.
+    //
+    // Wrap the runner passed as the argument rather than reading the doFnRunner
+    // field, so the result depends only on what the caller supplies.
+    //
+    // The raw cast is kept from the original call site.
+    return DoFnRunners.lateDataDroppingRunner(
+        (DoFnRunner) wrappedRunner,
+        timerInternals,
+        windowingStrategy);
+  }
+
+  @Override
   protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
     StateInternalsFactory<K> stateInternalsFactory =
         key -> {

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.

Reply via email to