This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit d15979f2cf0d6bc47d049a6ea157d9d7b2b97848
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Wed Jan 10 06:48:49 2018 +0100

    [BEAM-2140] Don't use StatefulDoFnRunner when running SDF in FlinkRunner
---
 .../translation/wrappers/streaming/SplittableDoFnOperator.java | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 44be5f3..e088b07 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
@@ -88,6 +89,15 @@ public class SplittableDoFnOperator<
   }
 
   @Override
+  protected DoFnRunner<
+      KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> createWrappingDoFnRunner(
+          DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> wrappedRunner) {
+    // don't wrap in anything because we don't need state cleanup because ProcessFn does
+    // all that
+    return wrappedRunner;
+  }
+
+  @Override
   public void open() throws Exception {
     super.open();
 
-- 
To stop receiving notification emails like this one, please contact
[email protected].
