Reject stateful DoFn in FlinkRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9c2d5da7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9c2d5da7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9c2d5da7

Branch: refs/heads/gearpump-runner
Commit: 9c2d5da7c659a2603d37c492ff44f4a9cda387fe
Parents: 7949b70
Author: Kenneth Knowles <[email protected]>
Authored: Tue Nov 15 21:33:28 2016 -0800
Committer: Kenneth Knowles <[email protected]>
Committed: Tue Nov 22 10:52:39 2016 -0800

----------------------------------------------------------------------
 runners/flink/runner/pom.xml | 1 +
 .../FlinkBatchTransformTranslators.java | 34 +++++++++++++++++---
 .../FlinkStreamingTransformTranslators.java | 25 +++++++++++++-
 3 files changed, 55 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c2d5da7/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index adcb3de..c060c25 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -53,6 +53,7 @@
 </goals>
 <configuration>
 <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+ <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
 <parallel>none</parallel>
 <failIfNoTests>true</failIfNoTests>
 <dependenciesToScan>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c2d5da7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java index 935a9ac..474d4e3 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.runners.flink.FlinkRunner; import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction; import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction; @@ -46,6 +47,7 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.OldDoFn; @@ -54,6 +56,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -487,11 +490,23 @@ class FlinkBatchTransformTranslators { @Override public void translateNode( ParDo.Bound<InputT, OutputT> transform, + FlinkBatchTranslationContext context) { + DoFn<InputT, OutputT> doFn = transform.getNewFn(); + if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) { + throw new 
UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", + DoFn.StateId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + FlinkRunner.class.getSimpleName())); + } + DataSet<WindowedValue<InputT>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - final OldDoFn<InputT, OutputT> doFn = transform.getFn(); + final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn(); TypeInformation<WindowedValue<OutputT>> typeInformation = context.getTypeInfo(context.getOutput(transform)); @@ -507,7 +522,7 @@ class FlinkBatchTransformTranslators { FlinkDoFnFunction<InputT, OutputT> doFnWrapper = new FlinkDoFnFunction<>( - doFn, + oldDoFn, context.getOutput(transform).getWindowingStrategy(), sideInputStrategies, context.getPipelineOptions()); @@ -533,10 +548,21 @@ class FlinkBatchTransformTranslators { public void translateNode( ParDo.BoundMulti<InputT, OutputT> transform, FlinkBatchTranslationContext context) { + DoFn<InputT, OutputT> doFn = transform.getNewFn(); + if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) { + throw new UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", + DoFn.StateId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + FlinkRunner.class.getSimpleName())); + } + DataSet<WindowedValue<InputT>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - final OldDoFn<InputT, OutputT> doFn = transform.getFn(); + final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn(); Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll(); @@ -584,7 +610,7 @@ class FlinkBatchTransformTranslators { @SuppressWarnings("unchecked") FlinkMultiOutputDoFnFunction<InputT, OutputT> doFnWrapper = new FlinkMultiOutputDoFnFunction( - doFn, + oldDoFn, windowingStrategy, sideInputStrategies, 
context.getPipelineOptions(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c2d5da7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 687e9c8..40dfbb9 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -21,7 +21,6 @@ package org.apache.beam.runners.flink.translation; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; - import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -51,6 +50,7 @@ import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.OldDoFn; @@ -58,6 +58,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -311,6 +312,17 @@ public class FlinkStreamingTransformTranslators { 
ParDo.Bound<InputT, OutputT> transform, FlinkStreamingTranslationContext context) { + DoFn<InputT, OutputT> doFn = transform.getNewFn(); + if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) { + throw new UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", + DoFn.StateId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + FlinkRunner.class.getSimpleName())); + } + WindowingStrategy<?, ?> windowingStrategy = context.getOutput(transform).getWindowingStrategy(); @@ -460,6 +472,17 @@ public class FlinkStreamingTransformTranslators { ParDo.BoundMulti<InputT, OutputT> transform, FlinkStreamingTranslationContext context) { + DoFn<InputT, OutputT> doFn = transform.getNewFn(); + if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) { + throw new UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", + DoFn.StateId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + FlinkRunner.class.getSimpleName())); + } + // we assume that the transformation does not change the windowing strategy. WindowingStrategy<?, ?> windowingStrategy = context.getInput(transform).getWindowingStrategy();
