Refactor SparkProcessContext more cleanly into single- and multi-output versions
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8243fcdc Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8243fcdc Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8243fcdc Branch: refs/heads/python-sdk Commit: 8243fcdc4e80838589622bd2f0bbe51350da8c8a Parents: 1fb1f7b Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Nov 11 16:57:27 2016 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Thu Nov 17 13:18:36 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/core/OutputWindowedValue.java | 1 - .../runners/core/ReduceFnContextFactory.java | 1 - .../runners/spark/translation/DoFnFunction.java | 9 +++- .../spark/translation/MultiDoFnFunction.java | 17 +----- .../spark/translation/SparkProcessContext.java | 57 ++++++++++---------- 5 files changed, 39 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java index 08a0e81..86eeb33 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.core; import java.util.Collection; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index 668ef47..d43fb8e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -33,7 +33,6 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.Timers; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.MergingStateAccessor; import org.apache.beam.sdk.util.state.ReadableState; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java index 4dfbee6..fa08c5b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java @@ -32,7 +32,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.spark.Accumulator; import org.apache.spark.api.java.function.FlatMapFunction; - +import org.joda.time.Instant; /** @@ -93,6 +93,13 @@ public class DoFnFunction<InputT, OutputT> } @Override + public <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output) { + throw new UnsupportedOperationException( + "sideOutput is an unsupported operation for doFunctions, use a " + + "MultiDoFunction instead."); + } + + @Override public Accumulator<NamedAggregators> getAccumulator() { return accum; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index 1168381..d015b08 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -103,21 +103,8 @@ public class MultiDoFnFunction<InputT, OutputT> } @Override - public synchronized <T> void sideOutput(TupleTag<T> tag, T t) { - sideOutputWithTimestamp(tag, t, windowedValue != null ? windowedValue.getTimestamp() : null); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, - final T t, - final Instant timestamp) { - if (windowedValue == null) { - // this is start/finishBundle. - outputs.put(tupleTag, noElementWindowedValue(t, timestamp, windowFn)); - } else { - outputs.put(tupleTag, WindowedValue.of(t, timestamp, windowedValue.getWindows(), - windowedValue.getPane())); - } + public <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output) { + outputs.put(tag, output); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8243fcdc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index f3152ba..afbc824 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -127,34 +127,10 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> } @Override - public void output(OutputT output) { - outputWithTimestamp(output, windowedValue != null ? windowedValue.getTimestamp() : null); - } - - public abstract void output(WindowedValue<OutputT> output); - - @Override - public <T> void sideOutput(TupleTag<T> tupleTag, T t) { - String message = "sideOutput is an unsupported operation for doFunctions, use a " - + "MultiDoFunction instead."; - LOG.warn(message); - throw new UnsupportedOperationException(message); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) { - String message = - "sideOutputWithTimestamp is an unsupported operation for doFunctions, use a " - + "MultiDoFunction instead."; - LOG.warn(message); - throw new UnsupportedOperationException(message); - } - - @Override public <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT> createAggregatorInternal( - String named, - Combine.CombineFn<AggregatorInputT, ?, AggregatorOutputT> combineFn) { + String named, + Combine.CombineFn<AggregatorInputT, ?, AggregatorOutputT> combineFn) { return mRuntimeContext.createAggregator(getAccumulator(), named, combineFn); } @@ -166,6 +142,11 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> } @Override + public void output(OutputT output) { + outputWithTimestamp(output, windowedValue != null ? windowedValue.getTimestamp() : null); + } + + @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { if (windowedValue == null) { // this is start/finishBundle. @@ -176,6 +157,26 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> } } + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + sideOutputWithTimestamp( + tag, output, windowedValue != null ? windowedValue.getTimestamp() : null); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + if (windowedValue == null) { + // this is start/finishBundle. + sideOutput(tag, noElementWindowedValue(output, timestamp, windowFn)); + } else { + sideOutput(tag, WindowedValue.of(output, timestamp, windowedValue.getWindows(), + windowedValue.getPane())); + } + } + + public abstract void output(WindowedValue<OutputT> output); + public abstract <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output); + static <T, W extends BoundedWindow> WindowedValue<T> noElementWindowedValue( final T output, final Instant timestamp, WindowFn<Object, W> windowFn) { WindowFn<Object, W>.AssignContext assignContext = @@ -248,8 +249,8 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> @Override public <SideOutputT> void sideOutputWindowedValue( TupleTag<SideOutputT> tag, SideOutputT output, Instant timestamp, - Collection<? extends BoundedWindow> windows, PaneInfo pane) { - throw new UnsupportedOperationException(); + Collection<? extends BoundedWindow> windows, PaneInfo paneInfo) { + sideOutput(tag, WindowedValue.of(output, timestamp, windows, paneInfo)); } @Override