Repository: incubator-beam
Updated Branches:
  refs/heads/master d99eec15d -> b742a2c09


Revert "Allow stateful DoFn in DataflowRunner"

This reverts commit 42bb15d2df28b99b6788010450f41f2932095771.

The Dataflow service has introduced a bug that was masked by various test disabling.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/df97fe48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/df97fe48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/df97fe48

Branch: refs/heads/master
Commit: df97fe4836a669b11ddeb37ef467bbfee1d803ea
Parents: d99eec1
Author: Kenneth Knowles <k...@google.com>
Authored: Tue Dec 13 16:36:42 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Dec 13 16:37:52 2016 -0800

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/df97fe48/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 8048df9..a56690c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -77,7 +77,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -956,6 +955,7 @@ public class DataflowPipelineTranslator {
 
           private <InputT, OutputT> void translateMultiHelper(
               ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
+            rejectStatefulDoFn(transform.getNewFn());
 
             context.addStep(transform, "ParallelDo");
             translateInputs(context.getInput(transform), transform.getSideInputs(), context);
@@ -985,6 +985,7 @@ public class DataflowPipelineTranslator {
 
           private <InputT, OutputT> void translateSingleHelper(
               ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
+            rejectStatefulDoFn(transform.getNewFn());
 
             context.addStep(transform, "ParallelDo");
             translateInputs(context.getInput(transform), transform.getSideInputs(), context);
@@ -1032,6 +1033,18 @@ public class DataflowPipelineTranslator {
     registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
   }
 
+  private static void rejectStatefulDoFn(DoFn<?, ?> doFn) {
+    if (DoFnSignatures.getSignature(doFn.getClass()).isStateful()) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              DataflowRunner.class.getSimpleName()));
+    }
+  }
+
   private static void translateInputs(
       PCollection<?> input,
       List<PCollectionView<?>> sideInputs,
@@ -1063,9 +1076,6 @@ public class DataflowPipelineTranslator {
       TranslationContext context,
       long mainOutput,
       Map<Long, TupleTag<?>> outputMap) {
-
-    DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
-
     context.addInput(PropertyNames.USER_FN, fn.getClass().getName());
     context.addInput(
         PropertyNames.SERIALIZED_FN,
@@ -1073,10 +1083,6 @@ public class DataflowPipelineTranslator {
             serializeToByteArray(
                 DoFnInfo.forFn(
                     fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap))));
-
-    if (signature.isStateful()) {
-      context.addInput(PropertyNames.USES_KEYED_STATE, "true");
-    }
   }
 
   private static BiMap<Long, TupleTag<?>> translateOutputs(