http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java index 2967f2c..eac465c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java @@ -26,8 +26,9 @@ import java.util.List; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -53,8 +54,9 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { @SuppressWarnings("unchecked") - OutputTimeFn<? super BoundedWindow> outputTimeFn = - (OutputTimeFn<? 
super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + TimestampCombiner timestampCombiner = + (TimestampCombiner) windowingStrategy.getTimestampCombiner(); + WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn(); // get all elements so that we can sort them, has to fit into // memory @@ -88,18 +90,19 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou // create accumulator using the first elements key WindowedValue<KV<K, InputT>> currentValue = iterator.next(); K key = currentValue.getValue().getKey(); - BoundedWindow currentWindow = Iterables.getOnlyElement(currentValue.getWindows()); + W currentWindow = (W) Iterables.getOnlyElement(currentValue.getWindows()); InputT firstValue = currentValue.getValue().getValue(); AccumT accumulator = flinkCombiner.firstInput( key, firstValue, options, sideInputReader, currentValue.getWindows()); - // we use this to keep track of the timestamps assigned by the OutputTimeFn + // we use this to keep track of the timestamps assigned by the TimestampCombiner Instant windowTimestamp = - outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow); + timestampCombiner.assign( + currentWindow, windowFn.getOutputTime(currentValue.getTimestamp(), currentWindow)); while (iterator.hasNext()) { WindowedValue<KV<K, InputT>> nextValue = iterator.next(); - BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows()); + W nextWindow = (W) Iterables.getOnlyElement(nextValue.getWindows()); if (currentWindow.equals(nextWindow)) { // continue accumulating and merge windows @@ -108,9 +111,12 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou accumulator = flinkCombiner.addInput(key, accumulator, value, options, sideInputReader, currentValue.getWindows()); - windowTimestamp = outputTimeFn.combine( - windowTimestamp, - outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); + windowTimestamp = + timestampCombiner.combine( + 
windowTimestamp, + timestampCombiner.assign( + currentWindow, + windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow))); } else { // emit the value that we currently have @@ -127,7 +133,9 @@ public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends Bou InputT value = nextValue.getValue().getValue(); accumulator = flinkCombiner.firstInput(key, value, options, sideInputReader, currentValue.getWindows()); - windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); + windowTimestamp = + timestampCombiner.assign( + currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)); } }
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java index 3203446..d015c38 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CombineContextFactory; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -176,9 +176,9 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { + public <W extends BoundedWindow> WatermarkHoldState bindWatermark( + StateTag<? 
super K, WatermarkHoldState> address, + TimestampCombiner timestampCombiner) { throw new UnsupportedOperationException( String.format("%s is not supported", WatermarkHoldState.class.getSimpleName())); } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java index 24b340e..2dd7c96 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java @@ -38,7 +38,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -186,9 +186,9 @@ public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { + public <W extends BoundedWindow> WatermarkHoldState bindWatermark( + StateTag<? 
super K, WatermarkHoldState> address, + TimestampCombiner timestampCombiner) { throw new UnsupportedOperationException( String.format("%s is not supported", CombiningState.class.getSimpleName())); } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java index 2bf0bf1..17ea62a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; @@ -146,9 +146,9 @@ public class FlinkSplitStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { + public <W extends BoundedWindow> WatermarkHoldState bindWatermark( + StateTag<? 
super K, WatermarkHoldState> address, + TimestampCombiner timestampCombiner) { throw new UnsupportedOperationException( String.format("%s is not supported", CombiningState.class.getSimpleName())); } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java index 4f961e5..878c914 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -32,7 +32,7 @@ import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.CombineContextFactory; import org.apache.beam.sdk.util.state.BagState; @@ -185,12 +185,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { + public <W extends BoundedWindow> WatermarkHoldState bindWatermark( + StateTag<? 
super K, WatermarkHoldState> address, + TimestampCombiner timestampCombiner) { return new FlinkWatermarkHoldState<>( - flinkStateBackend, FlinkStateInternals.this, address, namespace, outputTimeFn); + flinkStateBackend, FlinkStateInternals.this, address, namespace, timestampCombiner); } }); } @@ -912,9 +912,9 @@ public class FlinkStateInternals<K> implements StateInternals<K> { } private static class FlinkWatermarkHoldState<K, W extends BoundedWindow> - implements WatermarkHoldState<W> { - private final StateTag<? super K, WatermarkHoldState<W>> address; - private final OutputTimeFn<? super W> outputTimeFn; + implements WatermarkHoldState { + private final StateTag<? super K, WatermarkHoldState> address; + private final TimestampCombiner timestampCombiner; private final StateNamespace namespace; private final KeyedStateBackend<ByteBuffer> flinkStateBackend; private final FlinkStateInternals<K> flinkStateInternals; @@ -923,11 +923,11 @@ public class FlinkStateInternals<K> implements StateInternals<K> { public FlinkWatermarkHoldState( KeyedStateBackend<ByteBuffer> flinkStateBackend, FlinkStateInternals<K> flinkStateInternals, - StateTag<? super K, WatermarkHoldState<W>> address, + StateTag<? super K, WatermarkHoldState> address, StateNamespace namespace, - OutputTimeFn<? super W> outputTimeFn) { + TimestampCombiner timestampCombiner) { this.address = address; - this.outputTimeFn = outputTimeFn; + this.timestampCombiner = timestampCombiner; this.namespace = namespace; this.flinkStateBackend = flinkStateBackend; this.flinkStateInternals = flinkStateInternals; @@ -937,12 +937,12 @@ public class FlinkStateInternals<K> implements StateInternals<K> { } @Override - public OutputTimeFn<? 
super W> getOutputTimeFn() { - return outputTimeFn; + public TimestampCombiner getTimestampCombiner() { + return timestampCombiner; } @Override - public WatermarkHoldState<W> readLater() { + public WatermarkHoldState readLater() { return this; } @@ -983,7 +983,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { state.update(value); flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value); } else { - Instant combined = outputTimeFn.combine(current, value); + Instant combined = timestampCombiner.combine(current, value); state.update(combined); flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined); } @@ -1035,7 +1035,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { if (!address.equals(that.address)) { return false; } - if (!outputTimeFn.equals(that.outputTimeFn)) { + if (!timestampCombiner.equals(that.timestampCombiner)) { return false; } return namespace.equals(that.namespace); @@ -1045,7 +1045,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> { @Override public int hashCode() { int result = address.hashCode(); - result = 31 * result + outputTimeFn.hashCode(); + result = 31 * result + timestampCombiner.hashCode(); result = 31 * result + namespace.hashCode(); return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java index d140271..17c43bf 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java @@ -35,7 +35,7 @@ import 
org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -77,14 +77,12 @@ public class FlinkStateInternalsTest { "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> - WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> - WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow()); + private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); + private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); FlinkStateInternals<String> underTest; @@ -274,7 +272,7 @@ public class FlinkStateInternalsTest { @Test public void testWatermarkEarliestState() throws Exception { - WatermarkHoldState<BoundedWindow> value = + 
WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); // State instances are cached, but depend on the namespace. @@ -298,7 +296,7 @@ public class FlinkStateInternalsTest { @Test public void testWatermarkLatestState() throws Exception { - WatermarkHoldState<BoundedWindow> value = + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); // State instances are cached, but depend on the namespace. @@ -322,7 +320,7 @@ public class FlinkStateInternalsTest { @Test public void testWatermarkEndOfWindowState() throws Exception { - WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); // State instances are cached, but depend on the namespace. assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); @@ -339,7 +337,7 @@ public class FlinkStateInternalsTest { @Test public void testWatermarkStateIsEmpty() throws Exception { - WatermarkHoldState<BoundedWindow> value = + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); assertThat(value.isEmpty().read(), Matchers.is(true)); @@ -353,9 +351,9 @@ public class FlinkStateInternalsTest { @Test public void testMergeEarliestWatermarkIntoSource() throws Exception { - WatermarkHoldState<BoundedWindow> value1 = + WatermarkHoldState value1 = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - WatermarkHoldState<BoundedWindow> value2 = + WatermarkHoldState value2 = underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); value1.add(new Instant(3000)); @@ -372,11 +370,11 @@ public class FlinkStateInternalsTest { @Test public void testMergeLatestWatermarkIntoSource() throws Exception { - WatermarkHoldState<BoundedWindow> value1 = + WatermarkHoldState value1 = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - WatermarkHoldState<BoundedWindow> value2 = + WatermarkHoldState value2 = underTest.state(NAMESPACE_2, 
WATERMARK_LATEST_ADDR); - WatermarkHoldState<BoundedWindow> value3 = + WatermarkHoldState value3 = underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); value1.add(new Instant(3000)); http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java index 725e9d3..c967521 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java @@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -166,10 +166,10 @@ class SparkStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { - return new SparkWatermarkHoldState<>(namespace, address, outputTimeFn); + public <W extends BoundedWindow> WatermarkHoldState bindWatermark( + StateTag<? 
super K, WatermarkHoldState> address, + TimestampCombiner timestampCombiner) { + return new SparkWatermarkHoldState(namespace, address, timestampCombiner); } } @@ -250,21 +250,21 @@ class SparkStateInternals<K> implements StateInternals<K> { } } - private class SparkWatermarkHoldState<W extends BoundedWindow> - extends AbstractState<Instant> implements WatermarkHoldState<W> { + private class SparkWatermarkHoldState extends AbstractState<Instant> + implements WatermarkHoldState { - private final OutputTimeFn<? super W> outputTimeFn; + private final TimestampCombiner timestampCombiner; public SparkWatermarkHoldState( StateNamespace namespace, - StateTag<?, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { + StateTag<?, WatermarkHoldState> address, + TimestampCombiner timestampCombiner) { super(namespace, address, InstantCoder.of()); - this.outputTimeFn = outputTimeFn; + this.timestampCombiner = timestampCombiner; } @Override - public SparkWatermarkHoldState<W> readLater() { + public SparkWatermarkHoldState readLater() { return this; } @@ -276,7 +276,10 @@ class SparkStateInternals<K> implements StateInternals<K> { @Override public void add(Instant outputTime) { Instant combined = read(); - combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime); + combined = + (combined == null) + ? outputTime + : getTimestampCombiner().combine(combined, outputTime); writeValue(combined); } @@ -295,8 +298,8 @@ class SparkStateInternals<K> implements StateInternals<K> { } @Override - public OutputTimeFn<? 
super W> getOutputTimeFn() { - return outputTimeFn; + public TimestampCombiner getTimestampCombiner() { + return timestampCombiner; } } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java index fa1c3fc..7d06d6b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java @@ -50,7 +50,7 @@ import org.apache.beam.sdk.values.TupleTag; public class SparkAbstractCombineFn implements Serializable { protected final SparkRuntimeContext runtimeContext; protected final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs; - protected final WindowingStrategy<?, ?> windowingStrategy; + protected final WindowingStrategy<?, BoundedWindow> windowingStrategy; public SparkAbstractCombineFn( @@ -59,7 +59,7 @@ public class SparkAbstractCombineFn implements Serializable { WindowingStrategy<?, ?> windowingStrategy) { this.runtimeContext = runtimeContext; this.sideInputs = sideInputs; - this.windowingStrategy = windowingStrategy; + this.windowingStrategy = (WindowingStrategy<?, BoundedWindow>) windowingStrategy; } // each Spark task should get it's own copy of this SparkKeyedCombineFn, and since Spark tasks http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java index 23f5d20..7d026c6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java @@ -29,8 +29,9 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -70,9 +71,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract // sort exploded inputs. Iterable<WindowedValue<InputT>> sortedInputs = sortByWindows(input.explodeWindows()); - @SuppressWarnings("unchecked") - OutputTimeFn<? super BoundedWindow> outputTimeFn = - (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); + WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn(); //--- inputs iterator, by window order. final Iterator<WindowedValue<InputT>> iterator = sortedInputs.iterator(); @@ -84,9 +84,13 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract accumulator = combineFn.addInput(accumulator, currentInput.getValue(), ctxtForInput(currentInput)); - // keep track of the timestamps assigned by the OutputTimeFn. 
+ // keep track of the timestamps assigned by the TimestampCombiner. Instant windowTimestamp = - outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow); + timestampCombiner.assign( + currentWindow, + windowingStrategy + .getWindowFn() + .getOutputTime(currentInput.getTimestamp(), currentWindow)); // accumulate the next windows, or output. List<WindowedValue<AccumT>> output = Lists.newArrayList(); @@ -109,8 +113,13 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract // keep accumulating and carry on ;-) accumulator = combineFn.addInput(accumulator, nextValue.getValue(), ctxtForInput(nextValue)); - windowTimestamp = outputTimeFn.combine(windowTimestamp, - outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); + windowTimestamp = + timestampCombiner.merge( + currentWindow, + windowTimestamp, + windowingStrategy + .getWindowFn() + .getOutputTime(nextValue.getTimestamp(), currentWindow)); } else { // moving to the next window, first add the current accumulation to output // and initialize the accumulator. @@ -121,7 +130,8 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract accumulator = combineFn.addInput(accumulator, nextValue.getValue(), ctxtForInput(nextValue)); currentWindow = nextWindow; - windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); + windowTimestamp = timestampCombiner.assign(currentWindow, + windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)); } } @@ -162,8 +172,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract Iterable<WindowedValue<AccumT>> sortedAccumulators = sortByWindows(accumulators); @SuppressWarnings("unchecked") - OutputTimeFn<? super BoundedWindow> outputTimeFn = - (OutputTimeFn<? 
super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); //--- accumulators iterator, by window order. final Iterator<WindowedValue<AccumT>> iterator = sortedAccumulators.iterator(); @@ -174,7 +183,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract List<AccumT> currentWindowAccumulators = Lists.newArrayList(); currentWindowAccumulators.add(currentValue.getValue()); - // keep track of the timestamps assigned by the OutputTimeFn, + // keep track of the timestamps assigned by the TimestampCombiner, // in createCombiner we already merge the timestamps assigned // to individual elements, here we will just merge them. List<Instant> windowTimestamps = Lists.newArrayList(); @@ -206,7 +215,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract // add the current accumulation to the output and initialize the accumulation. // merge the timestamps of all accumulators to merge. - Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps); + Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps); // merge accumulators. // transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>. @@ -231,7 +240,7 @@ public class SparkGlobalCombineFn<InputT, AccumT, OutputT> extends SparkAbstract } // merge the last chunk of accumulators. 
- Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps); + Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps); Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators); WindowedValue<Iterable<AccumT>> preMergeWindowedValue = WindowedValue.of( accumsToMerge, mergedTimestamp, currentWindow, PaneInfo.NO_FIRING); http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java index b5d243f..66c03bc 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java @@ -29,8 +29,9 @@ import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -72,9 +73,8 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra // sort exploded inputs. Iterable<WindowedValue<KV<K, InputT>>> sortedInputs = sortByWindows(wkvi.explodeWindows()); - @SuppressWarnings("unchecked") - OutputTimeFn<? 
super BoundedWindow> outputTimeFn = - (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); + WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn(); //--- inputs iterator, by window order. final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInputs.iterator(); @@ -87,9 +87,13 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra accumulator = combineFn.addInput(key, accumulator, currentInput.getValue().getValue(), ctxtForInput(currentInput)); - // keep track of the timestamps assigned by the OutputTimeFn. + // keep track of the timestamps assigned by the TimestampCombiner. Instant windowTimestamp = - outputTimeFn.assignOutputTime(currentInput.getTimestamp(), currentWindow); + timestampCombiner.assign( + currentWindow, + windowingStrategy + .getWindowFn() + .getOutputTime(currentInput.getTimestamp(), currentWindow)); // accumulate the next windows, or output. List<WindowedValue<KV<K, AccumT>>> output = Lists.newArrayList(); @@ -112,8 +116,12 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra // keep accumulating and carry on ;-) accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(), ctxtForInput(nextValue)); - windowTimestamp = outputTimeFn.combine(windowTimestamp, - outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); + windowTimestamp = + timestampCombiner.combine( + windowTimestamp, + timestampCombiner.assign( + currentWindow, + windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow))); } else { // moving to the next window, first add the current accumulation to output // and initialize the accumulator. 
@@ -124,7 +132,9 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra accumulator = combineFn.addInput(key, accumulator, nextValue.getValue().getValue(), ctxtForInput(nextValue)); currentWindow = nextWindow; - windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); + windowTimestamp = + timestampCombiner.assign( + currentWindow, windowFn.getOutputTime(nextValue.getTimestamp(), currentWindow)); } } @@ -170,8 +180,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra Iterable<WindowedValue<KV<K, AccumT>>> sortedAccumulators = sortByWindows(accumulators); @SuppressWarnings("unchecked") - OutputTimeFn<? super BoundedWindow> outputTimeFn = - (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); //--- accumulators iterator, by window order. final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedAccumulators.iterator(); @@ -183,7 +192,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra List<AccumT> currentWindowAccumulators = Lists.newArrayList(); currentWindowAccumulators.add(currentValue.getValue().getValue()); - // keep track of the timestamps assigned by the OutputTimeFn, + // keep track of the timestamps assigned by the TimestampCombiner, // in createCombiner we already merge the timestamps assigned // to individual elements, here we will just merge them. List<Instant> windowTimestamps = Lists.newArrayList(); @@ -215,7 +224,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra // add the current accumulation to the output and initialize the accumulation. // merge the timestamps of all accumulators to merge. - Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps); + Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps); // merge accumulators. 
// transforming a KV<K, Iterable<AccumT>> into a KV<K, Iterable<AccumT>>. @@ -241,7 +250,7 @@ public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstra } // merge the last chunk of accumulators. - Instant mergedTimestamp = outputTimeFn.merge(currentWindow, windowTimestamps); + Instant mergedTimestamp = timestampCombiner.merge(currentWindow, windowTimestamps); Iterable<AccumT> accumsToMerge = Iterables.unmodifiableIterable(currentWindowAccumulators); WindowedValue<KV<K, Iterable<AccumT>>> preMergeWindowedValue = WindowedValue.of( KV.of(key, accumsToMerge), mergedTimestamp, currentWindow, PaneInfo.NO_FIRING); http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index eac95bf..110de52 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -135,11 +135,6 @@ <dependencies> <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-common-runner-api</artifactId> - </dependency> - - <dependency> <groupId>com.google.http-client</groupId> <artifactId>google-http-client</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java index 63e7903..e8c2f8d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java @@ -17,11 +17,14 @@ */ package org.apache.beam.sdk.testing; +import static com.google.common.base.Preconditions.checkArgument; import static org.hamcrest.Matchers.greaterThan; import static 
org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -35,8 +38,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.joda.time.Instant; import org.joda.time.ReadableInstant; @@ -252,20 +254,19 @@ public class WindowFnTestUtils { /** * Verifies that later-ending merged windows from any of the timestamps hold up output of - * earlier-ending windows, using the provided {@link WindowFn} and {@link OutputTimeFn}. + * earlier-ending windows, using the provided {@link WindowFn} and {@link TimestampCombiner}. * * <p>Given a list of lists of timestamps, where each list is expected to merge into a single * window with end times in ascending order, assigns and merges windows for each list (as though - * each were a separate key/user session). Then maps each timestamp in the list according to - * {@link OutputTimeFn#assignOutputTime outputTimeFn.assignOutputTime()} and - * {@link OutputTimeFn#combine outputTimeFn.combine()}. + * each were a separate key/user session). Then combines each timestamp in the list according to + * the provided {@link TimestampCombiner}. * * <p>Verifies that a overlapping windows do not hold each other up via the watermark. */ public static <T, W extends IntervalWindow> void validateGetOutputTimestamps( WindowFn<T, W> windowFn, - OutputTimeFn<? 
super W> outputTimeFn, + TimestampCombiner timestampCombiner, List<List<Long>> timestampsPerWindow) throws Exception { // Assign windows to each timestamp, then merge them, storing the merged windows in @@ -300,10 +301,11 @@ public class WindowFnTestUtils { List<Instant> outputInstants = new ArrayList<>(); for (long inputTimestamp : timestampsForWindow) { - outputInstants.add(outputTimeFn.assignOutputTime(new Instant(inputTimestamp), window)); + outputInstants.add( + assignOutputTime(timestampCombiner, new Instant(inputTimestamp), window)); } - combinedOutputTimestamps.add(OutputTimeFns.combineOutputTimes(outputTimeFn, outputInstants)); + combinedOutputTimestamps.add(combineOutputTimes(timestampCombiner, outputInstants)); } // Consider windows in increasing order of max timestamp; ensure the output timestamp is after @@ -321,4 +323,37 @@ public class WindowFnTestUtils { earlierEndingWindow = window; } } + + private static Instant assignOutputTime( + TimestampCombiner timestampCombiner, Instant inputTimestamp, BoundedWindow window) { + switch (timestampCombiner) { + case EARLIEST: + case LATEST: + return inputTimestamp; + case END_OF_WINDOW: + return window.maxTimestamp(); + default: + throw new IllegalArgumentException( + String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner)); + } + } + + private static Instant combineOutputTimes( + TimestampCombiner timestampCombiner, Iterable<Instant> outputInstants) { + checkArgument( + !Iterables.isEmpty(outputInstants), + "Cannot combine zero instants with %s", + timestampCombiner); + switch(timestampCombiner) { + case EARLIEST: + return Ordering.natural().min(outputInstants); + case LATEST: + return Ordering.natural().max(outputInstants); + case END_OF_WINDOW: + return outputInstants.iterator().next(); + default: + throw new IllegalArgumentException( + String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner)); + } + } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index cc92102..d9c4c9f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowingStrategy; @@ -97,7 +98,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; * for details on the estimation. * * <p>The timestamp for each emitted pane is determined by the - * {@link Window#withOutputTimeFn windowing operation}. + * {@link Window#withTimestampCombiner(TimestampCombiner) windowing operation}. * The output {@code PCollection} will have the same {@link WindowFn} * as the input. 
* http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java deleted file mode 100644 index 0efd278..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java +++ /dev/null @@ -1,314 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import com.google.common.collect.Ordering; -import java.io.Serializable; -import java.util.Objects; -import org.apache.beam.sdk.annotations.Experimental; -import org.joda.time.Instant; - -/** - * <b><i>(Experimental)</i></b> A function from timestamps of input values to the timestamp for a - * computed value. 
- * - * <p>The function is represented via three components: - * <ol> - * <li>{@link #assignOutputTime} calculates an output timestamp for any input - * value in a particular window.</li> - * <li>The output timestamps for all non-late input values within a window are combined - * according to {@link #combine combine()}, a commutative and associative operation on - * the output timestamps.</li> - * <li>The output timestamp when windows merge is provided by {@link #merge merge()}.</li> - * </ol> - * - * <p>This abstract class cannot be subclassed directly, by design: it may grow - * in consumer-compatible ways that require mutually-exclusive default implementations. To - * create a concrete subclass, extend {@link OutputTimeFn.Defaults} or - * {@link OutputTimeFn.DependsOnlyOnWindow}. Note that as long as this class remains - * experimental, we may also choose to change it in arbitrary backwards-incompatible ways. - * - * @param <W> the type of window. Contravariant: methods accepting any subtype of - * {@code OutputTimeFn<W>} should use the parameter type {@code OutputTimeFn<? super W>}. - */ -@Experimental(Experimental.Kind.OUTPUT_TIME) -public abstract class OutputTimeFn<W extends BoundedWindow> implements Serializable { - - protected OutputTimeFn() { } - - /** - * Returns the output timestamp to use for data depending on the given - * {@code inputTimestamp} in the specified {@code window}. - * - * <p>The result of this method must be between {@code inputTimestamp} and - * {@code window.maxTimestamp()} (inclusive on both sides). - * - * <p>This function must be monotonic across input timestamps. Specifically, if {@code A < B}, - * then {@code assignOutputTime(A, window) <= assignOutputTime(B, window)}. - * - * <p>For a {@link WindowFn} that doesn't produce overlapping windows, this can (and typically - * should) just return {@code inputTimestamp}. 
In the presence of overlapping windows, it is - * suggested that the result in later overlapping windows is past the end of earlier windows - * so that the later windows don't prevent the watermark from - * progressing past the end of the earlier window. - * - * <p>See the overview of {@link OutputTimeFn} for the consistency properties required - * between {@link #assignOutputTime}, {@link #combine}, and {@link #merge}. - */ - public abstract Instant assignOutputTime(Instant inputTimestamp, W window); - - /** - * Combines the given output times, which must be from the same window, into an output time - * for a computed value. - * - * <ul> - * <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}.</li> - * <li>{@code combine} must be associative: - * {@code combine(a, combine(b, c)).equals(combine(combine(a, b), c))}.</li> - * </ul> - */ - public abstract Instant combine(Instant outputTime, Instant otherOutputTime); - - /** - * Merges the given output times, presumed to be combined output times for windows that - * are merging, into an output time for the {@code resultWindow}. - * - * <p>When windows {@code w1} and {@code w2} merge to become a new window {@code w1plus2}, - * then {@link #merge} must be implemented such that the output time is the same as - * if all timestamps were assigned in {@code w1plus2}. Formally: - * - * <p>{@code fn.merge(w, fn.assignOutputTime(t1, w1), fn.assignOutputTime(t2, w2))} - * - * <p>must be equal to - * - * <p>{@code fn.combine(fn.assignOutputTime(t1, w1plus2), fn.assignOutputTime(t2, w1plus2))} - * - * <p>If the assigned time depends only on the window, the correct implementation of - * {@link #merge merge()} necessarily returns the result of - * {@link #assignOutputTime assignOutputTime(t1, w1plus2)} - * (which equals {@link #assignOutputTime assignOutputTime(t2, w1plus2)}. - * Defaults for this case are provided by {@link DependsOnlyOnWindow}. 
- * - * <p>For many other {@link OutputTimeFn} implementations, such as taking the earliest or latest - * timestamp, this will be the same as {@link #combine combine()}. Defaults for this - * case are provided by {@link Defaults}. - */ - public abstract Instant merge(W intoWindow, Iterable<? extends Instant> mergingTimestamps); - - /** - * Returns {@code true} if the result of combination of many output timestamps actually depends - * only on the earliest. - * - * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp - * to be combined. - */ - public abstract boolean dependsOnlyOnEarliestInputTimestamp(); - - /** - * Returns {@code true} if the result does not depend on what outputs were combined but only - * the window they are in. The canonical example is if all timestamps are sure to - * be the end of the window. - * - * <p>This may allow optimizations, since it is typically very efficient to retrieve the window - * and combining output timestamps is not necessary. - * - * <p>If the assigned output time for an implementation depends only on the window, consider - * extending {@link DependsOnlyOnWindow}, which returns {@code true} here and also provides - * a framework for easily implementing a correct {@link #merge}, {@link #combine} and - * {@link #assignOutputTime}. - */ - public abstract boolean dependsOnlyOnWindow(); - - /** - * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} where the - * output time depends on the input element timestamps and possibly the window. - * - * <p>To complete an implementation, override {@link #assignOutputTime}, at a minimum. - * - * <p>By default, {@link #combine} and {@link #merge} return the earliest timestamp of their - * inputs. - */ - public abstract static class Defaults<W extends BoundedWindow> extends OutputTimeFn<W> { - - protected Defaults() { - super(); - } - - /** - * {@inheritDoc} - * - * @return the earlier of the two timestamps. 
- */ - @Override - public Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) { - return Ordering.natural().min(outputTimestamp, otherOutputTimestamp); - } - - /** - * {@inheritDoc} - * - * @return the result of {@link #combine combine(outputTimstamp, otherOutputTimestamp)}, - * by default. - */ - @Override - public Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) { - return OutputTimeFns.combineOutputTimes(this, mergingTimestamps); - } - - /** - * {@inheritDoc} - * - * @return {@code false} by default. An {@link OutputTimeFn} that is known to depend only on the - * window should extend {@link OutputTimeFn.DependsOnlyOnWindow}. - */ - @Override - public boolean dependsOnlyOnWindow() { - return false; - } - - /** - * {@inheritDoc} - * - * @return {@code true} by default. - */ - @Override - public boolean dependsOnlyOnEarliestInputTimestamp() { - return false; - } - - /** - * {@inheritDoc} - * - * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by - * default. - */ - @Override - public boolean equals(Object other) { - if (other == null) { - return false; - } - - return this.getClass().equals(other.getClass()); - } - - @Override - public int hashCode() { - return Objects.hash(getClass()); - } - } - - /** - * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} when the - * output time depends only on the window. - * - * <p>To complete an implementation, override {@link #assignOutputTime(BoundedWindow)}. - */ - public abstract static class DependsOnlyOnWindow<W extends BoundedWindow> - extends OutputTimeFn<W> { - - protected DependsOnlyOnWindow() { - super(); - } - - /** - * Returns the output timestamp to use for data in the specified {@code window}. - * - * <p>Note that the result of this method must be between the maximum possible input timestamp - * in {@code window} and {@code window.maxTimestamp()} (inclusive on both sides). 
- * - * <p>For example, using {@code Sessions.withGapDuration(gapDuration)}, we know that all input - * timestamps must lie at least {@code gapDuration} from the end of the session, so - * {@code window.maxTimestamp() - gapDuration} is an acceptable assigned timestamp. - * - * @see #assignOutputTime(Instant, BoundedWindow) - */ - protected abstract Instant assignOutputTime(W window); - - /** - * {@inheritDoc} - * - * @return the result of {#link assignOutputTime(BoundedWindow) assignOutputTime(window)}. - */ - @Override - public final Instant assignOutputTime(Instant timestamp, W window) { - return assignOutputTime(window); - } - - /** - * {@inheritDoc} - * - * @return the same timestamp as both argument timestamps, which are necessarily equal. - */ - @Override - public final Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) { - return outputTimestamp; - } - - /** - * {@inheritDoc} - * - * @return the result of - * {@link #assignOutputTime(BoundedWindow) assignOutputTime(resultWindow)}. - */ - @Override - public final Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) { - return assignOutputTime(resultWindow); - } - - /** - * {@inheritDoc} - * - * @return {@code true}. - */ - @Override - public final boolean dependsOnlyOnWindow() { - return true; - } - - /** - * {@inheritDoc} - * - * @return {@code true}. Since the output time depends only on the window, it can - * certainly be ascertained given a single input timestamp. - */ - @Override - public final boolean dependsOnlyOnEarliestInputTimestamp() { - return true; - } - - /** - * {@inheritDoc} - * - * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by - * default. 
- */ - @Override - public boolean equals(Object other) { - if (other == null) { - return false; - } - - return this.getClass().equals(other.getClass()); - } - - @Override - public int hashCode() { - return Objects.hash(getClass()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java deleted file mode 100644 index b5d67fa..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.transforms.windowing; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; -import javax.annotation.Nullable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.joda.time.Instant; - -/** - * <b><i>(Experimental)</i></b> Static utility methods and provided implementations for - * {@link OutputTimeFn}. - */ -@Experimental(Experimental.Kind.OUTPUT_TIME) -public class OutputTimeFns { - /** - * The policy of outputting at the earliest of the input timestamps for non-late input data - * that led to a computed value. - * - * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time - * elements being aggregated via some function {@code f} into - * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>. When emitted, the output - * timestamp of the result will be the earliest of the event time timestamps - * - * <p>If data arrives late, it has no effect on the output timestamp. - */ - public static OutputTimeFn<BoundedWindow> outputAtEarliestInputTimestamp() { - return new OutputAtEarliestInputTimestamp(); - } - - /** - * The policy of holding the watermark to the latest of the input timestamps - * for non-late input data that led to a computed value. - * - * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time - * elements being aggregated via some function {@code f} into - * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>. When emitted, the output - * timestamp of the result will be the latest of the event time timestamps - * - * <p>If data arrives late, it has no effect on the output timestamp. 
- */ - public static OutputTimeFn<BoundedWindow> outputAtLatestInputTimestamp() { - return new OutputAtLatestInputTimestamp(); - } - - /** - * The policy of outputting with timestamps at the end of the window. - * - * <p>Note that this output timestamp depends only on the window. See - * {#link dependsOnlyOnWindow()}. - * - * <p>When windows merge, instead of using {@link OutputTimeFn#combine} to obtain an output - * timestamp for the results in the new window, it is mandatory to obtain a new output - * timestamp from {@link OutputTimeFn#assignOutputTime} with the new window and an arbitrary - * timestamp (because it is guaranteed that the timestamp is irrelevant). - * - * <p>For non-merging window functions, this {@link OutputTimeFn} works transparently. - */ - public static OutputTimeFn<BoundedWindow> outputAtEndOfWindow() { - return new OutputAtEndOfWindow(); - } - - /** - * Applies the given {@link OutputTimeFn} to the given output times, obtaining - * the output time for a value computed. See {@link OutputTimeFn#combine} for - * a full specification. - * - * @throws IllegalArgumentException if {@code outputTimes} is empty. - */ - public static Instant combineOutputTimes( - OutputTimeFn<?> outputTimeFn, Iterable<? extends Instant> outputTimes) { - checkArgument( - !Iterables.isEmpty(outputTimes), - "Collection of output times must not be empty in %s.combineOutputTimes", - OutputTimeFns.class.getName()); - - @Nullable - Instant combinedOutputTime = null; - for (Instant outputTime : outputTimes) { - combinedOutputTime = - combinedOutputTime == null - ? outputTime : outputTimeFn.combine(combinedOutputTime, outputTime); - } - return combinedOutputTime; - } - - /** - * See {@link #outputAtEarliestInputTimestamp}. 
- */ - private static class OutputAtEarliestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> { - @Override - public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) { - return inputTimestamp; - } - - @Override - public Instant combine(Instant outputTime, Instant otherOutputTime) { - return Ordering.natural().min(outputTime, otherOutputTime); - } - - /** - * {@inheritDoc} - * - * @return {@code true}. The result of any combine will be the earliest input timestamp. - */ - @Override - public boolean dependsOnlyOnEarliestInputTimestamp() { - return true; - } - } - - /** - * See {@link #outputAtLatestInputTimestamp}. - */ - private static class OutputAtLatestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> { - @Override - public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) { - return inputTimestamp; - } - - @Override - public Instant combine(Instant outputTime, Instant otherOutputTime) { - return Ordering.natural().max(outputTime, otherOutputTime); - } - - /** - * {@inheritDoc} - * - * @return {@code false}. - */ - @Override - public boolean dependsOnlyOnEarliestInputTimestamp() { - return false; - } - } - - private static class OutputAtEndOfWindow extends OutputTimeFn.DependsOnlyOnWindow<BoundedWindow> { - - /** - *{@inheritDoc} - * - *@return {@code window.maxTimestamp()}. 
- */ - @Override - protected Instant assignOutputTime(BoundedWindow window) { - return window.maxTimestamp(); - } - - @Override - public String toString() { - return getClass().getCanonicalName(); - } - } - - public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) { - if (outputTimeFn instanceof OutputAtEarliestInputTimestamp) { - return RunnerApi.OutputTime.EARLIEST_IN_PANE; - } else if (outputTimeFn instanceof OutputAtLatestInputTimestamp) { - return RunnerApi.OutputTime.LATEST_IN_PANE; - } else if (outputTimeFn instanceof OutputAtEndOfWindow) { - return RunnerApi.OutputTime.END_OF_WINDOW; - } else { - throw new IllegalArgumentException( - String.format( - "Cannot convert %s to %s: %s", - OutputTimeFn.class.getCanonicalName(), - RunnerApi.OutputTime.class.getCanonicalName(), - outputTimeFn)); - } - } - - public static OutputTimeFn<?> fromProto(RunnerApi.OutputTime proto) { - switch (proto) { - case EARLIEST_IN_PANE: - return OutputTimeFns.outputAtEarliestInputTimestamp(); - case LATEST_IN_PANE: - return OutputTimeFns.outputAtLatestInputTimestamp(); - case END_OF_WINDOW: - return OutputTimeFns.outputAtEndOfWindow(); - case UNRECOGNIZED: - default: - // Whether or not it is proto that cannot recognize it (due to the version of the - // generated code we link to) or the switch hasn't been updated to handle it, - // the situation is the same: we don't know what this OutputTime means - throw new IllegalArgumentException( - String.format( - "Cannot convert unknown %s to %s: %s", - RunnerApi.OutputTime.class.getCanonicalName(), - OutputTimeFn.class.getCanonicalName(), - proto)); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java new file mode 100644 index 0000000..39fe8a9 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; +import java.util.Arrays; +import java.util.Collections; +import org.apache.beam.sdk.annotations.Experimental; +import org.joda.time.Instant; + +/** + * Policies for combining timestamps that occur within a window. + */ +@Experimental(Experimental.Kind.OUTPUT_TIME) +public enum TimestampCombiner { + /** + * The policy of taking the earliest of a set of timestamps. + * + * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after + * they are shifted by the {@link WindowFn} (to allow downstream watermark progress). + * + * <p>If data arrives late, it has no effect on the output timestamp. + */ + EARLIEST { + @Override + public Instant combine(Iterable<? 
extends Instant> timestamps) { + return Ordering.natural().min(timestamps); + } + + @Override + public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) { + return combine(mergingTimestamps); + } + + @Override + public boolean dependsOnlyOnEarliestTimestamp() { + return true; + } + + @Override + public boolean dependsOnlyOnWindow() { + return false; + } + }, + + /** + * The policy of taking the latest of a set of timestamps. + * + * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after + * they are shifted by the {@link WindowFn} (to allow downstream watermark progress). + * + * <p>If data arrives late, it has no effect on the output timestamp. + */ + LATEST { + @Override + public Instant combine(Iterable<? extends Instant> timestamps) { + return Ordering.natural().max(timestamps); + } + + @Override + public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) { + return combine(mergingTimestamps); + } + + @Override + public boolean dependsOnlyOnEarliestTimestamp() { + return false; + } + + @Override + public boolean dependsOnlyOnWindow() { + return false; + } + }, + + /** + * The policy of using the end of the window, regardless of input timestamps. + * + * <p>When used in windowed aggregations, the timestamps of non-late inputs will be combined after + * they are shifted by the {@link WindowFn} (to allow downstream watermark progress). + * + * <p>If data arrives late, it has no effect on the output timestamp. + */ + END_OF_WINDOW { + @Override + public Instant combine(Iterable<? extends Instant> timestamps) { + checkArgument(Iterables.size(timestamps) > 0); + return Iterables.get(timestamps, 0); + } + + @Override + public Instant merge(BoundedWindow intoWindow, Iterable<? 
extends Instant> mergingTimestamps) { + return intoWindow.maxTimestamp(); + } + + @Override + public boolean dependsOnlyOnEarliestTimestamp() { + return false; + } + + @Override + public boolean dependsOnlyOnWindow() { + return true; + } + }; + + /** + * Combines the given times, which must be from the same window and must have been passed through + * {@link #merge}. + * + * <ul> + * <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}. + * <li>{@code combine} must be associative: {@code combine(a, combine(b, + * c)).equals(combine(combine(a, b), c))}. + * </ul> + */ + public abstract Instant combine(Iterable<? extends Instant> timestamps); + + /** + * Merges the given timestamps, which may have originated in separate windows, into the context of + * the result window. + */ + public abstract Instant merge( + BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps); + + /** + * Shorthand for {@link #merge} with just one element, to place it into the context of + * a window. + * + * <p>For example, the {@link #END_OF_WINDOW} policy moves the timestamp to the end of the window. + */ + public final Instant assign(BoundedWindow intoWindow, Instant timestamp) { + return merge(intoWindow, Collections.singleton(timestamp)); + } + + /** + * Varargs variant of {@link #combine}. + */ + public final Instant combine(Instant... timestamps) { + return combine(Arrays.asList(timestamps)); + } + + /** + * Varargs variant of {@link #merge}. + */ + public final Instant merge(BoundedWindow intoWindow, Instant... timestamps) { + return merge(intoWindow, Arrays.asList(timestamps)); + } + + /** + * Returns {@code true} if the result of combination of many output timestamps actually depends + * only on the earliest. + * + * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp + * to be combined. 
+ */ + public abstract boolean dependsOnlyOnEarliestTimestamp(); + + /** + * Returns {@code true} if the result does not depend on what outputs were combined but only + * the window they are in. The canonical example is if all timestamps are sure to + * be the end of the window. + * + * <p>This may allow optimizations, since it is typically very efficient to retrieve the window + * and combining output timestamps is not necessary. + */ + public abstract boolean dependsOnlyOnWindow(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 1000ff7..cb7b430 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -193,7 +193,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T @Nullable abstract AccumulationMode getAccumulationMode(); @Nullable abstract Duration getAllowedLateness(); @Nullable abstract ClosingBehavior getClosingBehavior(); - @Nullable abstract OutputTimeFn<?> getOutputTimeFn(); + @Nullable abstract TimestampCombiner getTimestampCombiner(); abstract Builder<T> toBuilder(); @@ -204,7 +204,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T abstract Builder<T> setAccumulationMode(AccumulationMode mode); abstract Builder<T> setAllowedLateness(Duration allowedLateness); abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior); - abstract Builder<T> setOutputTimeFn(OutputTimeFn<?> outputTimeFn); + abstract Builder<T> setTimestampCombiner(TimestampCombiner timestampCombiner); abstract Window<T> build(); } @@ 
-273,12 +273,12 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T } /** - * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control + * <b><i>(Experimental)</i></b> Override the default {@link TimestampCombiner}, to control * the output timestamp of values output from a {@link GroupByKey} operation. */ @Experimental(Kind.OUTPUT_TIME) - public Window<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) { - return toBuilder().setOutputTimeFn(outputTimeFn).build(); + public Window<T> withTimestampCombiner(TimestampCombiner timestampCombiner) { + return toBuilder().setTimestampCombiner(timestampCombiner).build(); } /** @@ -300,8 +300,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T * Get the output strategy of this {@link Window Window PTransform}. For internal use * only. */ - // Rawtype cast of OutputTimeFn cannot be eliminated with intermediate variable, as it is - // casting between wildcards public WindowingStrategy<?, ?> getOutputStrategyInternal( WindowingStrategy<?, ?> inputStrategy) { WindowingStrategy<?, ?> result = inputStrategy; @@ -320,8 +318,8 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T if (getClosingBehavior() != null) { result = result.withClosingBehavior(getClosingBehavior()); } - if (getOutputTimeFn() != null) { - result = result.withOutputTimeFn(getOutputTimeFn()); + if (getTimestampCombiner() != null) { + result = result.withTimestampCombiner(getTimestampCombiner()); } return result; } @@ -411,9 +409,9 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T .withLabel("Window Closing Behavior")); } - if (getOutputTimeFn() != null) { - builder.add(DisplayData.item("outputTimeFn", getOutputTimeFn().getClass()) - .withLabel("Output Time Function")); + if (getTimestampCombiner() != null) { + builder.add(DisplayData.item("timestampCombiner", getTimestampCombiner().toString()) + 
.withLabel("Timestamp Combiner")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java index 0c27c4f..706e039 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java @@ -22,7 +22,7 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -57,13 +57,14 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti // If the input has already had its windows merged, then the GBK that performed the merge // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged. - // The OutputTimeFn is set to ensure the GroupByKey does not shift elements forwards in time. + // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in + // time. // Because this outputs as fast as possible, this should not hold the watermark. 
Window<KV<K, V>> rewindow = Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder())) .triggering(new ReshuffleTrigger<>()) .discardingFiredPanes() - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTimestampCombiner(TimestampCombiner.EARLIEST) .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); return input.apply(rewindow)
