http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java index eac465c..2967f2c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java @@ -26,9 +26,8 @@ import java.util.List; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -54,9 +53,8 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { @SuppressWarnings("unchecked") - TimestampCombiner timestampCombiner = - (TimestampCombiner) windowingStrategy.getTimestampCombiner(); - WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn(); + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? 
super BoundedWindow>) windowingStrategy.getOutputTimeFn(); // get all elements so that we can sort them, has to fit into // memory @@ -90,19 +88,18 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou // create accumulator using the first elements key WindowedValue<KV<K, InputT>> currentValue = iterator.next(); K key = currentValue.getValue().getKey(); - W currentWindow = (W) Iterables.getOnlyElement(currentValue.getWindows()); + BoundedWindow currentWindow = Iterables.getOnlyElement(currentValue.getWindows()); InputT firstValue = currentValue.getValue().getValue(); AccumT accumulator = flinkCombiner.firstInput( key, firstValue, options, sideInputReader, currentValue.getWindows()); - // we use this to keep track of the timestamps assigned by the TimestampCombiner + // we use this to keep track of the timestamps assigned by the OutputTimeFn Instant windowTimestamp = - timestampCombiner.assign( - currentWindow, windowFn.getOutputTime(currentValue.getTimestamp(), currentWindow)); + outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow); while (iterator.hasNext()) { WindowedValue<KV<K, InputT>> nextValue = iterator.next(); - W nextWindow = (W) Iterables.getOnlyElement(nextValue.getWindows()); + BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows()); if (currentWindow.equals(nextWindow)) { // continue accumulating and merge windows @@ -111,12 +108,9 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou accumulator = flinkCombiner.addInput(key, accumulator, value, options, sideInputReader, currentValue.getWindows()); - windowTimestamp = - timestampCombiner.combine( - windowTimestamp, - timestampCombiner.assign( - currentWindow, - windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow))); + windowTimestamp = outputTimeFn.combine( + windowTimestamp, + outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); } else { // emit the value that we 
currently have @@ -133,9 +127,7 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou InputT value = nextValue.getValue().getValue(); accumulator = flinkCombiner.firstInput(key, value, options, sideInputReader, currentValue.getWindows()); - windowTimestamp = - timestampCombiner.assign( - currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)); + windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); } }
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java index d015c38..3203446 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineContextFactory; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -176,9 +176,9 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, - TimestampCombiner timestampCombiner) { + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> address, + OutputTimeFn<? 
super W> outputTimeFn) { throw new UnsupportedOperationException( String.format("%s is not supported", WatermarkHoldState.class.getSimpleName())); } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java index 2dd7c96..24b340e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java @@ -38,7 +38,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -186,9 +186,9 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, - TimestampCombiner timestampCombiner) { + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> address, + OutputTimeFn<? 
super W> outputTimeFn) { throw new UnsupportedOperationException( String.format("%s is not supported", CombiningState.class.getSimpleName())); } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java index 17ea62a..2bf0bf1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; @@ -146,9 +146,9 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, - TimestampCombiner timestampCombiner) { + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> address, + OutputTimeFn<? 
super W> outputTimeFn) { throw new UnsupportedOperationException( String.format("%s is not supported", CombiningState.class.getSimpleName())); } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java index 878c914..4f961e5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -32,7 +32,7 @@ import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.CombineContextFactory; import org.apache.beam.sdk.util.state.BagState; @@ -185,12 +185,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, - TimestampCombiner timestampCombiner) { + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> address, + OutputTimeFn<? 
super W> outputTimeFn) { return new FlinkWatermarkHoldState<>( - flinkStateBackend, FlinkStateInternals.this, address, namespace, timestampCombiner); + flinkStateBackend, FlinkStateInternals.this, address, namespace, outputTimeFn); } }); } @@ -912,9 +912,9 @@ public class FlinkStateInternals<K> implements StateInternals<K> { } private static class FlinkWatermarkHoldState<K, W extends BoundedWindow> - implements WatermarkHoldState { - private final StateTag<? super K, WatermarkHoldState> address; - private final TimestampCombiner timestampCombiner; + implements WatermarkHoldState<W> { + private final StateTag<? super K, WatermarkHoldState<W>> address; + private final OutputTimeFn<? super W> outputTimeFn; private final StateNamespace namespace; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; private final FlinkStateInternals<K> flinkStateInternals; @@ -923,11 +923,11 @@ public class FlinkStateInternals<K> implements StateInternals<K> { public FlinkWatermarkHoldState( KeyedStateBackend<ByteBuffer> flinkStateBackend, FlinkStateInternals<K> flinkStateInternals, - StateTag<? super K, WatermarkHoldState> address, + StateTag<? super K, WatermarkHoldState<W>> address, StateNamespace namespace, - TimestampCombiner timestampCombiner) { + OutputTimeFn<? super W> outputTimeFn) { this.address = address; - this.timestampCombiner = timestampCombiner; + this.outputTimeFn = outputTimeFn; this.namespace = namespace; this.flinkStateBackend = flinkStateBackend; this.flinkStateInternals = flinkStateInternals; @@ -937,12 +937,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> { } @Override - public TimestampCombiner getTimestampCombiner() { - return timestampCombiner; + public OutputTimeFn<? 
super W> getOutputTimeFn() { + return outputTimeFn; } @Override - public WatermarkHoldState readLater() { + public WatermarkHoldState<W> readLater() { return this; } @@ -983,7 +983,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { state.update(value); flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value); } else { - Instant combined = timestampCombiner.combine(current, value); + Instant combined = outputTimeFn.combine(current, value); state.update(combined); flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined); } @@ -1035,7 +1035,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { if (!address.equals(that.address)) { return false; } - if (!timestampCombiner.equals(that.timestampCombiner)) { + if (!outputTimeFn.equals(that.outputTimeFn)) { return false; } return namespace.equals(that.namespace); @@ -1045,7 +1045,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { @Override public int hashCode() { int result = address.hashCode(); - result = 31 * result + timestampCombiner.hashCode(); + result = 31 * result + outputTimeFn.hashCode(); result = 31 * result + namespace.hashCode(); return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java index 17c43bf..d140271 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.Sum; import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -77,12 +77,14 @@ public class FlinkStateInternalsTest { "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> + WATERMARK_EARLIEST_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> + WATERMARK_LATEST_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow()); FlinkStateInternals<String> underTest; @@ -272,7 +274,7 @@ public class FlinkStateInternalsTest { @Test public void testWatermarkEarliestState() throws Exception { - WatermarkHoldState value = + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); // State 
instances are cached, but depend on the namespace. @@ -296,7 +298,7 @@ public class FlinkStateInternalsTest { @Test public void testWatermarkLatestState() throws Exception { - WatermarkHoldState value = + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); // State instances are cached, but depend on the namespace. @@ -320,7 +322,7 @@ public class FlinkStateInternalsTest { @Test public void testWatermarkEndOfWindowState() throws Exception { - WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); // State instances are cached, but depend on the namespace. assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); @@ -337,7 +339,7 @@ public class FlinkStateInternalsTest { @Test public void testWatermarkStateIsEmpty() throws Exception { - WatermarkHoldState value = + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); assertThat(value.isEmpty().read(), Matchers.is(true)); @@ -351,9 +353,9 @@ public class FlinkStateInternalsTest { @Test public void testMergeEarliestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = + WatermarkHoldState<BoundedWindow> value1 = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - WatermarkHoldState value2 = + WatermarkHoldState<BoundedWindow> value2 = underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); value1.add(new Instant(3000)); @@ -370,11 +372,11 @@ public class FlinkStateInternalsTest { @Test public void testMergeLatestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = + WatermarkHoldState<BoundedWindow> value1 = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - WatermarkHoldState value2 = + WatermarkHoldState<BoundedWindow> value2 = underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); - WatermarkHoldState value3 = + WatermarkHoldState<BoundedWindow> value3 = 
underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); value1.add(new Instant(3000)); http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java index c967521..725e9d3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java @@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -166,10 +166,10 @@ class SparkStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, - TimestampCombiner timestampCombiner) { - return new SparkWatermarkHoldState(namespace, address, timestampCombiner); + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> address, + OutputTimeFn<? 
super W> outputTimeFn) { + return new SparkWatermarkHoldState<>(namespace, address, outputTimeFn); } } @@ -250,21 +250,21 @@ class SparkStateInternals<K> implements StateInternals<K> { } } - private class SparkWatermarkHoldState extends AbstractState<Instant> - implements WatermarkHoldState { + private class SparkWatermarkHoldState<W extends BoundedWindow> + extends AbstractState<Instant> implements WatermarkHoldState<W> { - private final TimestampCombiner timestampCombiner; + private final OutputTimeFn<? super W> outputTimeFn; public SparkWatermarkHoldState( StateNamespace namespace, - StateTag<?, WatermarkHoldState> address, - TimestampCombiner timestampCombiner) { + StateTag<?, WatermarkHoldState<W>> address, + OutputTimeFn<? super W> outputTimeFn) { super(namespace, address, InstantCoder.of()); - this.timestampCombiner = timestampCombiner; + this.outputTimeFn = outputTimeFn; } @Override - public SparkWatermarkHoldState readLater() { + public SparkWatermarkHoldState<W> readLater() { return this; } @@ -276,10 +276,7 @@ class SparkStateInternals<K> implements StateInternals<K> { @Override public void add(Instant outputTime) { Instant combined = read(); - combined = - (combined == null) - ? outputTime - : getTimestampCombiner().combine(combined, outputTime); + combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime); writeValue(combined); } @@ -298,8 +295,8 @@ class SparkStateInternals<K> implements StateInternals<K> { } @Override - public TimestampCombiner getTimestampCombiner() { - return timestampCombiner; + public OutputTimeFn<? 
super W> getOutputTimeFn() { + return outputTimeFn; } } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java index 7d06d6b..fa1c3fc 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java @@ -50,7 +50,7 @@ import org.apache.beam.sdk.values.TupleTag; public class SparkAbstractCombineFn implements Serializable { protected final SparkRuntimeContext runtimeContext; protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs; - protected final WindowingStrategy<?, BoundedWindow> windowingStrategy; + protected final WindowingStrategy<?, ?> windowingStrategy; public SparkAbstractCombineFn( @@ -59,7 +59,7 @@ public class SparkAbstractCombineFn implements Serializable { WindowingStrategy<?, ?> windowingStrategy) { this.runtimeContext = runtimeContext; this.sideInputs = sideInputs; - this.windowingStrategy = (WindowingStrategy<?, BoundedWindow>) windowingStrategy; + this.windowingStrategy = windowingStrategy; } // each Spark task should get it's own copy of this SparkKeyedCombineFn, and since Spark tasks http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java index 7d026c6..23f5d20 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java @@ -29,9 +29,8 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -71,8 +70,9 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract // sort exploded inputs. Iterable<WindowedValue<InputT>> sortedInputs = sortByWindows(input.explodeWindows()); - TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); - WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn(); + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); //--- inputs iterator, by window order. final Iterator<WindowedValue<InputT>> iterator = sortedInputs.iterator(); @@ -84,13 +84,9 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract accumulator = combineFn.addInput(accumulator, currentInput.getValue(), ctxtForInput(currentInput)); - // keep track of the timestamps assigned by the TimestampCombiner. + // keep track of the timestamps assigned by the OutputTimeFn. 
Instant windowTimestamp = - timestampCombiner.assign( - currentWindow, - windowingStrategy - .getWindowFn() - .getOutputTime(currentInput.getTimestamp(), currentWindow)); + outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow); // accumulate the next windows, or output. List<WindowedValue<AccumT>> output = Lists.newArrayList(); @@ -113,13 +109,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract // keep accumulating and carry on ;-) accumulator = combineFn.addInput(accumulator, nextValue.getValue(), ctxtForInput(nextValue)); - windowTimestamp = - timestampCombiner.merge( - currentWindow, - windowTimestamp, - windowingStrategy - .getWindowFn() - .getOutputTime(nextValue.getTimestamp(), currentWindow)); + windowTimestamp = outputTimeFn.combine(windowTimestamp, + outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); } else { // moving to the next window, first add the current accumulation to output // and initialize the accumulator. @@ -130,8 +121,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract accumulator = combineFn.addInput(accumulator, nextValue.getValue(), ctxtForInput(nextValue)); currentWindow = nextWindow; - windowTimestamp = timestampCombiner.assign(currentWindow, - windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)); + windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); } } @@ -172,7 +162,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract Iterable<WindowedValue<AccumT>> sortedAccumulators = sortByWindows(accumulators); @SuppressWarnings("unchecked") - TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); //--- accumulators iterator, by window order. 
final Iterator<WindowedValue<AccumT>> iterator = sortedAccumulators.iterator(); @@ -183,7 +174,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract List<AccumT> currentWindowAccumulators = Lists.newArrayList(); currentWindowAccumulators.add(currentValue.getValue()); - // keep track of the timestamps assigned by the TimestampCombiner, + // keep track of the timestamps assigned by the OutputTimeFn, // in createCombiner we already merge the timestamps assigned // to individual elements, here we will just merge them. List<Instant> windowTimestamps = Lists.newArrayList(); @@ -215,7 +206,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract // add the current accumulation to the output and initialize the accumulation. // merge the timestamps of all accumulators to merge. - Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps); + Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps); // merge accumulators. // transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>. @@ -240,7 +231,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract } // merge the last chunk of accumulators. 
- Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps); + Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps); Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators); WindowedValue<Iterable<AccumT>> preMergeWindowedValue = WindowedValue.of( accumsToMerge, mergedTimestamp, currentWindow, PaneInfo.NO_FIRING); http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java index 66c03bc..b5d243f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java @@ -29,9 +29,8 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -73,8 +72,9 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra // sort exploded inputs. 
Iterable<WindowedValue<KV<K, InputT>>> sortedInputs = sortByWindows(wkvi.explodeWindows()); - TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); - WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn(); + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); //--- inputs iterator, by window order. final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInputs.iterator(); @@ -87,13 +87,9 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra accumulator = combineFn.addInput(key, accumulator, currentInput.getValue().getValue(), ctxtForInput(currentInput)); - // keep track of the timestamps assigned by the TimestampCombiner. + // keep track of the timestamps assigned by the OutputTimeFn. Instant windowTimestamp = - timestampCombiner.assign( - currentWindow, - windowingStrategy - .getWindowFn() - .getOutputTime(currentInput.getTimestamp(), currentWindow)); + outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow); // accumulate the next windows, or output. List<WindowedValue<KV<K, AccumT>>> output = Lists.newArrayList(); @@ -116,12 +112,8 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra // keep accumulating and carry on ;-) accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(), ctxtForInput(nextValue)); - windowTimestamp = - timestampCombiner.combine( - windowTimestamp, - timestampCombiner.assign( - currentWindow, - windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow))); + windowTimestamp = outputTimeFn.combine(windowTimestamp, + outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); } else { // moving to the next window, first add the current accumulation to output // and initialize the accumulator. 
@@ -132,9 +124,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(), ctxtForInput(nextValue)); currentWindow = nextWindow; - windowTimestamp = - timestampCombiner.assign( - currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)); + windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); } } @@ -180,7 +170,8 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra Iterable<WindowedValue<KV<K, AccumT>>> sortedAccumulators = sortByWindows(accumulators); @SuppressWarnings("unchecked") - TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); //--- accumulators iterator, by window order. final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedAccumulators.iterator(); @@ -192,7 +183,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra List<AccumT> currentWindowAccumulators = Lists.newArrayList(); currentWindowAccumulators.add(currentValue.getValue().getValue()); - // keep track of the timestamps assigned by the TimestampCombiner, + // keep track of the timestamps assigned by the OutputTimeFn, // in createCombiner we already merge the timestamps assigned // to individual elements, here we will just merge them. List<Instant> windowTimestamps = Lists.newArrayList(); @@ -224,7 +215,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra // add the current accumulation to the output and initialize the accumulation. // merge the timestamps of all accumulators to merge. - Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps); + Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps); // merge accumulators. 
// transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>. @@ -250,7 +241,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra } // merge the last chunk of accumulators. - Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps); + Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps); Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators); WindowedValue<KV<K, Iterable<AccumT>>> preMergeWindowedValue = WindowedValue.of( KV.of(key, accumsToMerge), mergedTimestamp, currentWindow, PaneInfo.NO_FIRING); http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 58b5a84..6c46453 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -135,6 +135,11 @@ <dependencies> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-common-runner-api</artifactId> + </dependency> + + <dependency> <groupId>com.google.http-client</groupId> <artifactId>google-http-client</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java index e8c2f8d..63e7903 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java @@ -17,14 +17,11 @@ */ package org.apache.beam.sdk.testing; -import static com.google.common.base.Preconditions.checkArgument; import static org.hamcrest.Matchers.greaterThan; import static 
org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -38,7 +35,8 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.joda.time.Instant; import org.joda.time.ReadableInstant; @@ -254,19 +252,20 @@ public class WindowFnTestUtils { /** * Verifies that later-ending merged windows from any of the timestamps hold up output of - * earlier-ending windows, using the provided {@link WindowFn} and {@link TimestampCombiner}. + * earlier-ending windows, using the provided {@link WindowFn} and {@link OutputTimeFn}. * * <p>Given a list of lists of timestamps, where each list is expected to merge into a single * window with end times in ascending order, assigns and merges windows for each list (as though - * each were a separate key/user session). Then combines each timestamp in the list according to - * the provided {@link TimestampCombiner}. + * each were a separate key/user session). Then maps each timestamp in the list according to + * {@link OutputTimeFn#assignOutputTime outputTimeFn.assignOutputTime()} and + * {@link OutputTimeFn#combine outputTimeFn.combine()}. * * <p>Verifies that a overlapping windows do not hold each other up via the watermark. */ public static <T, W extends IntervalWindow> void validateGetOutputTimestamps( WindowFn<T, W> windowFn, - TimestampCombiner timestampCombiner, + OutputTimeFn<? 
super W> outputTimeFn, List<List<Long>> timestampsPerWindow) throws Exception { // Assign windows to each timestamp, then merge them, storing the merged windows in @@ -301,11 +300,10 @@ public class WindowFnTestUtils { List<Instant> outputInstants = new ArrayList<>(); for (long inputTimestamp : timestampsForWindow) { - outputInstants.add( - assignOutputTime(timestampCombiner, new Instant(inputTimestamp), window)); + outputInstants.add(outputTimeFn.assignOutputTime(new Instant(inputTimestamp), window)); } - combinedOutputTimestamps.add(combineOutputTimes(timestampCombiner, outputInstants)); + combinedOutputTimestamps.add(OutputTimeFns.combineOutputTimes(outputTimeFn, outputInstants)); } // Consider windows in increasing order of max timestamp; ensure the output timestamp is after @@ -323,37 +321,4 @@ public class WindowFnTestUtils { earlierEndingWindow = window; } } - - private static Instant assignOutputTime( - TimestampCombiner timestampCombiner, Instant inputTimestamp, BoundedWindow window) { - switch (timestampCombiner) { - case EARLIEST: - case LATEST: - return inputTimestamp; - case END_OF_WINDOW: - return window.maxTimestamp(); - default: - throw new IllegalArgumentException( - String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner)); - } - } - - private static Instant combineOutputTimes( - TimestampCombiner timestampCombiner, Iterable<Instant> outputInstants) { - checkArgument( - !Iterables.isEmpty(outputInstants), - "Cannot combine zero instants with %s", - timestampCombiner); - switch(timestampCombiner) { - case EARLIEST: - return Ordering.natural().min(outputInstants); - case LATEST: - return Ordering.natural().max(outputInstants); - case END_OF_WINDOW: - return outputInstants.iterator().next(); - default: - throw new IllegalArgumentException( - String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner)); - } - } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index d9c4c9f..cc92102 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowingStrategy; @@ -98,7 +97,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; * for details on the estimation. * * <p>The timestamp for each emitted pane is determined by the - * {@link Window#withTimestampCombiner(TimestampCombiner)} windowing operation}. + * {@link Window#withOutputTimeFn windowing operation}. * The output {@code PCollection} will have the same {@link WindowFn} * as the input. 
* http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java new file mode 100644 index 0000000..0efd278 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import com.google.common.collect.Ordering; +import java.io.Serializable; +import java.util.Objects; +import org.apache.beam.sdk.annotations.Experimental; +import org.joda.time.Instant; + +/** + * <b><i>(Experimental)</i></b> A function from timestamps of input values to the timestamp for a + * computed value. 
+ * + * <p>The function is represented via three components: + * <ol> + * <li>{@link #assignOutputTime} calculates an output timestamp for any input + * value in a particular window.</li> + * <li>The output timestamps for all non-late input values within a window are combined + * according to {@link #combine combine()}, a commutative and associative operation on + * the output timestamps.</li> + * <li>The output timestamp when windows merge is provided by {@link #merge merge()}.</li> + * </ol> + * + * <p>This abstract class cannot be subclassed directly, by design: it may grow + * in consumer-compatible ways that require mutually-exclusive default implementations. To + * create a concrete subclass, extend {@link OutputTimeFn.Defaults} or + * {@link OutputTimeFn.DependsOnlyOnWindow}. Note that as long as this class remains + * experimental, we may also choose to change it in arbitrary backwards-incompatible ways. + * + * @param <W> the type of window. Contravariant: methods accepting any subtype of + * {@code OutputTimeFn<W>} should use the parameter type {@code OutputTimeFn<? super W>}. + */ +@Experimental(Experimental.Kind.OUTPUT_TIME) +public abstract class OutputTimeFn<W extends BoundedWindow> implements Serializable { + + protected OutputTimeFn() { } + + /** + * Returns the output timestamp to use for data depending on the given + * {@code inputTimestamp} in the specified {@code window}. + * + * <p>The result of this method must be between {@code inputTimestamp} and + * {@code window.maxTimestamp()} (inclusive on both sides). + * + * <p>This function must be monotonic across input timestamps. Specifically, if {@code A < B}, + * then {@code assignOutputTime(A, window) <= assignOutputTime(B, window)}. + * + * <p>For a {@link WindowFn} that doesn't produce overlapping windows, this can (and typically + * should) just return {@code inputTimestamp}. 
In the presence of overlapping windows, it is + * suggested that the result in later overlapping windows is past the end of earlier windows + * so that the later windows don't prevent the watermark from + * progressing past the end of the earlier window. + * + * <p>See the overview of {@link OutputTimeFn} for the consistency properties required + * between {@link #assignOutputTime}, {@link #combine}, and {@link #merge}. + */ + public abstract Instant assignOutputTime(Instant inputTimestamp, W window); + + /** + * Combines the given output times, which must be from the same window, into an output time + * for a computed value. + * + * <ul> + * <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}.</li> + * <li>{@code combine} must be associative: + * {@code combine(a, combine(b, c)).equals(combine(combine(a, b), c))}.</li> + * </ul> + */ + public abstract Instant combine(Instant outputTime, Instant otherOutputTime); + + /** + * Merges the given output times, presumed to be combined output times for windows that + * are merging, into an output time for the {@code resultWindow}. + * + * <p>When windows {@code w1} and {@code w2} merge to become a new window {@code w1plus2}, + * then {@link #merge} must be implemented such that the output time is the same as + * if all timestamps were assigned in {@code w1plus2}. Formally: + * + * <p>{@code fn.merge(w, fn.assignOutputTime(t1, w1), fn.assignOutputTime(t2, w2))} + * + * <p>must be equal to + * + * <p>{@code fn.combine(fn.assignOutputTime(t1, w1plus2), fn.assignOutputTime(t2, w1plus2))} + * + * <p>If the assigned time depends only on the window, the correct implementation of + * {@link #merge merge()} necessarily returns the result of + * {@link #assignOutputTime assignOutputTime(t1, w1plus2)} + * (which equals {@link #assignOutputTime assignOutputTime(t2, w1plus2)}. + * Defaults for this case are provided by {@link DependsOnlyOnWindow}. 
+ * + * <p>For many other {@link OutputTimeFn} implementations, such as taking the earliest or latest + * timestamp, this will be the same as {@link #combine combine()}. Defaults for this + * case are provided by {@link Defaults}. + */ + public abstract Instant merge(W intoWindow, Iterable<? extends Instant> mergingTimestamps); + + /** + * Returns {@code true} if the result of combination of many output timestamps actually depends + * only on the earliest. + * + * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp + * to be combined. + */ + public abstract boolean dependsOnlyOnEarliestInputTimestamp(); + + /** + * Returns {@code true} if the result does not depend on what outputs were combined but only + * the window they are in. The canonical example is if all timestamps are sure to + * be the end of the window. + * + * <p>This may allow optimizations, since it is typically very efficient to retrieve the window + * and combining output timestamps is not necessary. + * + * <p>If the assigned output time for an implementation depends only on the window, consider + * extending {@link DependsOnlyOnWindow}, which returns {@code true} here and also provides + * a framework for easily implementing a correct {@link #merge}, {@link #combine} and + * {@link #assignOutputTime}. + */ + public abstract boolean dependsOnlyOnWindow(); + + /** + * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} where the + * output time depends on the input element timestamps and possibly the window. + * + * <p>To complete an implementation, override {@link #assignOutputTime}, at a minimum. + * + * <p>By default, {@link #combine} and {@link #merge} return the earliest timestamp of their + * inputs. + */ + public abstract static class Defaults<W extends BoundedWindow> extends OutputTimeFn<W> { + + protected Defaults() { + super(); + } + + /** + * {@inheritDoc} + * + * @return the earlier of the two timestamps. 
+ */ + @Override + public Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) { + return Ordering.natural().min(outputTimestamp, otherOutputTimestamp); + } + + /** + * {@inheritDoc} + * + * @return the result of {@link #combine combine(outputTimestamp, otherOutputTimestamp)}, + * by default. + */ + @Override + public Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) { + return OutputTimeFns.combineOutputTimes(this, mergingTimestamps); + } + + /** + * {@inheritDoc} + * + * @return {@code false} by default. An {@link OutputTimeFn} that is known to depend only on the + * window should extend {@link OutputTimeFn.DependsOnlyOnWindow}. + */ + @Override + public boolean dependsOnlyOnWindow() { + return false; + } + + /** + * {@inheritDoc} + * + * @return {@code false} by default. + */ + @Override + public boolean dependsOnlyOnEarliestInputTimestamp() { + return false; + } + + /** + * {@inheritDoc} + * + * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by + * default. + */ + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + + return this.getClass().equals(other.getClass()); + } + + @Override + public int hashCode() { + return Objects.hash(getClass()); + } + } + + /** + * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} when the + * output time depends only on the window. + * + * <p>To complete an implementation, override {@link #assignOutputTime(BoundedWindow)}. + */ + public abstract static class DependsOnlyOnWindow<W extends BoundedWindow> + extends OutputTimeFn<W> { + + protected DependsOnlyOnWindow() { + super(); + } + + /** + * Returns the output timestamp to use for data in the specified {@code window}. + * + * <p>Note that the result of this method must be between the maximum possible input timestamp + * in {@code window} and {@code window.maxTimestamp()} (inclusive on both sides).
+ * + * <p>For example, using {@code Sessions.withGapDuration(gapDuration)}, we know that all input + * timestamps must lie at least {@code gapDuration} from the end of the session, so + * {@code window.maxTimestamp() - gapDuration} is an acceptable assigned timestamp. + * + * @see #assignOutputTime(Instant, BoundedWindow) + */ + protected abstract Instant assignOutputTime(W window); + + /** + * {@inheritDoc} + * + * @return the result of {@link #assignOutputTime(BoundedWindow) assignOutputTime(window)}. + */ + @Override + public final Instant assignOutputTime(Instant timestamp, W window) { + return assignOutputTime(window); + } + + /** + * {@inheritDoc} + * + * @return the same timestamp as both argument timestamps, which are necessarily equal. + */ + @Override + public final Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) { + return outputTimestamp; + } + + /** + * {@inheritDoc} + * + * @return the result of + * {@link #assignOutputTime(BoundedWindow) assignOutputTime(resultWindow)}. + */ + @Override + public final Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) { + return assignOutputTime(resultWindow); + } + + /** + * {@inheritDoc} + * + * @return {@code true}. + */ + @Override + public final boolean dependsOnlyOnWindow() { + return true; + } + + /** + * {@inheritDoc} + * + * @return {@code true}. Since the output time depends only on the window, it can + * certainly be ascertained given a single input timestamp. + */ + @Override + public final boolean dependsOnlyOnEarliestInputTimestamp() { + return true; + } + + /** + * {@inheritDoc} + * + * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by + * default.
+ */ + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + + return this.getClass().equals(other.getClass()); + } + + @Override + public int hashCode() { + return Objects.hash(getClass()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java new file mode 100644 index 0000000..b5d67fa --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.joda.time.Instant; + +/** + * <b><i>(Experimental)</i></b> Static utility methods and provided implementations for + * {@link OutputTimeFn}. + */ +@Experimental(Experimental.Kind.OUTPUT_TIME) +public class OutputTimeFns { + /** + * The policy of outputting at the earliest of the input timestamps for non-late input data + * that led to a computed value. + * + * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time + * elements being aggregated via some function {@code f} into + * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>). When emitted, the output + * timestamp of the result will be the earliest of the event time timestamps. + * + * <p>If data arrives late, it has no effect on the output timestamp. + */ + public static OutputTimeFn<BoundedWindow> outputAtEarliestInputTimestamp() { + return new OutputAtEarliestInputTimestamp(); + } + + /** + * The policy of holding the watermark to the latest of the input timestamps + * for non-late input data that led to a computed value. + * + * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time + * elements being aggregated via some function {@code f} into + * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>). When emitted, the output + * timestamp of the result will be the latest of the event time timestamps. + * + * <p>If data arrives late, it has no effect on the output timestamp.
+ */ + public static OutputTimeFn<BoundedWindow> outputAtLatestInputTimestamp() { + return new OutputAtLatestInputTimestamp(); + } + + /** + * The policy of outputting with timestamps at the end of the window. + * + * <p>Note that this output timestamp depends only on the window. See + * {@link OutputTimeFn#dependsOnlyOnWindow()}. + * + * <p>When windows merge, instead of using {@link OutputTimeFn#combine} to obtain an output + * timestamp for the results in the new window, it is mandatory to obtain a new output + * timestamp from {@link OutputTimeFn#assignOutputTime} with the new window and an arbitrary + * timestamp (because it is guaranteed that the timestamp is irrelevant). + * + * <p>For non-merging window functions, this {@link OutputTimeFn} works transparently. + */ + public static OutputTimeFn<BoundedWindow> outputAtEndOfWindow() { + return new OutputAtEndOfWindow(); + } + + /** + * Applies the given {@link OutputTimeFn} to the given output times, obtaining + * the output time for a value computed. See {@link OutputTimeFn#combine} for + * a full specification. + * + * @throws IllegalArgumentException if {@code outputTimes} is empty. + */ + public static Instant combineOutputTimes( + OutputTimeFn<?> outputTimeFn, Iterable<? extends Instant> outputTimes) { + checkArgument( + !Iterables.isEmpty(outputTimes), + "Collection of output times must not be empty in %s.combineOutputTimes", + OutputTimeFns.class.getName()); + + @Nullable + Instant combinedOutputTime = null; + for (Instant outputTime : outputTimes) { + combinedOutputTime = + combinedOutputTime == null + ? outputTime : outputTimeFn.combine(combinedOutputTime, outputTime); + } + return combinedOutputTime; + } + + /** + * See {@link #outputAtEarliestInputTimestamp}.
+ */ + private static class OutputAtEarliestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> { + @Override + public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) { + return inputTimestamp; + } + + @Override + public Instant combine(Instant outputTime, Instant otherOutputTime) { + return Ordering.natural().min(outputTime, otherOutputTime); + } + + /** + * {@inheritDoc} + * + * @return {@code true}. The result of any combine will be the earliest input timestamp. + */ + @Override + public boolean dependsOnlyOnEarliestInputTimestamp() { + return true; + } + } + + /** + * See {@link #outputAtLatestInputTimestamp}. + */ + private static class OutputAtLatestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> { + @Override + public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) { + return inputTimestamp; + } + + @Override + public Instant combine(Instant outputTime, Instant otherOutputTime) { + return Ordering.natural().max(outputTime, otherOutputTime); + } + + /** + * {@inheritDoc} + * + * @return {@code false}. + */ + @Override + public boolean dependsOnlyOnEarliestInputTimestamp() { + return false; + } + } + + private static class OutputAtEndOfWindow extends OutputTimeFn.DependsOnlyOnWindow<BoundedWindow> { + + /** + *{@inheritDoc} + * + *@return {@code window.maxTimestamp()}. 
+ */ + @Override + protected Instant assignOutputTime(BoundedWindow window) { + return window.maxTimestamp(); + } + + @Override + public String toString() { + return getClass().getCanonicalName(); + } + } + + public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) { + if (outputTimeFn instanceof OutputAtEarliestInputTimestamp) { + return RunnerApi.OutputTime.EARLIEST_IN_PANE; + } else if (outputTimeFn instanceof OutputAtLatestInputTimestamp) { + return RunnerApi.OutputTime.LATEST_IN_PANE; + } else if (outputTimeFn instanceof OutputAtEndOfWindow) { + return RunnerApi.OutputTime.END_OF_WINDOW; + } else { + throw new IllegalArgumentException( + String.format( + "Cannot convert %s to %s: %s", + OutputTimeFn.class.getCanonicalName(), + RunnerApi.OutputTime.class.getCanonicalName(), + outputTimeFn)); + } + } + + public static OutputTimeFn<?> fromProto(RunnerApi.OutputTime proto) { + switch (proto) { + case EARLIEST_IN_PANE: + return OutputTimeFns.outputAtEarliestInputTimestamp(); + case LATEST_IN_PANE: + return OutputTimeFns.outputAtLatestInputTimestamp(); + case END_OF_WINDOW: + return OutputTimeFns.outputAtEndOfWindow(); + case UNRECOGNIZED: + default: + // Whether or not it is proto that cannot recognize it (due to the version of the + // generated code we link to) or the switch hasn't been updated to handle it, + // the situation is the same: we don't know what this OutputTime means + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + RunnerApi.OutputTime.class.getCanonicalName(), + OutputTimeFn.class.getCanonicalName(), + proto)); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java deleted file mode 100644 index 39fe8a9..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; -import java.util.Arrays; -import java.util.Collections; -import org.apache.beam.sdk.annotations.Experimental; -import org.joda.time.Instant; - -/** - * Policies for combining timestamps that occur within a window. - */ -@Experimental(Experimental.Kind.OUTPUT_TIME) -public enum TimestampCombiner { - /** - * The policy of taking at the earliest of a set of timestamps. - * - * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after - * they are shifted by the {@link WindowFn} (to allow downstream watermark progress). - * - * <p>If data arrives late, it has no effect on the output timestamp. 
- */ - EARLIEST { - @Override - public Instant combine(Iterable<? extends Instant> timestamps) { - return Ordering.natural().min(timestamps); - } - - @Override - public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) { - return combine(mergingTimestamps); - } - - @Override - public boolean dependsOnlyOnEarliestTimestamp() { - return true; - } - - @Override - public boolean dependsOnlyOnWindow() { - return false; - } - }, - - /** - * The policy of taking the latest of a set of timestamps. - * - * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after - * they are shifted by the {@link WindowFn} (to allow downstream watermark progress). - * - * <p>If data arrives late, it has no effect on the output timestamp. - */ - LATEST { - @Override - public Instant combine(Iterable<? extends Instant> timestamps) { - return Ordering.natural().max(timestamps); - } - - @Override - public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) { - return combine(mergingTimestamps); - } - - @Override - public boolean dependsOnlyOnEarliestTimestamp() { - return false; - } - - @Override - public boolean dependsOnlyOnWindow() { - return false; - } - }, - - /** - * The policy of using the end of the window, regardless of input timestamps. - * - * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after - * they are shifted by the {@link WindowFn} (to allow downstream watermark progress). - * - * <p>If data arrives late, it has no effect on the output timestamp. - */ - END_OF_WINDOW { - @Override - public Instant combine(Iterable<? extends Instant> timestamps) { - checkArgument(Iterables.size(timestamps) > 0); - return Iterables.get(timestamps, 0); - } - - @Override - public Instant merge(BoundedWindow intoWindow, Iterable<? 
extends Instant> mergingTimestamps) { - return intoWindow.maxTimestamp(); - } - - @Override - public boolean dependsOnlyOnEarliestTimestamp() { - return false; - } - - @Override - public boolean dependsOnlyOnWindow() { - return true; - } - }; - - /** - * Combines the given times, which must be from the same window and must have been passed through - * {@link #merge}. - * - * <ul> - * <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}. - * <li>{@code combine} must be associative: {@code combine(a, combine(b, - * c)).equals(combine(combine(a, b), c))}. - * </ul> - */ - public abstract Instant combine(Iterable<? extends Instant> timestamps); - - /** - * Merges the given timestamps, which may have originated in separate windows, into the context of - * the result window. - */ - public abstract Instant merge( - BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps); - - /** - * Shorthand for {@link #merge} with just one element, to place it into the context of - * a window. - * - * <p>For example, the {@link #END_OF_WINDOW} policy moves the timestamp to the end of the window. - */ - public final Instant assign(BoundedWindow intoWindow, Instant timestamp) { - return merge(intoWindow, Collections.singleton(timestamp)); - } - - /** - * Varargs variant of {@link #combine}. - */ - public final Instant combine(Instant... timestamps) { - return combine(Arrays.asList(timestamps)); - } - - /** - * Varargs variant of {@link #merge}. - */ - public final Instant merge(BoundedWindow intoWindow, Instant... timestamps) { - return merge(intoWindow, Arrays.asList(timestamps)); - } - - /** - * Returns {@code true} if the result of combination of many output timestamps actually depends - * only on the earliest. - * - * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp - * to be combined. 
- */ - public abstract boolean dependsOnlyOnEarliestTimestamp(); - - /** - * Returns {@code true} if the result does not depend on what outputs were combined but only - * the window they are in. The canonical example is if all timestamps are sure to - * be the end of the window. - * - * <p>This may allow optimizations, since it is typically very efficient to retrieve the window - * and combining output timestamps is not necessary. - */ - public abstract boolean dependsOnlyOnWindow(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index cb7b430..1000ff7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -193,7 +193,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T @Nullable abstract AccumulationMode getAccumulationMode(); @Nullable abstract Duration getAllowedLateness(); @Nullable abstract ClosingBehavior getClosingBehavior(); - @Nullable abstract TimestampCombiner getTimestampCombiner(); + @Nullable abstract OutputTimeFn<?> getOutputTimeFn(); abstract Builder<T> toBuilder(); @@ -204,7 +204,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T abstract Builder<T> setAccumulationMode(AccumulationMode mode); abstract Builder<T> setAllowedLateness(Duration allowedLateness); abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior); - abstract Builder<T> setTimestampCombiner(TimestampCombiner timestampCombiner); + abstract Builder<T> setOutputTimeFn(OutputTimeFn<?> outputTimeFn); abstract Window<T> build(); } @@ 
-273,12 +273,12 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T } /** - * <b><i>(Experimental)</i></b> Override the default {@link TimestampCombiner}, to control + * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control * the output timestamp of values output from a {@link GroupByKey} operation. */ @Experimental(Kind.OUTPUT_TIME) - public Window<T> withTimestampCombiner(TimestampCombiner timestampCombiner) { - return toBuilder().setTimestampCombiner(timestampCombiner).build(); + public Window<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) { + return toBuilder().setOutputTimeFn(outputTimeFn).build(); } /** @@ -300,6 +300,8 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T * Get the output strategy of this {@link Window Window PTransform}. For internal use * only. */ + // Rawtype cast of OutputTimeFn cannot be eliminated with intermediate variable, as it is + // casting between wildcards public WindowingStrategy<?, ?> getOutputStrategyInternal( WindowingStrategy<?, ?> inputStrategy) { WindowingStrategy<?, ?> result = inputStrategy; @@ -318,8 +320,8 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T if (getClosingBehavior() != null) { result = result.withClosingBehavior(getClosingBehavior()); } - if (getTimestampCombiner() != null) { - result = result.withTimestampCombiner(getTimestampCombiner()); + if (getOutputTimeFn() != null) { + result = result.withOutputTimeFn(getOutputTimeFn()); } return result; } @@ -409,9 +411,9 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T .withLabel("Window Closing Behavior")); } - if (getTimestampCombiner() != null) { - builder.add(DisplayData.item("timestampCombiner", getTimestampCombiner().toString()) - .withLabel("Timestamp Combiner")); + if (getOutputTimeFn() != null) { + builder.add(DisplayData.item("outputTimeFn", getOutputTimeFn().getClass()) + .withLabel("Output 
Time Function")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java index 706e039..0c27c4f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java @@ -22,7 +22,7 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -57,14 +57,13 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti // If the input has already had its windows merged, then the GBK that performed the merge // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged. - // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in - // time. + // The OutputTimeFn is set to ensure the GroupByKey does not shift elements forwards in time. // Because this outputs as fast as possible, this should not hold the watermark. 
Window<KV<K, V>> rewindow = Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder())) .triggering(new ReshuffleTrigger<>()) .discardingFiredPanes() - .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); return input.apply(rewindow)
