Reject stateful DoFn in ApexRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e85cea78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e85cea78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e85cea78

Branch: refs/heads/gearpump-runner
Commit: e85cea78253d2f316a18d95d65aabc1176448841
Parents: f8b6bb7
Author: Kenneth Knowles <[email protected]>
Authored: Tue Nov 15 21:33:01 2016 -0800
Committer: Kenneth Knowles <[email protected]>
Committed: Mon Nov 21 21:32:47 2016 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml | 1 +
 .../translation/ParDoBoundMultiTranslator.java | 67 +++++++++++++-------
 .../apex/translation/ParDoBoundTranslator.java | 46 +++++++++-----
 3 files changed, 74 insertions(+), 40 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85cea78/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 5478b24..d0b0fdf 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -185,6 +185,7 @@
 </goals>
 <configuration>
 <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+<excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
 <parallel>none</parallel>
 <failIfNoTests>true</failIfNoTests>
 <dependenciesToScan>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85cea78/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index 7c91b91..fed5f4b 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java @@ -23,17 +23,17 @@ import static com.google.common.base.Preconditions.checkArgument; import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.OutputPort; import com.google.common.collect.Maps; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - +import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; import org.apache.beam.sdk.values.PCollection; @@ -53,20 +53,35 @@ class ParDoBoundMultiTranslator<InputT, OutputT> @Override public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) { - OldDoFn<InputT, OutputT> doFn = transform.getFn(); + DoFn<InputT, OutputT> doFn = transform.getNewFn(); + if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) { + throw new UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", + DoFn.StateId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + ApexRunner.class.getSimpleName())); + } + OldDoFn<InputT, OutputT> oldDoFn = transform.getFn(); PCollectionTuple output = context.getOutput(); PCollection<InputT> input = context.getInput(); List<PCollectionView<?>> sideInputs = transform.getSideInputs(); Coder<InputT> inputCoder = input.getCoder(); - 
WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); + WindowedValueCoder<InputT> wvInputCoder = + FullWindowedValueCoder.of( + inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder()); - ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>( - context.getPipelineOptions(), - doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(), - context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder, - context.<Void>stateInternalsFactory() - ); + ApexParDoOperator<InputT, OutputT> operator = + new ApexParDoOperator<>( + context.getPipelineOptions(), + oldDoFn, + transform.getMainOutputTag(), + transform.getSideOutputTags().getAll(), + context.<PCollection<?>>getInput().getWindowingStrategy(), + sideInputs, + wvInputCoder, + context.<Void>stateInternalsFactory()); Map<TupleTag<?>, PCollection<?>> outputs = output.getAll(); Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size()); @@ -91,7 +106,9 @@ class ParDoBoundMultiTranslator<InputT, OutputT> } } - static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs, + static void addSideInputs( + ApexParDoOperator<?, ?> operator, + List<PCollectionView<?>> sideInputs, TranslationContext context) { Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1}; if (sideInputs.size() > sideInputPorts.length) { @@ -105,8 +122,8 @@ class ParDoBoundMultiTranslator<InputT, OutputT> } } - private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs, - TranslationContext context) { + private static PCollection<?> unionSideInputs( + List<PCollectionView<?>> sideInputs, TranslationContext context) { checkArgument(sideInputs.size() > 1, "requires multiple side inputs"); // flatten and assign union tag List<PCollection<Object>> sourceCollections = new ArrayList<>(); @@ -115,13 +132,16 @@ 
class ParDoBoundMultiTranslator<InputT, OutputT> for (int i = 0; i < sideInputs.size(); i++) { PCollectionView<?> sideInput = sideInputs.get(i); PCollection<?> sideInputCollection = context.getViewInput(sideInput); - if (!sideInputCollection.getWindowingStrategy().equals( - firstSideInput.getWindowingStrategy())) { + if (!sideInputCollection + .getWindowingStrategy() + .equals(firstSideInput.getWindowingStrategy())) { // TODO: check how to handle this in stream codec //String msg = "Multiple side inputs with different window strategies."; //throw new UnsupportedOperationException(msg); - LOG.warn("Side inputs union with different windowing strategies {} {}", - firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy()); + LOG.warn( + "Side inputs union with different windowing strategies {} {}", + firstSideInput.getWindowingStrategy(), + sideInputCollection.getWindowingStrategy()); } if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) { String msg = "Multiple side inputs with different coders."; @@ -131,12 +151,11 @@ class ParDoBoundMultiTranslator<InputT, OutputT> unionTags.put(sideInputCollection, i); } - PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection( - firstSideInput, firstSideInput.getCoder()); - FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection, - context); + PCollection<Object> resultCollection = + FlattenPCollectionTranslator.intermediateCollection( + firstSideInput, firstSideInput.getCoder()); + FlattenPCollectionTranslator.flattenCollections( + sourceCollections, unionTags, resultCollection, context); return resultCollection; - } - } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85cea78/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java index c1ebbd5..7a918a7 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java @@ -19,12 +19,13 @@ package org.apache.beam.runners.apex.translation; import java.util.List; - +import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; import org.apache.beam.sdk.values.PCollection; @@ -32,33 +33,46 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; -/** - * {@link ParDo.Bound} is translated to {link ApexParDoOperator} that wraps the {@link DoFn}. - */ -class ParDoBoundTranslator<InputT, OutputT> implements - TransformTranslator<ParDo.Bound<InputT, OutputT>> { +/** {@link ParDo.Bound} is translated to {link ApexParDoOperator} that wraps the {@link DoFn}. 
*/ +class ParDoBoundTranslator<InputT, OutputT> + implements TransformTranslator<ParDo.Bound<InputT, OutputT>> { private static final long serialVersionUID = 1L; @Override public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { - OldDoFn<InputT, OutputT> doFn = transform.getFn(); + DoFn<InputT, OutputT> doFn = transform.getNewFn(); + if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) { + throw new UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", + DoFn.StateId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + ApexRunner.class.getSimpleName())); + } + OldDoFn<InputT, OutputT> oldDoFn = transform.getOldFn(); PCollection<OutputT> output = context.getOutput(); PCollection<InputT> input = context.getInput(); List<PCollectionView<?>> sideInputs = transform.getSideInputs(); Coder<InputT> inputCoder = input.getCoder(); - WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); + WindowedValueCoder<InputT> wvInputCoder = + FullWindowedValueCoder.of( + inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder()); - ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>( - context.getPipelineOptions(), - doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/, - output.getWindowingStrategy(), sideInputs, wvInputCoder, - context.<Void>stateInternalsFactory() - ); + ApexParDoOperator<InputT, OutputT> operator = + new ApexParDoOperator<>( + context.getPipelineOptions(), + oldDoFn, + new TupleTag<OutputT>(), + TupleTagList.empty().getAll() /*sideOutputTags*/, + output.getWindowingStrategy(), + sideInputs, + wvInputCoder, + context.<Void>stateInternalsFactory()); context.addOperator(operator, operator.output); context.addStream(context.getInput(), operator.input); if 
(!sideInputs.isEmpty()) { - ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context); + ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context); } } }
