Repository: incubator-beam Updated Branches: refs/heads/master 96f9fce78 -> 0bfa02dd2
[BEAM-1111] Reject timers for ParDo in SparkRunner streaming evaluators Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/95e2c53d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/95e2c53d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/95e2c53d Branch: refs/heads/master Commit: 95e2c53db535952aaf0c335e0d3d27a721c6b55d Parents: 96f9fce Author: Sela <ans...@paypal.com> Authored: Thu Dec 8 20:29:35 2016 +0200 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Dec 8 11:54:19 2016 -0800 ---------------------------------------------------------------------- .../spark/translation/TransformTranslator.java | 28 +---------------- .../spark/translation/TranslationUtils.java | 33 ++++++++++++++++++++ .../streaming/StreamingTransformTranslator.java | 6 ++++ 3 files changed, 40 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95e2c53d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 8170366..964eb37 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -23,6 +23,7 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutput import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix; import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate; import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount; +import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers; import com.google.common.collect.Maps; import java.io.IOException; @@ -32,7 +33,6 @@ import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyInputFormat; import org.apache.beam.runners.core.AssignWindowsDoFn; -import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -58,8 +58,6 @@ import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -228,30 +226,6 @@ public final class TransformTranslator { }; } - private static void rejectStateAndTimers(DoFn<?, ?> doFn) { - DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - - if (signature.stateDeclarations().size() > 0) { - throw new UnsupportedOperationException( - String.format( - "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", - DoFn.StateId.class.getSimpleName(), - doFn.getClass().getName(), - DoFn.class.getSimpleName(), - SparkRunner.class.getSimpleName())); - } - - if (signature.timerDeclarations().size() > 0) { - throw new UnsupportedOperationException( - String.format( - "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.", - DoFn.TimerId.class.getSimpleName(), - doFn.getClass().getName(), - DoFn.class.getSimpleName(), - SparkRunner.class.getSimpleName())); - } - } - private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() { return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95e2c53d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java index 647f8c3..eddc771 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java @@ -24,8 +24,12 @@ import java.io.Serializable; import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.util.BroadcastHelper; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -211,4 +215,33 @@ public final class TranslationUtils { } } + /** + * Reject state and timers {@link DoFn}. + * + * @param doFn the {@link DoFn} to possibly reject. + */ + public static void rejectStateAndTimers(DoFn<?, ?> doFn) { + DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); + + if (signature.stateDeclarations().size() > 0) { + throw new UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", + DoFn.StateId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + SparkRunner.class.getSimpleName())); + } + + if (signature.timerDeclarations().size() > 0) { + throw new UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.", + DoFn.TimerId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + SparkRunner.class.getSimpleName())); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95e2c53d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 85d796a..00df7d4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.spark.translation.streaming; import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers; import com.google.common.collect.Maps; import java.util.ArrayList; @@ -47,6 +48,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.OldDoFn; @@ -348,6 +350,8 @@ final class StreamingTransformTranslator { @Override public void evaluate(final ParDo.Bound<InputT, OutputT> transform, final EvaluationContext context) { + DoFn<InputT, OutputT> doFn = transform.getNewFn(); + rejectStateAndTimers(doFn); final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context); @@ -380,6 +384,8 @@ final class StreamingTransformTranslator { @Override public void evaluate(final ParDo.BoundMulti<InputT, OutputT> transform, final EvaluationContext context) { + DoFn<InputT, OutputT> doFn = transform.getNewFn(); + rejectStateAndTimers(doFn); final SparkRuntimeContext runtimeContext = context.getRuntimeContext(); final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs = TranslationUtils.getSideInputs(transform.getSideInputs(), context);