Refactor FlinkProcessContext more cleanly into single- and multi-output versions
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1fb1f7be Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1fb1f7be Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1fb1f7be Branch: refs/heads/master Commit: 1fb1f7bebeacf54e66f606f373d68c14483a444c Parents: 24cae56 Author: Eugene Kirpichov <[email protected]> Authored: Fri Nov 11 16:37:42 2016 -0800 Committer: Eugene Kirpichov <[email protected]> Committed: Thu Nov 17 13:18:36 2016 -0800 ---------------------------------------------------------------------- .../functions/FlinkDoFnFunction.java | 12 +- .../FlinkMergingNonShuffleReduceFunction.java | 14 +- .../FlinkMergingPartialReduceFunction.java | 14 +- .../functions/FlinkMergingReduceFunction.java | 12 +- .../functions/FlinkMultiOutputDoFnFunction.java | 14 +- .../FlinkMultiOutputProcessContext.java | 94 +---- .../functions/FlinkPartialReduceFunction.java | 14 +- .../functions/FlinkProcessContext.java | 343 ------------------- .../functions/FlinkProcessContextBase.java | 285 +++++++++++++++ .../functions/FlinkReduceFunction.java | 14 +- .../FlinkSingleOutputProcessContext.java | 70 ++++ 11 files changed, 421 insertions(+), 465 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 798a23c..dc0ef0f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -64,20 +64,20 @@ public class FlinkDoFnFunction<InputT, OutputT> Iterable<WindowedValue<InputT>> values, Collector<WindowedValue<OutputT>> out) throws Exception { - FlinkProcessContext<InputT, OutputT> context = new FlinkProcessContext<>( + FlinkSingleOutputProcessContext<InputT, OutputT> context = new FlinkSingleOutputProcessContext<>( serializedOptions.getPipelineOptions(), getRuntimeContext(), doFn, windowingStrategy, - out, - sideInputs); + sideInputs, out + ); this.doFn.startBundle(context); if (!requiresWindowAccess || hasSideInputs) { // we don't need to explode the windows for (WindowedValue<InputT> value : values) { - context = context.forWindowedValue(value); + context.setWindowedValue(value); doFn.processElement(context); } } else { @@ -86,7 +86,7 @@ public class FlinkDoFnFunction<InputT, OutputT> // is in only one window for (WindowedValue<InputT> value : values) { for (WindowedValue<InputT> explodedValue : value.explodeWindows()) { - context = context.forWindowedValue(explodedValue); + context.setWindowedValue(explodedValue); doFn.processElement(context); } } @@ -94,7 +94,7 @@ public class FlinkDoFnFunction<InputT, OutputT> // set the windowed value to null so that the special logic for outputting // in startBundle/finishBundle kicks in - context = context.forWindowedValue(null); + context.setWindowedValue(null); this.doFn.finishBundle(context); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java index dbaab17..a4284f8 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -92,14 +92,14 @@ public class FlinkMergingNonShuffleReduceFunction< Iterable<WindowedValue<KV<K, InputT>>> elements, Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { - FlinkProcessContext<KV<K, InputT>, KV<K, OutputT>> processContext = - new FlinkProcessContext<>( + FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, OutputT>> processContext = + new FlinkSingleOutputProcessContext<>( serializedOptions.getPipelineOptions(), getRuntimeContext(), doFn, windowingStrategy, - out, - sideInputs); + sideInputs, out + ); PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner = PerKeyCombineFnRunners.create(combineFn); @@ -141,7 +141,7 @@ public class FlinkMergingNonShuffleReduceFunction< IntervalWindow currentWindow = (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); InputT firstValue = currentValue.getValue().getValue(); - processContext = processContext.forWindowedValue(currentValue); + processContext.setWindowedValue(currentValue); AccumT accumulator = combineFnRunner.createAccumulator(key, processContext); accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext); @@ -157,7 +157,7 @@ public class FlinkMergingNonShuffleReduceFunction< // continue accumulating and merge windows InputT value = nextValue.getValue().getValue(); - processContext = processContext.forWindowedValue(nextValue); + processContext.setWindowedValue(nextValue); accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); windowTimestamp = outputTimeFn.combine( @@ -175,7 +175,7 @@ public class FlinkMergingNonShuffleReduceFunction< currentWindow = nextWindow; InputT value = nextValue.getValue().getValue(); - processContext = processContext.forWindowedValue(nextValue); + processContext.setWindowedValue(nextValue); accumulator = combineFnRunner.createAccumulator(key, processContext); accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java index bc09bdf..30d3326 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java @@ -60,14 +60,14 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte Iterable<WindowedValue<KV<K, InputT>>> elements, Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception { - FlinkProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext = - new FlinkProcessContext<>( + FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext = + new FlinkSingleOutputProcessContext<>( serializedOptions.getPipelineOptions(), getRuntimeContext(), doFn, windowingStrategy, - out, - sideInputs); + sideInputs, out + ); PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = PerKeyCombineFnRunners.create(combineFn); @@ -109,7 +109,7 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte IntervalWindow currentWindow = (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); InputT firstValue = currentValue.getValue().getValue(); - processContext = processContext.forWindowedValue(currentValue); + processContext.setWindowedValue(currentValue); AccumT accumulator = combineFnRunner.createAccumulator(key, processContext); accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext); @@ -125,7 +125,7 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte // continue accumulating and merge windows InputT value = nextValue.getValue().getValue(); - processContext = processContext.forWindowedValue(nextValue); + processContext.setWindowedValue(nextValue); accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); windowTimestamp = outputTimeFn.combine( @@ -143,7 +143,7 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte currentWindow = nextWindow; InputT value = nextValue.getValue().getValue(); - processContext = processContext.forWindowedValue(nextValue); + processContext.setWindowedValue(nextValue); accumulator = combineFnRunner.createAccumulator(key, processContext); accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java index 4050f47..29dc1e3 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java @@ -62,14 +62,14 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi Iterable<WindowedValue<KV<K, AccumT>>> elements, Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { - FlinkProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext = - new FlinkProcessContext<>( + FlinkSingleOutputProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext = + new FlinkSingleOutputProcessContext<>( serializedOptions.getPipelineOptions(), getRuntimeContext(), doFn, windowingStrategy, - out, - sideInputs); + sideInputs, out + ); PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = PerKeyCombineFnRunners.create(combineFn); @@ -127,7 +127,7 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi if (nextWindow.equals(currentWindow)) { // continue accumulating and merge windows - processContext = processContext.forWindowedValue(nextValue); + processContext.setWindowedValue(nextValue); accumulator = combineFnRunner.mergeAccumulators( key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext); @@ -143,7 +143,7 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi windowTimestamps.clear(); - processContext = processContext.forWindowedValue(nextValue); + processContext.setWindowedValue(nextValue); currentWindow = nextWindow; accumulator = nextValue.getValue().getValue(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java index 810609e..7be4bb4 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -74,22 +74,22 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT> Iterable<WindowedValue<InputT>> values, Collector<WindowedValue<RawUnionValue>> out) throws Exception { - FlinkProcessContext<InputT, OutputT> context = + FlinkMultiOutputProcessContext<InputT, OutputT> context = new FlinkMultiOutputProcessContext<>( serializedOptions.getPipelineOptions(), getRuntimeContext(), doFn, windowingStrategy, - out, - outputMap, - sideInputs); + sideInputs, out, + outputMap + ); this.doFn.startBundle(context); if (!requiresWindowAccess || hasSideInputs) { // we don't need to explode the windows for (WindowedValue<InputT> value : values) { - context = context.forWindowedValue(value); + context.setWindowedValue(value); doFn.processElement(context); } } else { @@ -98,7 +98,7 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT> // is in only one window for (WindowedValue<InputT> value : values) { for (WindowedValue<InputT> explodedValue : value.explodeWindows()) { - context = context.forWindowedValue(value); + context.setWindowedValue(value); doFn.processElement(context); } } @@ -106,7 +106,7 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT> // set the windowed value to null so that the special logic for outputting // in startBundle/finishBundle kicks in - context = context.forWindowedValue(null); + context.setWindowedValue(null); this.doFn.finishBundle(context); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java index 153a2d7..a3d2b18 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java @@ -33,95 +33,35 @@ import org.apache.flink.util.Collector; import org.joda.time.Instant; /** - * {@link OldDoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports - * side outputs. + * {@link OldDoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports side + * outputs. */ class FlinkMultiOutputProcessContext<InputT, OutputT> - extends FlinkProcessContext<InputT, OutputT> { + extends FlinkProcessContextBase<InputT, OutputT> { - // we need a different Collector from the base class private final Collector<WindowedValue<RawUnionValue>> collector; - private final Map<TupleTag<?>, Integer> outputMap; - FlinkMultiOutputProcessContext( PipelineOptions pipelineOptions, RuntimeContext runtimeContext, OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, Collector<WindowedValue<RawUnionValue>> collector, - Map<TupleTag<?>, Integer> outputMap, - Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) { - super( - pipelineOptions, - runtimeContext, - doFn, - windowingStrategy, - new Collector<WindowedValue<OutputT>>() { - @Override - public void collect(WindowedValue<OutputT> outputTWindowedValue) { - - } - - @Override - public void close() { - - } - }, - sideInputs); - + Map<TupleTag<?>, Integer> outputMap) { + super(pipelineOptions, runtimeContext, doFn, windowingStrategy, sideInputs); this.collector = collector; this.outputMap = outputMap; } @Override - public FlinkProcessContext<InputT, OutputT> forWindowedValue( - WindowedValue<InputT> windowedValue) { - this.windowedValue = windowedValue; - return this; - } - - @Override - public void outputWithTimestamp(OutputT value, Instant timestamp) { - if (windowedValue == null) { - // we are in startBundle() or finishBundle() - - try { - Collection windows = windowingStrategy.getWindowFn().assignWindows( - new FlinkNoElementAssignContext( - windowingStrategy.getWindowFn(), - value, - timestamp)); - - collector.collect( - WindowedValue.of( - new RawUnionValue(0, value), - timestamp != null ? timestamp : new Instant(Long.MIN_VALUE), - windows, - PaneInfo.NO_FIRING)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } else { - collector.collect( - WindowedValue.of( - new RawUnionValue(0, value), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPane())); - } - } - - @Override protected void outputWithTimestampAndWindow( OutputT value, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - collector.collect( - WindowedValue.of( - new RawUnionValue(0, value), timestamp, windows, pane)); + collector.collect(WindowedValue.of(new RawUnionValue(0, value), timestamp, windows, pane)); } @Override @@ -142,19 +82,24 @@ class FlinkMultiOutputProcessContext<InputT, OutputT> throw new IllegalArgumentException("Unknown side output tag: " + tag); } + outputUnionValue(value, timestamp, new RawUnionValue(index, value)); + } + + private <T> void outputUnionValue(T value, Instant timestamp, RawUnionValue unionValue) { if (windowedValue == null) { // we are in startBundle() or finishBundle() try { - Collection windows = windowingStrategy.getWindowFn().assignWindows( - new FlinkNoElementAssignContext( - windowingStrategy.getWindowFn(), - value, - timestamp)); + Collection<? extends BoundedWindow> windows = + windowingStrategy + .getWindowFn() + .assignWindows( + new FlinkNoElementAssignContext( + windowingStrategy.getWindowFn(), value, timestamp)); collector.collect( WindowedValue.of( - new RawUnionValue(index, value), + unionValue, timestamp != null ? timestamp : new Instant(Long.MIN_VALUE), windows, PaneInfo.NO_FIRING)); @@ -164,11 +109,10 @@ class FlinkMultiOutputProcessContext<InputT, OutputT> } else { collector.collect( WindowedValue.of( - new RawUnionValue(index, value), + unionValue, windowedValue.getTimestamp(), windowedValue.getWindows(), windowedValue.getPane())); } - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java index fa2ce4d..3ea456a 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -88,14 +88,14 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind Iterable<WindowedValue<KV<K, InputT>>> elements, Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception { - FlinkProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext = - new FlinkProcessContext<>( + FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext = + new FlinkSingleOutputProcessContext<>( serializedOptions.getPipelineOptions(), getRuntimeContext(), doFn, windowingStrategy, - out, - sideInputs); + sideInputs, out + ); PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = PerKeyCombineFnRunners.create(combineFn); @@ -132,7 +132,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind K key = currentValue.getValue().getKey(); BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null); InputT firstValue = currentValue.getValue().getValue(); - processContext = processContext.forWindowedValue(currentValue); + processContext.setWindowedValue(currentValue); AccumT accumulator = combineFnRunner.createAccumulator(key, processContext); accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext); @@ -147,7 +147,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind if (nextWindow.equals(currentWindow)) { // continue accumulating InputT value = nextValue.getValue().getValue(); - processContext = processContext.forWindowedValue(nextValue); + processContext.setWindowedValue(nextValue); accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); windowTimestamp = outputTimeFn.combine( @@ -165,7 +165,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind currentWindow = nextWindow; InputT value = nextValue.getValue().getValue(); - processContext = processContext.forWindowedValue(nextValue); + processContext.setWindowedValue(nextValue); accumulator = combineFnRunner.createAccumulator(key, processContext); accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java deleted file mode 100644 index 1b28a70..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.Iterables; -import java.io.IOException; -import java.io.Serializable; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.Map; -import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -/** - * {@link OldDoFn.ProcessContext} for our Flink Wrappers. - */ -class FlinkProcessContext<InputT, OutputT> - extends OldDoFn<InputT, OutputT>.ProcessContext { - - private final PipelineOptions pipelineOptions; - private final RuntimeContext runtimeContext; - private Collector<WindowedValue<OutputT>> collector; - private final boolean requiresWindowAccess; - - protected WindowedValue<InputT> windowedValue; - - protected WindowingStrategy<?, ?> windowingStrategy; - - private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; - - FlinkProcessContext( - PipelineOptions pipelineOptions, - RuntimeContext runtimeContext, - OldDoFn<InputT, OutputT> doFn, - WindowingStrategy<?, ?> windowingStrategy, - Collector<WindowedValue<OutputT>> collector, - Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) { - doFn.super(); - checkNotNull(pipelineOptions); - checkNotNull(runtimeContext); - checkNotNull(doFn); - checkNotNull(collector); - - this.pipelineOptions = pipelineOptions; - this.runtimeContext = runtimeContext; - this.collector = collector; - this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess; - this.windowingStrategy = windowingStrategy; - this.sideInputs = sideInputs; - - super.setupDelegateAggregators(); - } - - FlinkProcessContext( - PipelineOptions pipelineOptions, - RuntimeContext runtimeContext, - OldDoFn<InputT, OutputT> doFn, - WindowingStrategy<?, ?> windowingStrategy, - Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) { - doFn.super(); - checkNotNull(pipelineOptions); - checkNotNull(runtimeContext); - checkNotNull(doFn); - - this.pipelineOptions = pipelineOptions; - this.runtimeContext = runtimeContext; - this.collector = null; - this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess; - this.windowingStrategy = windowingStrategy; - this.sideInputs = sideInputs; - - super.setupDelegateAggregators(); - } - - public FlinkProcessContext<InputT, OutputT> forOutput( - Collector<WindowedValue<OutputT>> collector) { - this.collector = collector; - - // for now, returns ourselves, to be easy on the GC - return this; - } - - - - public FlinkProcessContext<InputT, OutputT> forWindowedValue( - WindowedValue<InputT> windowedValue) { - this.windowedValue = windowedValue; - - // for now, returns ourselves, to be easy on the GC - return this; - } - - @Override - public InputT element() { - return this.windowedValue.getValue(); - } - - - @Override - public Instant timestamp() { - return windowedValue.getTimestamp(); - } - - @Override - public BoundedWindow window() { - if (!requiresWindowAccess) { - throw new UnsupportedOperationException( - "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess."); - } - return Iterables.getOnlyElement(windowedValue.getWindows()); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public WindowingInternals<InputT, OutputT> windowingInternals() { - - return new WindowingInternals<InputT, OutputT>() { - - @Override - public StateInternals stateInternals() { - throw new UnsupportedOperationException(); - } - - @Override - public void outputWindowedValue( - OutputT value, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - // TODO: Refactor this (get rid of duplication, move things around w.r.t. - // FlinkMultiOutputProcessContext) - collector.collect(WindowedValue.of(value, timestamp, windows, pane)); - outputWithTimestampAndWindow(value, timestamp, windows, pane); - } - - @Override - public <SideOutputT> void sideOutputWindowedValue( - TupleTag<SideOutputT> tag, - SideOutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - // TODO: Implement this - throw new UnsupportedOperationException(); - } - - @Override - public TimerInternals timerInternals() { - throw new UnsupportedOperationException(); - } - - @Override - public Collection<? extends BoundedWindow> windows() { - return windowedValue.getWindows(); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public <T> void writePCollectionViewData(TupleTag<?> tag, - Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public <ViewT> ViewT sideInput( - PCollectionView<ViewT> view, - BoundedWindow mainInputWindow) { - - checkNotNull(view, "View passed to sideInput cannot be null"); - checkNotNull( - sideInputs.get(view), - "Side input for " + view + " not available."); - - // get the side input strategy for mapping the window - WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view); - - BoundedWindow sideInputWindow = - windowingStrategy.getWindowFn().getSideInputWindow(mainInputWindow); - - Map<BoundedWindow, ViewT> sideInputs = - runtimeContext.getBroadcastVariableWithInitializer( - view.getTagInternal().getId(), new SideInputInitializer<>(view)); - return sideInputs.get(sideInputWindow); - } - }; - } - - @Override - public PipelineOptions getPipelineOptions() { - return pipelineOptions; - } - - @Override - public <ViewT> ViewT sideInput(PCollectionView<ViewT> view) { - checkNotNull(view, "View passed to sideInput cannot be null"); - checkNotNull(sideInputs.get(view), "Side input for " + view + " not available."); - Iterator<? extends BoundedWindow> windowIter = windowedValue.getWindows().iterator(); - BoundedWindow window; - if (!windowIter.hasNext()) { - throw new IllegalStateException( - "sideInput called when main input element is not in any windows"); - } else { - window = windowIter.next(); - if (windowIter.hasNext()) { - throw new IllegalStateException( - "sideInput called when main input element is in multiple windows"); - } - } - - // get the side input strategy for mapping the window - WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view); - - BoundedWindow sideInputWindow = - windowingStrategy.getWindowFn().getSideInputWindow(window); - - Map<BoundedWindow, ViewT> sideInputs = - runtimeContext.getBroadcastVariableWithInitializer( - view.getTagInternal().getId(), new SideInputInitializer<>(view)); - ViewT result = sideInputs.get(sideInputWindow); - if (result == null) { - result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList()); - } - return result; - } - - @Override - public void output(OutputT value) { - if (windowedValue != null) { - outputWithTimestamp(value, windowedValue.getTimestamp()); - } else { - outputWithTimestamp(value, null); - } - } - - @Override - public void outputWithTimestamp(OutputT value, Instant timestamp) { - if (windowedValue == null) { - // we are in startBundle() or finishBundle() - - try { - Collection windows = windowingStrategy.getWindowFn().assignWindows( - new FlinkNoElementAssignContext( - windowingStrategy.getWindowFn(), - value, - timestamp)); - - collector.collect( - WindowedValue.of( - value, - timestamp != null ? timestamp : new Instant(Long.MIN_VALUE), - windows, - PaneInfo.NO_FIRING)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } else { - collector.collect( - WindowedValue.of( - value, - timestamp, - windowedValue.getWindows(), - windowedValue.getPane())); - } - } - - protected void outputWithTimestampAndWindow( - OutputT value, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - collector.collect( - WindowedValue.of( - value, timestamp, windows, pane)); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - throw new UnsupportedOperationException(); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - sideOutput(tag, output); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> - createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = - new SerializableFnAggregatorWrapper<>(combiner); - Accumulator<?, ?> existingAccum = - (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name); - if (existingAccum != null) { - return wrapper; - } else { - runtimeContext.addAccumulator(name, wrapper); - } - return wrapper; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java new file mode 100644 index 0000000..b814015 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +/** + * {@link OldDoFn.ProcessContext} for our Flink Wrappers. + */ +abstract class FlinkProcessContextBase<InputT, OutputT> + extends OldDoFn<InputT, OutputT>.ProcessContext { + + private final PipelineOptions pipelineOptions; + private final RuntimeContext runtimeContext; + private final boolean requiresWindowAccess; + protected final WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy; + private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; + + protected WindowedValue<InputT> windowedValue; + + FlinkProcessContextBase( + PipelineOptions pipelineOptions, + RuntimeContext runtimeContext, + OldDoFn<InputT, OutputT> doFn, + WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ? extends BoundedWindow>> sideInputs) { + doFn.super(); + checkNotNull(pipelineOptions); + checkNotNull(runtimeContext); + checkNotNull(doFn); + + this.pipelineOptions = pipelineOptions; + this.runtimeContext = runtimeContext; + this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess; + this.windowingStrategy = windowingStrategy; + this.sideInputs = sideInputs; + + super.setupDelegateAggregators(); + } + + public void setWindowedValue(WindowedValue<InputT> windowedValue) { + this.windowedValue = windowedValue; + } + + @Override + public InputT element() { + return this.windowedValue.getValue(); + } + + + @Override + public Instant timestamp() { + return windowedValue.getTimestamp(); + } + + @Override + public BoundedWindow window() { + if (!requiresWindowAccess) { + throw new UnsupportedOperationException( + "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess."); + } + return Iterables.getOnlyElement(windowedValue.getWindows()); + } + + @Override + public PaneInfo pane() { + return windowedValue.getPane(); + } + + @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + + return new WindowingInternals<InputT, OutputT>() { + + @Override + public StateInternals stateInternals() { + throw new UnsupportedOperationException(); + } + + @Override + public void outputWindowedValue( + OutputT value, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + outputWithTimestampAndWindow(value, timestamp, windows, pane); + } + + @Override + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + throw new UnsupportedOperationException(); + } + + @Override + public TimerInternals timerInternals() { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<? extends BoundedWindow> windows() { + return windowedValue.getWindows(); + } + + @Override + public PaneInfo pane() { + return windowedValue.getPane(); + } + + @Override + public <T> void writePCollectionViewData(TupleTag<?> tag, + Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public <ViewT> ViewT sideInput( + PCollectionView<ViewT> view, + BoundedWindow mainInputWindow) { + + checkNotNull(view, "View passed to sideInput cannot be null"); + checkNotNull( + sideInputs.get(view), + "Side input for " + view + " not available."); + + // get the side input strategy for mapping the window + WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view); + + BoundedWindow sideInputWindow = + windowingStrategy.getWindowFn().getSideInputWindow(mainInputWindow); + + Map<BoundedWindow, ViewT> sideInputs = + runtimeContext.getBroadcastVariableWithInitializer( + view.getTagInternal().getId(), new SideInputInitializer<>(view)); + return sideInputs.get(sideInputWindow); + } + }; + } + + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions; + } + + @Override + public <ViewT> ViewT sideInput(PCollectionView<ViewT> view) { + checkNotNull(view, "View passed to sideInput cannot be null"); + checkNotNull(sideInputs.get(view), "Side input for " + view + " not available."); + Iterator<? extends BoundedWindow> windowIter = windowedValue.getWindows().iterator(); + BoundedWindow window; + if (!windowIter.hasNext()) { + throw new IllegalStateException( + "sideInput called when main input element is not in any windows"); + } else { + window = windowIter.next(); + if (windowIter.hasNext()) { + throw new IllegalStateException( + "sideInput called when main input element is in multiple windows"); + } + } + + // get the side input strategy for mapping the window + WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view); + + BoundedWindow sideInputWindow = + windowingStrategy.getWindowFn().getSideInputWindow(window); + + Map<BoundedWindow, ViewT> sideInputs = + runtimeContext.getBroadcastVariableWithInitializer( + view.getTagInternal().getId(), new SideInputInitializer<>(view)); + ViewT result = sideInputs.get(sideInputWindow); + if (result == null) { + result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList()); + } + return result; + } + + @Override + public void output(OutputT value) { + if (windowedValue != null) { + outputWithTimestamp(value, windowedValue.getTimestamp()); + } else { + outputWithTimestamp(value, null); + } + } + + @Override + public final void outputWithTimestamp(OutputT value, Instant timestamp) { + if (windowedValue == null) { + // we are in startBundle() or finishBundle() + + try { + Collection windows = windowingStrategy.getWindowFn().assignWindows( + new FlinkNoElementAssignContext( + windowingStrategy.getWindowFn(), + value, + timestamp)); + + outputWithTimestampAndWindow( + value, + timestamp != null ? timestamp : new Instant(Long.MIN_VALUE), + windows, + PaneInfo.NO_FIRING); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + outputWithTimestampAndWindow( + value, timestamp, windowedValue.getWindows(), windowedValue.getPane()); + } + } + + protected abstract void outputWithTimestampAndWindow( + OutputT value, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane); + + @Override + public abstract <T> void sideOutput(TupleTag<T> tag, T output); + + @Override + public abstract <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp); + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> + createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { + SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = + new SerializableFnAggregatorWrapper<>(combiner); + Accumulator<?, ?> existingAccum = + (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name); + if (existingAccum != null) { + return wrapper; + } else { + runtimeContext.addAccumulator(name, wrapper); + } + return wrapper; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java index c9b24b4..ab0c471 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -92,14 +92,14 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> Iterable<WindowedValue<KV<K, AccumT>>> elements, Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { - FlinkProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext = - new FlinkProcessContext<>( + FlinkSingleOutputProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext = + new FlinkSingleOutputProcessContext<>( serializedOptions.getPipelineOptions(), getRuntimeContext(), doFn, windowingStrategy, - out, - sideInputs); + sideInputs, out + ); PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = PerKeyCombineFnRunners.create(combineFn); @@ -150,14 +150,14 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> if (nextWindow.equals(currentWindow)) { // continue accumulating - processContext = processContext.forWindowedValue(nextValue); + processContext.setWindowedValue(nextValue); accumulator = combineFnRunner.mergeAccumulators( key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext); windowTimestamps.add(nextValue.getTimestamp()); } else { // emit the value that we currently have - processContext = processContext.forWindowedValue(currentValue); + processContext.setWindowedValue(currentValue); out.collect( WindowedValue.of( KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)), @@ -179,7 +179,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> // if at the end of the iteration we have a change in windows // the ProcessContext will not have been updated - processContext = processContext.forWindowedValue(currentValue); + processContext.setWindowedValue(currentValue); // emit the final accumulator out.collect( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java new file mode 100644 index 0000000..d67f6fd --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import java.util.Collection; +import java.util.Map; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +/** {@link OldDoFn.ProcessContext} for {@link FlinkDoFnFunction} with a single main output. */ +class FlinkSingleOutputProcessContext<InputT, OutputT> + extends FlinkProcessContextBase<InputT, OutputT> { + + private final Collector<WindowedValue<OutputT>> collector; + + FlinkSingleOutputProcessContext( + PipelineOptions pipelineOptions, + RuntimeContext runtimeContext, + OldDoFn<InputT, OutputT> doFn, + WindowingStrategy<?, ?> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + Collector<WindowedValue<OutputT>> collector) { + super(pipelineOptions, runtimeContext, doFn, windowingStrategy, sideInputs); + this.collector = collector; + } + + @Override + protected void outputWithTimestampAndWindow( + OutputT value, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + collector.collect(WindowedValue.of(value, timestamp, windows, pane)); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T value) { + throw new UnsupportedOperationException(); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) { + throw new UnsupportedOperationException(); + } +}
