Liberates ReduceFnRunner from WindowingInternals
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24cae56a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24cae56a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24cae56a Branch: refs/heads/master Commit: 24cae56ad3a7b25b9e2114907f1d069a243f87dd Parents: 1543ea9 Author: Eugene Kirpichov <[email protected]> Authored: Thu Nov 10 18:40:53 2016 -0800 Committer: Eugene Kirpichov <[email protected]> Committed: Thu Nov 17 13:18:36 2016 -0800 ---------------------------------------------------------------------- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 3 +- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 5 +- .../beam/runners/core/OutputWindowedValue.java | 46 ++++++++++++ .../runners/core/ReduceFnContextFactory.java | 71 ++++++++++++++++--- .../beam/runners/core/ReduceFnRunner.java | 39 ++--------- .../beam/runners/core/SideInputAccess.java | 31 +++++++++ .../beam/runners/core/SimpleDoFnRunner.java | 12 ++++ .../beam/runners/core/SimpleOldDoFnRunner.java | 10 +++ .../core/WindowingInternalsAdapters.java | 65 +++++++++++++++++ .../beam/runners/core/ReduceFnTester.java | 67 ++++++------------ .../GroupAlsoByWindowEvaluatorFactory.java | 73 ++++++-------------- .../functions/FlinkProcessContext.java | 13 ++++ .../spark/translation/SparkProcessContext.java | 7 ++ .../apache/beam/sdk/transforms/DoFnTester.java | 16 +++-- .../beam/sdk/util/WindowingInternals.java | 10 +++ .../beam/sdk/util/state/StateContexts.java | 56 --------------- 16 files changed, 321 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index dde883c..bcc52d3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -87,7 +87,8 @@ public class GroupAlsoByWindowViaWindowSetDoFn< TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())), stateInternals, timerInternals, - c.windowingInternals(), + WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()), + WindowingInternalsAdapters.sideInputAccess(c.windowingInternals()), droppedDueToClosedWindow, reduceFn, c.getPipelineOptions()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index f1a6ded..45c0eda 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -69,14 +69,15 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key); ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner = - new ReduceFnRunner<K, InputT, OutputT, W>( + new ReduceFnRunner<>( key, strategy, ExecutableTriggerStateMachine.create( TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger())), stateInternals, timerInternals, - c.windowingInternals(), + WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()), + WindowingInternalsAdapters.sideInputAccess(c.windowingInternals()), droppedDueToClosedWindow, reduceFn, c.getPipelineOptions()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java new file mode 100644 index 0000000..08a0e81 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputWindowedValue.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.util.Collection; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; + +/** + * An object that can output a value with all of its windowing information to the main output or + * a side output. + */ +public interface OutputWindowedValue<OutputT> { + /** Outputs a value with windowing information to the main output. */ + void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane); + + /** Outputs a value with windowing information to a side output. */ + <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index 500c6e7..668ef47 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Instant; /** @@ -62,20 +63,25 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { private final StateInternals<K> stateInternals; private final ActiveWindowSet<W> activeWindows; private final TimerInternals timerInternals; - private final WindowingInternals<?, ?> windowingInternals; + private final SideInputAccess sideInputAccess; private final PipelineOptions options; - ReduceFnContextFactory(K key, ReduceFn<K, InputT, OutputT, W> reduceFn, - WindowingStrategy<?, W> windowingStrategy, StateInternals<K> stateInternals, - ActiveWindowSet<W> activeWindows, TimerInternals timerInternals, - WindowingInternals<?, ?> windowingInternals, PipelineOptions options) { + ReduceFnContextFactory( + K key, + ReduceFn<K, InputT, OutputT, W> reduceFn, + WindowingStrategy<?, W> windowingStrategy, + StateInternals<K> stateInternals, + ActiveWindowSet<W> activeWindows, + TimerInternals timerInternals, + SideInputAccess sideInputAccess, + PipelineOptions options) { this.key = key; this.reduceFn = reduceFn; this.windowingStrategy = windowingStrategy; this.stateInternals = stateInternals; this.activeWindows = activeWindows; this.timerInternals = timerInternals; - this.windowingInternals = windowingInternals; + this.sideInputAccess = sideInputAccess; this.options = options; } @@ -90,7 +96,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) { return new StateAccessorImpl<K, W>( activeWindows, windowingStrategy.getWindowFn().windowCoder(), - stateInternals, StateContexts.createFromComponents(options, windowingInternals, window), + stateInternals, stateContextFromComponents(options, sideInputAccess, window), style); } @@ -217,7 +223,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged, W mergeResult) { super(activeWindows, windowCoder, stateInternals, - StateContexts.windowOnly(mergeResult), style); + stateContextForWindowOnly(mergeResult), style); this.activeToBeMerged = activeToBeMerged; } @@ -262,7 +268,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder, StateInternals<K> stateInternals, W window) { super(activeWindows, windowCoder, stateInternals, - StateContexts.windowOnly(window), StateStyle.RENAMED); + stateContextForWindowOnly(window), StateStyle.RENAMED); } Collection<W> mergingWindows() { @@ -496,4 +502,51 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { return timers; } } + + private static <W extends BoundedWindow> StateContext<W> stateContextFromComponents( + @Nullable final PipelineOptions options, + final SideInputAccess sideInputAccess, + final W window) { + if (options == null) { + return StateContexts.nullContext(); + } else { + return new StateContext<W>() { + + @Override + public PipelineOptions getPipelineOptions() { + return options; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + return sideInputAccess.sideInput(view, window); + } + + @Override + public W window() { + return window; + } + }; + } + } + + /** Returns a {@link StateContext} that only contains the state window. */ + private static <W extends BoundedWindow> StateContext<W> stateContextForWindowOnly( + final W window) { + return new StateContext<W>() { + @Override + public PipelineOptions getPipelineOptions() { + throw new IllegalArgumentException( + "cannot call getPipelineOptions() in a window only context"); + } + @Override + public <T> T sideInput(PCollectionView<T> view) { + throw new IllegalArgumentException("cannot call sideInput() in a window only context"); + } + @Override + public W window() { + return window; + } + }; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 66fb27c..023a77a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -55,7 +55,6 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.util.state.ReadableState; @@ -217,7 +216,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme ExecutableTriggerStateMachine triggerStateMachine, StateInternals<K> stateInternals, TimerInternals timerInternals, - WindowingInternals<?, KV<K, OutputT>> windowingInternals, + OutputWindowedValue<KV<K, OutputT>> outputter, + SideInputAccess sideInputAccess, Aggregator<Long, Long> droppedDueToClosedWindow, ReduceFn<K, InputT, OutputT, W> reduceFn, PipelineOptions options) { @@ -225,7 +225,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme this.timerInternals = timerInternals; this.paneInfoTracker = new PaneInfoTracker(timerInternals); this.stateInternals = stateInternals; - this.outputter = new OutputViaWindowingInternals<>(windowingInternals); + this.outputter = outputter; this.droppedDueToClosedWindow = droppedDueToClosedWindow; this.reduceFn = reduceFn; @@ -240,8 +240,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme this.activeWindows = createActiveWindowSet(); this.contextFactory = - new ReduceFnContextFactory<K, InputT, OutputT, W>(key, reduceFn, this.windowingStrategy, - stateInternals, this.activeWindows, timerInternals, windowingInternals, options); + new ReduceFnContextFactory<>(key, reduceFn, this.windowingStrategy, + stateInternals, this.activeWindows, timerInternals, sideInputAccess, options); this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy); this.triggerRunner = @@ -965,33 +965,4 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); } } - - /** - * An object that can output a value with all of its windowing information. This is a deliberately - * restricted subinterface of {@link WindowingInternals} to express how it is used here. - */ - private interface OutputWindowedValue<OutputT> { - void outputWindowedValue(OutputT output, Instant timestamp, - Collection<? extends BoundedWindow> windows, PaneInfo pane); - } - - private static class OutputViaWindowingInternals<OutputT> - implements OutputWindowedValue<OutputT> { - - private final WindowingInternals<?, OutputT> windowingInternals; - - public OutputViaWindowingInternals(WindowingInternals<?, OutputT> windowingInternals) { - this.windowingInternals = windowingInternals; - } - - @Override - public void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - windowingInternals.outputWindowedValue(output, timestamp, windows, pane); - } - - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java new file mode 100644 index 0000000..7d64566 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Allows accessing the side inputs for a particular main input window. + */ +public interface SideInputAccess { + /** + * Return the value of the side input for the window of a main input element. + */ + <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index c046d11..c0f3a02 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -66,6 +66,10 @@ import org.joda.time.format.PeriodFormat; /** * Runs a {@link DoFn} by constructing the appropriate contexts and passing them in. * + * <p>Also, if the {@link DoFn} observes the window of the element, then {@link SimpleDoFnRunner} + * explodes windows of the input {@link WindowedValue} and calls {@link DoFn.ProcessElement} for + * each window individually. + * * @param <InputT> the type of the {@link DoFn} (main) input elements * @param <OutputT> the type of the {@link DoFn} (main) output elements */ @@ -627,6 +631,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out PaneInfo pane) {} @Override + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) {} + + @Override public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { return context.sideInput(view, mainInputWindow); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 1298fc8..8efc27b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -472,6 +472,16 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT } @Override + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + context.sideOutputWindowedValue(tag, output, timestamp, windows, pane); + } + + @Override public Collection<? extends BoundedWindow> windows() { return windowedValue.getWindows(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java new file mode 100644 index 0000000..1b47e2b --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.util.Collection; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; + +/** + * Adapters from {@link WindowingInternals} to {@link SideInputAccess} and {@link + * OutputWindowedValue}. + */ +public class WindowingInternalsAdapters { + static SideInputAccess sideInputAccess(final WindowingInternals<?, ?> windowingInternals) { + return new SideInputAccess() { + @Override + public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + return windowingInternals.sideInput(view, mainInputWindow); + } + }; + } + + public static <OutputT> OutputWindowedValue<OutputT> outputWindowedValue( + final WindowingInternals<?, OutputT> windowingInternals) { + return new OutputWindowedValue<OutputT>() { + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + windowingInternals.outputWindowedValue(output, timestamp, windows, pane); + } + + @Override + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + windowingInternals.sideOutputWindowedValue(tag, output, timestamp, windows, pane); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index f5ab8ea..5f8424e 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -75,7 +74,6 @@ import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.util.state.InMemoryTimerInternals; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; @@ -106,7 +104,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { private final TestTimerInternals timerInternals = new TestTimerInternals(); private final WindowFn<Object, W> windowFn; - private final TestWindowingInternals windowingInternals; + private final TestOutputWindowedValue testOutputter; + private final TestSideInputAccess testSideInputAccess; private final Coder<OutputT> outputCoder; private final WindowingStrategy<Object, W> objectStrategy; private final ExecutableTriggerStateMachine executableTriggerStateMachine; @@ -291,7 +290,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { this.objectStrategy = objectStrategy; this.reduceFn = reduceFn; this.windowFn = objectStrategy.getWindowFn(); - this.windowingInternals = new TestWindowingInternals(sideInputReader); + this.testOutputter = new TestOutputWindowedValue(); + this.testSideInputAccess = new TestSideInputAccess(sideInputReader); this.executableTriggerStateMachine = ExecutableTriggerStateMachine.create(triggerStateMachine); this.outputCoder = outputCoder; this.options = options; @@ -313,7 +313,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { executableTriggerStateMachine, stateInternals, timerInternals, - windowingInternals, + testOutputter, + testSideInputAccess, droppedDueToClosedWindow, reduceFn, options); @@ -418,7 +419,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { * How many panes do we have in the output? */ public int getOutputSize() { - return windowingInternals.outputs.size(); + return testOutputter.outputs.size(); } /** @@ -426,7 +427,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { */ public List<WindowedValue<OutputT>> extractOutput() { ImmutableList<WindowedValue<OutputT>> result = - FluentIterable.from(windowingInternals.outputs) + FluentIterable.from(testOutputter.outputs) .transform(new Function<WindowedValue<KV<String, OutputT>>, WindowedValue<OutputT>>() { @Override public WindowedValue<OutputT> apply(WindowedValue<KV<String, OutputT>> input) { @@ -434,7 +435,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { } }) .toList(); - windowingInternals.outputs.clear(); + testOutputter.outputs.clear(); return result; } @@ -517,18 +518,12 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { /** * Convey the simulated state and implement {@link #outputWindowedValue} to capture all output * elements. - */ - private class TestWindowingInternals implements WindowingInternals<InputT, KV<String, OutputT>> { + */private class TestOutputWindowedValue implements OutputWindowedValue<KV<String, OutputT>> { private List<WindowedValue<KV<String, OutputT>>> outputs = new ArrayList<>(); - private SideInputReader sideInputReader; - - private TestWindowingInternals(SideInputReader sideInputReader) { - this.sideInputReader = sideInputReader; - } @Override public void outputWindowedValue(KV<String, OutputT> output, Instant timestamp, - Collection<? extends BoundedWindow> windows, PaneInfo pane) { + Collection<? extends BoundedWindow> windows, PaneInfo pane) { // Copy the output value (using coders) before capturing it. KV<String, OutputT> copy = SerializableUtils.<KV<String, OutputT>>ensureSerializableByCoder( KvCoder.of(StringUtf8Coder.of(), outputCoder), output, "outputForWindow"); @@ -537,37 +532,21 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { } @Override - public TimerInternals timerInternals() { - throw new UnsupportedOperationException( - "Testing triggers should not use timers from WindowingInternals."); - } - - @Override - public Collection<? extends BoundedWindow> windows() { - throw new UnsupportedOperationException( - "Testing triggers should not use windows from WindowingInternals."); - } - - @Override - public PaneInfo pane() { - throw new UnsupportedOperationException( - "Testing triggers should not use pane from WindowingInternals."); + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + throw new UnsupportedOperationException(); } + } - @Override - public <T> void writePCollectionViewData( - TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { - throw new UnsupportedOperationException( - "Testing triggers should not use writePCollectionViewData from WindowingInternals."); - } + private class TestSideInputAccess implements SideInputAccess { + private SideInputReader sideInputReader; - @Override - public StateInternals<Object> stateInternals() { - // Safe for testing only - @SuppressWarnings({"unchecked", "rawtypes"}) - TestInMemoryStateInternals<Object> untypedStateInternals = - (TestInMemoryStateInternals) stateInternals; - return untypedStateInternals; + private TestSideInputAccess(SideInputReader sideInputReader) { + this.sideInputReader = sideInputReader; } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index e5c5e4b..0e8adba 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -29,7 +29,9 @@ import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; +import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; +import org.apache.beam.runners.core.SideInputAccess; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; @@ -173,7 +175,14 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())), stateInternals, timerInternals, - new DirectWindowingInternals<>(bundle), + new OutputWindowedValueToBundle<>(bundle), + new SideInputAccess() { + @Override + public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + throw new UnsupportedOperationException( + "GroupAlsoByWindow must not have side inputs"); + } + }, droppedDueToClosedWindow, reduceFn, evaluationContext.getPipelineOptions()); @@ -243,26 +252,15 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { } } - private static class DirectWindowingInternals<K, V> - implements WindowingInternals<Object, KV<K, Iterable<V>>> { + private static class OutputWindowedValueToBundle<K, V> + implements OutputWindowedValue<KV<K, Iterable<V>>> { private final UncommittedBundle<KV<K, Iterable<V>>> bundle; - private DirectWindowingInternals( - UncommittedBundle<KV<K, Iterable<V>>> bundle) { + private OutputWindowedValueToBundle(UncommittedBundle<KV<K, Iterable<V>>> bundle) { this.bundle = bundle; } @Override - public StateInternals<?> stateInternals() { - throw new UnsupportedOperationException( - String.format( - "%s should use the %s it is provided rather than the contents of %s", - ReduceFnRunner.class.getSimpleName(), - StateInternals.class.getSimpleName(), - getClass().getSimpleName())); - } - - @Override public void outputWindowedValue( KV<K, Iterable<V>> output, Instant timestamp, @@ -272,44 +270,13 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { } @Override - public TimerInternals timerInternals() { - throw new UnsupportedOperationException( - String.format( - "%s should use the %s it is provided rather than the contents of %s", - ReduceFnRunner.class.getSimpleName(), - TimerInternals.class.getSimpleName(), - getClass().getSimpleName())); - } - - @Override - public Collection<? extends BoundedWindow> windows() { - throw new IllegalArgumentException( - String.format( - "%s should not access Windows via %s.windows(); " - + "it should instead inspect the window of the input elements", - GroupAlsoByWindowEvaluator.class.getSimpleName(), - WindowingInternals.class.getSimpleName())); - } - - @Override - public PaneInfo pane() { - throw new IllegalArgumentException( - String.format( - "%s should not access Windows via %s.windows(); " - + "it should instead inspect the window of the input elements", - GroupAlsoByWindowEvaluator.class.getSimpleName(), - WindowingInternals.class.getSimpleName())); - } - - @Override - public <T> void writePCollectionViewData( - TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - throw new UnsupportedOperationException(); + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + throw new UnsupportedOperationException("Can't output to side outputs from a ReduceFn"); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java index baf97cb..1b28a70 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java @@ -166,11 +166,24 @@ class FlinkProcessContext<InputT, OutputT> Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + // TODO: Refactor this (get rid of duplication, move things around w.r.t. + // FlinkMultiOutputProcessContext) collector.collect(WindowedValue.of(value, timestamp, windows, pane)); outputWithTimestampAndWindow(value, timestamp, windows, pane); } @Override + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + // TODO: Implement this + throw new UnsupportedOperationException(); + } + + @Override public TimerInternals timerInternals() { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 99cd522..f3152ba 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -246,6 +246,13 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> } @Override + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, SideOutputT output, Instant timestamp, + Collection<? extends BoundedWindow> windows, PaneInfo pane) { + throw new UnsupportedOperationException(); + } + + @Override public StateInternals stateInternals() { //TODO: implement state internals. // This is a temporary placeholder to get the TfIdfTest http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 7995719..dd7d894 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -278,7 +278,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { startBundle(); } try { - fn.processElement(createProcessContext(fn, element)); + fn.processElement(createProcessContext(element)); } catch (UserCodeException e) { unwrapUserCodeException(e); } @@ -606,9 +606,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } } - private TestProcessContext createProcessContext( - OldDoFn<InputT, OutputT> fn, - TimestampedValue<InputT> elem) { + private TestProcessContext createProcessContext(TimestampedValue<InputT> elem) { WindowedValue<InputT> windowedValue = WindowedValue.timestampedValueInGlobalWindow( elem.getValue(), elem.getTimestamp()); @@ -678,6 +676,16 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + context.noteOutput(tag, WindowedValue.of(output, timestamp, windows, pane)); + } + + @Override public TimerInternals timerInternals() { return timerInternals; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java index 016276c..ab3c600 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java @@ -52,6 +52,16 @@ public interface WindowingInternals<InputT, OutputT> { Collection<? extends BoundedWindow> windows, PaneInfo pane); /** + * Output the value to a side output at the specified timestamp in the listed windows. + */ + <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane); + + /** * Return the timer manager provided by the underlying system, or null if Timers need * to be emulated. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24cae56a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java index d0c566d..81121e1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java @@ -17,10 +17,8 @@ */ package org.apache.beam.sdk.util.state; -import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.values.PCollectionView; /** @@ -51,58 +49,4 @@ public class StateContexts { public static <W extends BoundedWindow> StateContext<W> nullContext() { return (StateContext<W>) NULL_CONTEXT; } - - /** - * Returns a {@link StateContext} that only contains the state window. - */ - public static <W extends BoundedWindow> StateContext<W> windowOnly(final W window) { - return new StateContext<W>() { - @Override - public PipelineOptions getPipelineOptions() { - throw new IllegalArgumentException( - "cannot call getPipelineOptions() in a window only context"); - } - @Override - public <T> T sideInput(PCollectionView<T> view) { - throw new IllegalArgumentException("cannot call sideInput() in a window only context"); - } - @Override - public W window() { - return window; - } - }; - } - - /** - * Returns a {@link StateContext} from {@code PipelineOptions}, {@link WindowingInternals}, - * and the state window. - */ - public static <W extends BoundedWindow> StateContext<W> createFromComponents( - @Nullable final PipelineOptions options, - final WindowingInternals<?, ?> windowingInternals, - final W window) { - @SuppressWarnings("unchecked") - StateContext<W> typedNullContext = (StateContext<W>) NULL_CONTEXT; - if (options == null) { - return typedNullContext; - } else { - return new StateContext<W>() { - - @Override - public PipelineOptions getPipelineOptions() { - return options; - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - return windowingInternals.sideInput(view, window); - } - - @Override - public W window() { - return window; - } - }; - } - } }
