Replaces SideInputAccess with SideInputReader. Makes WindowingInternals.sideInput take the side input window instead of the main input window.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90a0d0e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90a0d0e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90a0d0e1 Branch: refs/heads/python-sdk Commit: 90a0d0e13fa0332df805b79b1dc64860d9590217 Parents: 8243fcd Author: Eugene Kirpichov <kirpic...@google.com> Authored: Mon Nov 14 14:48:31 2016 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Thu Nov 17 13:18:36 2016 -0800 ---------------------------------------------------------------------- .../operators/ApexGroupByKeyOperator.java | 26 ++++++++++--- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 2 +- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 2 +- .../runners/core/ReduceFnContextFactory.java | 27 +++++++++----- .../beam/runners/core/ReduceFnRunner.java | 5 ++- .../beam/runners/core/SideInputAccess.java | 31 ---------------- .../beam/runners/core/SimpleDoFnRunner.java | 32 +++++++--------- .../beam/runners/core/SimpleOldDoFnRunner.java | 11 +++--- .../core/WindowingInternalsAdapters.java | 21 ++++++++--- .../beam/runners/core/ReduceFnTester.java | 34 +++++------------ .../GroupAlsoByWindowEvaluatorFactory.java | 21 +++++++---- .../functions/FlinkDoFnFunction.java | 15 ++++---- .../functions/FlinkProcessContextBase.java | 9 +---- .../FlinkSingleOutputProcessContext.java | 1 - .../runners/spark/translation/DoFnFunction.java | 5 +-- .../spark/translation/MultiDoFnFunction.java | 6 +-- .../spark/translation/SparkProcessContext.java | 39 ++++++++++++-------- .../apache/beam/sdk/transforms/DoFnTester.java | 2 +- .../beam/sdk/util/WindowingInternals.java | 4 +- 19 files changed, 141 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 8fbfb03..eca4308 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -354,13 +354,26 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { } @Override - public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, - Collection<? extends BoundedWindow> windows, PaneInfo pane) { + public void outputWindowedValue( + KV<K, Iterable<V>> output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { if (traceTuples) { LOG.debug("\nemitting {} timestamp {}\n", output, timestamp); } - ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of( - WindowedValue.of(output, timestamp, windows, pane))); + ApexGroupByKeyOperator.this.output.emit( + ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane))); + } + + @Override + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? 
extends BoundedWindow> windows, + PaneInfo pane) { + throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs"); } @Override @@ -379,8 +392,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { } @Override - public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, - Coder<T> elemCoder) throws IOException { + public <T> void writePCollectionViewData( + TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) + throws IOException { throw new RuntimeException("writePCollectionViewData() not available in Streaming mode."); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index bcc52d3..8b10813 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -88,7 +88,7 @@ public class GroupAlsoByWindowViaWindowSetDoFn< stateInternals, timerInternals, WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()), - WindowingInternalsAdapters.sideInputAccess(c.windowingInternals()), + WindowingInternalsAdapters.sideInputReader(c.windowingInternals()), droppedDueToClosedWindow, reduceFn, c.getPipelineOptions()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index 45c0eda..f8f6207 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -77,7 +77,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends stateInternals, timerInternals, WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()), - WindowingInternalsAdapters.sideInputAccess(c.windowingInternals()), + WindowingInternalsAdapters.sideInputReader(c.windowingInternals()), droppedDueToClosedWindow, reduceFn, c.getPipelineOptions()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index d43fb8e..539126a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -28,7 +28,9 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.ActiveWindowSet; +import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import 
org.apache.beam.sdk.util.TimerInternals.TimerData; @@ -62,7 +64,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { private final StateInternals<K> stateInternals; private final ActiveWindowSet<W> activeWindows; private final TimerInternals timerInternals; - private final SideInputAccess sideInputAccess; + private final SideInputReader sideInputReader; private final PipelineOptions options; ReduceFnContextFactory( @@ -72,7 +74,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { StateInternals<K> stateInternals, ActiveWindowSet<W> activeWindows, TimerInternals timerInternals, - SideInputAccess sideInputAccess, + SideInputReader sideInputReader, PipelineOptions options) { this.key = key; this.reduceFn = reduceFn; @@ -80,7 +82,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { this.stateInternals = stateInternals; this.activeWindows = activeWindows; this.timerInternals = timerInternals; - this.sideInputAccess = sideInputAccess; + this.sideInputReader = sideInputReader; this.options = options; } @@ -94,8 +96,14 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) { return new StateAccessorImpl<K, W>( - activeWindows, windowingStrategy.getWindowFn().windowCoder(), - stateInternals, stateContextFromComponents(options, sideInputAccess, window), + activeWindows, + windowingStrategy.getWindowFn().windowCoder(), + stateInternals, + stateContextFromComponents( + options, + sideInputReader, + window, + windowingStrategy.getWindowFn()), style); } @@ -504,8 +512,9 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { private static <W extends BoundedWindow> StateContext<W> stateContextFromComponents( @Nullable final PipelineOptions options, - final SideInputAccess sideInputAccess, - final W window) { + final SideInputReader sideInputReader, + final W mainInputWindow, + 
final WindowFn<?, W> windowFn) { if (options == null) { return StateContexts.nullContext(); } else { @@ -518,12 +527,12 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { @Override public <T> T sideInput(PCollectionView<T> view) { - return sideInputAccess.sideInput(view, window); + return sideInputReader.get(view, windowFn.getSideInputWindow(mainInputWindow)); } @Override public W window() { - return window; + return mainInputWindow; } }; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 023a77a..a686f46 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.ActiveWindowSet; import org.apache.beam.sdk.util.MergingActiveWindowSet; import org.apache.beam.sdk.util.NonMergingActiveWindowSet; +import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; @@ -217,7 +218,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme StateInternals<K> stateInternals, TimerInternals timerInternals, OutputWindowedValue<KV<K, OutputT>> outputter, - SideInputAccess sideInputAccess, + SideInputReader sideInputReader, Aggregator<Long, Long> droppedDueToClosedWindow, ReduceFn<K, InputT, OutputT, W> reduceFn, PipelineOptions options) { @@ -241,7 +242,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> impleme this.contextFactory = new ReduceFnContextFactory<>(key, reduceFn, this.windowingStrategy, - stateInternals, this.activeWindows, timerInternals, sideInputAccess, options); + stateInternals, this.activeWindows, timerInternals, sideInputReader, options); this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy); this.triggerRunner = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java deleted file mode 100644 index 7d64566..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputAccess.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * Allows accessing the side inputs for a particular main input window. 
- */ -public interface SideInputAccess { - /** - * Return the value of the side input for the window of a main input element. - */ - <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index c0f3a02..76aae8f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -142,7 +142,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } private void invokeProcessElement(WindowedValue<InputT> elem) { - DoFnProcessContext<InputT, OutputT> processContext = createProcessContext(elem); + final DoFnProcessContext<InputT, OutputT> processContext = createProcessContext(elem); // This can contain user code. Wrap it in case it throws an exception. 
try { @@ -283,12 +283,10 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out return WindowedValue.of(output, timestamp, windows, pane); } - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) { if (!sideInputReader.contains(view)) { throw new IllegalArgumentException("calling sideInput() with unknown view"); } - BoundedWindow sideInputWindow = - view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow); return sideInputReader.get(view, sideInputWindow); } @@ -432,7 +430,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out final DoFnContext<InputT, OutputT> context; final WindowedValue<InputT> windowedValue; - public DoFnProcessContext( + private DoFnProcessContext( DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context, WindowedValue<InputT> windowedValue) { @@ -473,7 +471,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out "sideInput called when main input element is in multiple windows"); } } - return context.sideInput(view, window); + return context.sideInput( + view, view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window)); } @Override @@ -493,14 +492,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); } - void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - context.outputWindowedValue(output, timestamp, windows, pane); - } - @Override public <T> void sideOutput(TupleTag<T> tag, T output) { checkNotNull(tag, "Tag passed to sideOutput cannot be null"); @@ -628,7 +619,9 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out OutputT output, Instant timestamp, Collection<? 
extends BoundedWindow> windows, - PaneInfo pane) {} + PaneInfo pane) { + throw new UnsupportedOperationException("A DoFn cannot output to a different window"); + } @Override public <SideOutputT> void sideOutputWindowedValue( @@ -636,11 +629,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out SideOutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, - PaneInfo pane) {} + PaneInfo pane) { + throw new UnsupportedOperationException( + "A DoFn cannot side output to a different window"); + } @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - return context.sideInput(view, mainInputWindow); + public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) { + return context.sideInput(view, sideInputWindow); } }; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 8efc27b..cbda791 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -251,12 +251,10 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT return WindowedValue.of(output, timestamp, windows, pane); } - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) { if (!sideInputReader.contains(view)) { throw new IllegalArgumentException("calling sideInput() with unknown view"); } - BoundedWindow sideInputWindow = - 
view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow); return sideInputReader.get(view, sideInputWindow); } @@ -390,7 +388,8 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT "sideInput called when main input element is in multiple windows"); } } - return context.sideInput(view, window); + return context.sideInput( + view, view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window)); } @Override @@ -515,8 +514,8 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT } @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - return context.sideInput(view, mainInputWindow); + public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) { + return context.sideInput(view, sideInputWindow); } }; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java index 1b47e2b..7f80844 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java @@ -20,21 +20,32 @@ package org.apache.beam.runners.core; import java.util.Collection; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; /** - * Adapters from {@link 
WindowingInternals} to {@link SideInputAccess} and {@link + * Adapters from {@link WindowingInternals} to {@link SideInputReader} and {@link * OutputWindowedValue}. */ public class WindowingInternalsAdapters { - static SideInputAccess sideInputAccess(final WindowingInternals<?, ?> windowingInternals) { - return new SideInputAccess() { + static SideInputReader sideInputReader(final WindowingInternals<?, ?> windowingInternals) { + return new SideInputReader() { @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - return windowingInternals.sideInput(view, mainInputWindow); + public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) { + return windowingInternals.sideInput(view, sideInputWindow); + } + + @Override + public <T> boolean contains(PCollectionView<T> view) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isEmpty() { + throw new UnsupportedOperationException(); } }; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 5f8424e..337be23 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -80,7 +80,6 @@ import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; import org.apache.beam.sdk.util.state.TimerCallback; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Duration; @@ -105,7 +104,7 @@ 
public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { private final WindowFn<Object, W> windowFn; private final TestOutputWindowedValue testOutputter; - private final TestSideInputAccess testSideInputAccess; + private final SideInputReader sideInputReader; private final Coder<OutputT> outputCoder; private final WindowingStrategy<Object, W> objectStrategy; private final ExecutableTriggerStateMachine executableTriggerStateMachine; @@ -291,7 +290,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { this.reduceFn = reduceFn; this.windowFn = objectStrategy.getWindowFn(); this.testOutputter = new TestOutputWindowedValue(); - this.testSideInputAccess = new TestSideInputAccess(sideInputReader); + this.sideInputReader = sideInputReader; this.executableTriggerStateMachine = ExecutableTriggerStateMachine.create(triggerStateMachine); this.outputCoder = outputCoder; this.options = options; @@ -314,7 +313,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { stateInternals, timerInternals, testOutputter, - testSideInputAccess, + sideInputReader, droppedDueToClosedWindow, reduceFn, options); @@ -522,8 +521,11 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { private List<WindowedValue<KV<String, OutputT>>> outputs = new ArrayList<>(); @Override - public void outputWindowedValue(KV<String, OutputT> output, Instant timestamp, - Collection<? extends BoundedWindow> windows, PaneInfo pane) { + public void outputWindowedValue( + KV<String, OutputT> output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { // Copy the output value (using coders) before capturing it. KV<String, OutputT> copy = SerializableUtils.<KV<String, OutputT>>ensureSerializableByCoder( KvCoder.of(StringUtf8Coder.of(), outputCoder), output, "outputForWindow"); @@ -538,25 +540,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { Instant timestamp, Collection<? 
extends BoundedWindow> windows, PaneInfo pane) { - throw new UnsupportedOperationException(); - } - } - - private class TestSideInputAccess implements SideInputAccess { - private SideInputReader sideInputReader; - - private TestSideInputAccess(SideInputReader sideInputReader) { - this.sideInputReader = sideInputReader; - } - - @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - if (!sideInputReader.contains(view)) { - throw new IllegalArgumentException("calling sideInput() with unknown view"); - } - BoundedWindow sideInputWindow = - view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow); - return sideInputReader.get(view, sideInputWindow); + throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs"); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 0e8adba..a5bb214 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -22,7 +22,6 @@ import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; @@ -31,7 +30,6 @@ import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindo import 
org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; -import org.apache.beam.runners.core.SideInputAccess; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; @@ -47,14 +45,13 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -176,12 +173,22 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { stateInternals, timerInternals, new OutputWindowedValueToBundle<>(bundle), - new SideInputAccess() { + new SideInputReader() { @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) { throw new UnsupportedOperationException( "GroupAlsoByWindow must not have side inputs"); } + + @Override + public <T> boolean contains(PCollectionView<T> view) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isEmpty() { + throw new UnsupportedOperationException(); + } }, 
droppedDueToClosedWindow, reduceFn, @@ -276,7 +283,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - throw new UnsupportedOperationException("Can't output to side outputs from a ReduceFn"); + throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs"); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index dc0ef0f..db045f5 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -64,13 +64,14 @@ public class FlinkDoFnFunction<InputT, OutputT> Iterable<WindowedValue<InputT>> values, Collector<WindowedValue<OutputT>> out) throws Exception { - FlinkSingleOutputProcessContext<InputT, OutputT> context = new FlinkSingleOutputProcessContext<>( - serializedOptions.getPipelineOptions(), - getRuntimeContext(), - doFn, - windowingStrategy, - sideInputs, out - ); + FlinkSingleOutputProcessContext<InputT, OutputT> context = + new FlinkSingleOutputProcessContext<>( + serializedOptions.getPipelineOptions(), + getRuntimeContext(), + doFn, + windowingStrategy, + sideInputs, + out); this.doFn.startBundle(context); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java 
---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java index b814015..2169785 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -43,7 +43,6 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.util.Collector; import org.joda.time.Instant; /** @@ -162,19 +161,13 @@ abstract class FlinkProcessContextBase<InputT, OutputT> @Override public <ViewT> ViewT sideInput( PCollectionView<ViewT> view, - BoundedWindow mainInputWindow) { + BoundedWindow sideInputWindow) { checkNotNull(view, "View passed to sideInput cannot be null"); checkNotNull( sideInputs.get(view), "Side input for " + view + " not available."); - // get the side input strategy for mapping the window - WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view); - - BoundedWindow sideInputWindow = - windowingStrategy.getWindowFn().getSideInputWindow(mainInputWindow); - Map<BoundedWindow, ViewT> sideInputs = runtimeContext.getBroadcastVariableWithInitializer( view.getTagInternal().getId(), new SideInputInitializer<>(view)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java ---------------------------------------------------------------------- diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java index d67f6fd..529b1cc 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java @@ -21,7 +21,6 @@ import java.util.Collection; import java.util.Map; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java index fa08c5b..f4be121 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.spark.Accumulator; import org.apache.spark.api.java.function.FlatMapFunction; -import org.joda.time.Instant; /** @@ -88,12 +87,12 @@ public class DoFnFunction<InputT, OutputT> } @Override - public synchronized void output(WindowedValue<OutputT> o) { + protected synchronized void 
outputWindowedValue(WindowedValue<OutputT> o) { outputs.add(o); } @Override - public <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output) { + protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> output) { throw new UnsupportedOperationException( "sideOutput is an unsupported operation for doFunctions, use a " + "MultiDoFunction instead."); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index d015b08..8175beb 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -34,8 +34,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.spark.Accumulator; import org.apache.spark.api.java.function.PairFlatMapFunction; -import org.joda.time.Instant; - import scala.Tuple2; /** @@ -98,12 +96,12 @@ public class MultiDoFnFunction<InputT, OutputT> } @Override - public synchronized void output(WindowedValue<OutputT> o) { + protected synchronized void outputWindowedValue(WindowedValue<OutputT> o) { outputs.put(mMainOutputTag, o); } @Override - public <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output) { + protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> output) { outputs.put(tag, output); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index afbc824..6a6cbd4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -150,9 +150,9 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> public void outputWithTimestamp(OutputT output, Instant timestamp) { if (windowedValue == null) { // this is start/finishBundle. - output(noElementWindowedValue(output, timestamp, windowFn)); + outputWindowedValue(noElementWindowedValue(output, timestamp, windowFn)); } else { - output(WindowedValue.of(output, timestamp, windowedValue.getWindows(), + outputWindowedValue(WindowedValue.of(output, timestamp, windowedValue.getWindows(), windowedValue.getPane())); } } @@ -167,15 +167,16 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { if (windowedValue == null) { // this is start/finishBundle. 
- sideOutput(tag, noElementWindowedValue(output, timestamp, windowFn)); + sideOutputWindowedValue(tag, noElementWindowedValue(output, timestamp, windowFn)); } else { - sideOutput(tag, WindowedValue.of(output, timestamp, windowedValue.getWindows(), + sideOutputWindowedValue(tag, WindowedValue.of(output, timestamp, windowedValue.getWindows(), windowedValue.getPane())); } } - public abstract void output(WindowedValue<OutputT> output); - public abstract <T> void sideOutput(TupleTag<T> tag, WindowedValue<T> output); + protected abstract void outputWindowedValue(WindowedValue<OutputT> output); + + protected abstract <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> output); static <T, W extends BoundedWindow> WindowedValue<T> noElementWindowedValue( final T output, final Instant timestamp, WindowFn<Object, W> windowFn) { @@ -241,16 +242,24 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> } @Override - public void outputWindowedValue(OutputT output, Instant timestamp, Collection<? - extends BoundedWindow> windows, PaneInfo paneInfo) { - output(WindowedValue.of(output, timestamp, windows, paneInfo)); + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo paneInfo) { + SparkProcessContext.this.outputWindowedValue( + WindowedValue.of(output, timestamp, windows, paneInfo)); } @Override public <SideOutputT> void sideOutputWindowedValue( - TupleTag<SideOutputT> tag, SideOutputT output, Instant timestamp, - Collection<? extends BoundedWindow> windows, PaneInfo paneInfo) { - sideOutput(tag, WindowedValue.of(output, timestamp, windows, paneInfo)); + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? 
extends BoundedWindow> windows, + PaneInfo paneInfo) { + SparkProcessContext.this.sideOutputWindowedValue( + tag, WindowedValue.of(output, timestamp, windows, paneInfo)); } @Override @@ -273,14 +282,14 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> } @Override - public <T> void writePCollectionViewData(TupleTag<?> tag, - Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { + public <T> void writePCollectionViewData( + TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { throw new UnsupportedOperationException( "WindowingInternals#writePCollectionViewData() is not yet supported."); } @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) { throw new UnsupportedOperationException( "WindowingInternals#sideInput() is not yet supported."); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index dd7d894..bbf0315 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -710,7 +710,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { @Override public <T> T sideInput( - PCollectionView<T> view, BoundedWindow mainInputWindow) { + PCollectionView<T> view, BoundedWindow sideInputWindow) { throw new UnsupportedOperationException( "SideInput from WindowingInternals is not supported in in the context of DoFnTester"); } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90a0d0e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java index ab3c600..5e90864 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java @@ -86,7 +86,7 @@ public interface WindowingInternals<InputT, OutputT> { Coder<T> elemCoder) throws IOException; /** - * Return the value of the side input for the window of a main input element. + * Return the value of the side input for a particular side input window. */ - <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow); + <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow); }