Revert "Replace OutputTimeFn UDF with TimestampCombiner enum"

This reverts commit f38e4271334fced94e8dc1dc97f47b60fa810586.
It will require a synchronous Dataflow worker container bump. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/83d41fcc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/83d41fcc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/83d41fcc Branch: refs/heads/master Commit: 83d41fcce0c7b123459e5d26ab9938de49f48dab Parents: bb12a56 Author: Kenneth Knowles <[email protected]> Authored: Wed Apr 26 19:33:55 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Wed Apr 26 19:33:55 2017 -0700 ---------------------------------------------------------------------- .../beam/examples/complete/game/GameStats.java | 4 +- .../translation/utils/ApexStateInternals.java | 26 +- .../translation/GroupByKeyTranslatorTest.java | 10 +- .../utils/ApexStateInternalsTest.java | 33 +- .../core/construction/WindowingStrategies.java | 52 +-- .../construction/WindowingStrategiesTest.java | 6 +- .../runners/core/InMemoryStateInternals.java | 32 +- .../beam/runners/core/ReduceFnRunner.java | 4 +- .../beam/runners/core/SplittableParDo.java | 8 +- .../apache/beam/runners/core/StateMerging.java | 32 +- .../org/apache/beam/runners/core/StateTag.java | 11 +- .../org/apache/beam/runners/core/StateTags.java | 16 +- .../core/TestInMemoryStateInternals.java | 2 +- .../apache/beam/runners/core/WatermarkHold.java | 45 ++- .../core/GroupAlsoByWindowsProperties.java | 20 +- .../core/InMemoryStateInternalsTest.java | 34 +- .../beam/runners/core/ReduceFnRunnerTest.java | 38 +-- .../beam/runners/core/ReduceFnTester.java | 13 +- .../apache/beam/runners/core/StateTagTest.java | 16 +- .../CopyOnAccessInMemoryStateInternals.java | 24 +- .../direct/ParDoMultiOverrideFactory.java | 6 +- .../CopyOnAccessInMemoryStateInternalsTest.java | 54 ++-- .../functions/HashingFlinkCombineRunner.java | 19 +- .../functions/SortingFlinkCombineRunner.java | 30 +- .../state/FlinkBroadcastStateInternals.java | 8 +- 
.../state/FlinkKeyGroupStateInternals.java | 8 +- .../state/FlinkSplitStateInternals.java | 8 +- .../streaming/state/FlinkStateInternals.java | 34 +- .../streaming/FlinkStateInternalsTest.java | 34 +- .../spark/stateful/SparkStateInternals.java | 33 +- .../translation/SparkAbstractCombineFn.java | 4 +- .../spark/translation/SparkGlobalCombineFn.java | 37 +-- .../spark/translation/SparkKeyedCombineFn.java | 37 +-- sdks/java/core/pom.xml | 5 + .../beam/sdk/testing/WindowFnTestUtils.java | 53 +--- .../apache/beam/sdk/transforms/GroupByKey.java | 3 +- .../sdk/transforms/windowing/OutputTimeFn.java | 314 +++++++++++++++++++ .../sdk/transforms/windowing/OutputTimeFns.java | 212 +++++++++++++ .../transforms/windowing/TimestampCombiner.java | 186 ----------- .../beam/sdk/transforms/windowing/Window.java | 22 +- .../org/apache/beam/sdk/util/Reshuffle.java | 7 +- .../apache/beam/sdk/util/WindowingStrategy.java | 176 ++++++++--- .../apache/beam/sdk/util/state/StateBinder.java | 12 +- .../apache/beam/sdk/util/state/StateSpecs.java | 23 +- .../beam/sdk/util/state/WatermarkHoldState.java | 19 +- .../org/apache/beam/SdkCoreApiSurfaceTest.java | 1 + .../beam/sdk/transforms/GroupByKeyTest.java | 10 +- .../sdk/transforms/join/CoGroupByKeyTest.java | 6 +- .../transforms/windowing/OutputTimeFnsTest.java | 51 +++ .../sdk/transforms/windowing/SessionsTest.java | 6 +- .../sdk/transforms/windowing/WindowTest.java | 23 +- .../sdk/transforms/windowing/WindowingTest.java | 2 +- .../org/apache/beam/GcpCoreApiSurfaceTest.java | 1 + 53 files changed, 1130 insertions(+), 740 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java 
b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index e0048b7..b6c05be 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -43,8 +43,8 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -313,7 +313,7 @@ public class GameStats extends LeaderBoard { userEvents .apply("WindowIntoSessions", Window.<KV<String, Integer>>into( Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap()))) - .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)) + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())) // For this use, we care only about the existence of the session, not any particular // information aggregated over it, so the following is an efficient way to do that. 
.apply(Combine.perKey(x -> 0)) http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java index ec8f666..cfc57cd 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java @@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.state.BagState; @@ -150,10 +150,10 @@ public class ApexStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, - TimestampCombiner timestampCombiner) { - return new ApexWatermarkHoldState<>(namespace, address, timestampCombiner); + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> address, + OutputTimeFn<? 
super W> outputTimeFn) { + return new ApexWatermarkHoldState<>(namespace, address, outputTimeFn); } @Override @@ -269,16 +269,16 @@ public class ApexStateInternals<K> implements StateInternals<K> { } private final class ApexWatermarkHoldState<W extends BoundedWindow> - extends AbstractState<Instant> implements WatermarkHoldState { + extends AbstractState<Instant> implements WatermarkHoldState<W> { - private final TimestampCombiner timestampCombiner; + private final OutputTimeFn<? super W> outputTimeFn; public ApexWatermarkHoldState( StateNamespace namespace, - StateTag<?, WatermarkHoldState> address, - TimestampCombiner timestampCombiner) { + StateTag<?, WatermarkHoldState<W>> address, + OutputTimeFn<? super W> outputTimeFn) { super(namespace, address, InstantCoder.of()); - this.timestampCombiner = timestampCombiner; + this.outputTimeFn = outputTimeFn; } @Override @@ -294,7 +294,7 @@ public class ApexStateInternals<K> implements StateInternals<K> { @Override public void add(Instant outputTime) { Instant combined = read(); - combined = (combined == null) ? outputTime : timestampCombiner.combine(combined, outputTime); + combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime); writeValue(combined); } @@ -313,8 +313,8 @@ public class ApexStateInternals<K> implements StateInternals<K> { } @Override - public TimestampCombiner getTimestampCombiner() { - return timestampCombiner; + public OutputTimeFn<? 
super W> getOutputTimeFn() { + return outputTimeFn; } } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java index 9c61b47..193de71 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java @@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.joda.time.Duration; @@ -83,12 +83,12 @@ public class GroupByKeyTranslatorTest { ); p.apply(Read.from(new TestSource(data, new Instant(5000)))) - .apply( - Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))) - .withTimestampCombiner(TimestampCombiner.LATEST)) + .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))) + .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())) .apply(Count.<String>perElement()) .apply(ParDo.of(new KeyedByTimestamp<KV<String, Long>>())) - .apply(ParDo.of(new EmbeddedCollector())); + .apply(ParDo.of(new EmbeddedCollector())) + ; ApexRunnerResult result = (ApexRunnerResult) p.run(); result.getApexDAG(); 
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java index 225b654..7160e45 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java @@ -36,7 +36,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; @@ -65,13 +65,14 @@ public class ApexStateInternalsTest { "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<Object, WatermarkHoldState> + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", 
TimestampCombiner.END_OF_WINDOW); + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> + WATERMARK_LATEST_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow()); private ApexStateInternals<String> underTest; @@ -226,7 +227,7 @@ public class ApexStateInternalsTest { @Test public void testWatermarkEarliestState() throws Exception { - WatermarkHoldState value = + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); // State instances are cached, but depend on the namespace. @@ -250,7 +251,7 @@ public class ApexStateInternalsTest { @Test public void testWatermarkLatestState() throws Exception { - WatermarkHoldState value = + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); // State instances are cached, but depend on the namespace. @@ -274,7 +275,7 @@ public class ApexStateInternalsTest { @Test public void testWatermarkEndOfWindowState() throws Exception { - WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); // State instances are cached, but depend on the namespace. 
assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); @@ -291,7 +292,7 @@ public class ApexStateInternalsTest { @Test public void testWatermarkStateIsEmpty() throws Exception { - WatermarkHoldState value = + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); assertThat(value.isEmpty().read(), Matchers.is(true)); @@ -305,9 +306,9 @@ public class ApexStateInternalsTest { @Test public void testMergeEarliestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = + WatermarkHoldState<BoundedWindow> value1 = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - WatermarkHoldState value2 = + WatermarkHoldState<BoundedWindow> value2 = underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); value1.add(new Instant(3000)); @@ -324,11 +325,11 @@ public class ApexStateInternalsTest { @Test public void testMergeLatestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = + WatermarkHoldState<BoundedWindow> value1 = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - WatermarkHoldState value2 = + WatermarkHoldState<BoundedWindow> value2 = underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); - WatermarkHoldState value3 = + WatermarkHoldState<BoundedWindow> value3 = underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); value1.add(new Instant(3000)); http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java index 0c400db..3d7deef 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java +++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java @@ -28,15 +28,16 @@ import java.io.Serializable; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime; import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes; import org.joda.time.Duration; /** Utilities for working with {@link WindowingStrategy WindowingStrategies}. 
*/ @@ -114,42 +115,11 @@ public class WindowingStrategies implements Serializable { } } - public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) { - switch(timestampCombiner) { - case EARLIEST: - return OutputTime.EARLIEST_IN_PANE; - case END_OF_WINDOW: - return OutputTime.END_OF_WINDOW; - case LATEST: - return OutputTime.LATEST_IN_PANE; - default: - throw new IllegalArgumentException( - String.format( - "Unknown %s: %s", - TimestampCombiner.class.getSimpleName(), - timestampCombiner)); - } - } - - public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) { - switch (proto) { - case EARLIEST_IN_PANE: - return TimestampCombiner.EARLIEST; - case END_OF_WINDOW: - return TimestampCombiner.END_OF_WINDOW; - case LATEST_IN_PANE: - return TimestampCombiner.LATEST; - case UNRECOGNIZED: - default: - // Whether or not it is proto that cannot recognize it (due to the version of the - // generated code we link to) or the switch hasn't been updated to handle it, - // the situation is the same: we don't know what this OutputTime means - throw new IllegalArgumentException( - String.format( - "Cannot convert unknown %s to %s: %s", - RunnerApi.OutputTime.class.getCanonicalName(), - OutputTime.class.getCanonicalName(), - proto)); + public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) { + if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) { + return toProto(((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn()); + } else { + return OutputTimeFns.toProto(outputTimeFn); } } @@ -207,7 +177,7 @@ public class WindowingStrategies implements Serializable { RunnerApi.WindowingStrategy.Builder windowingStrategyProto = RunnerApi.WindowingStrategy.newBuilder() - .setOutputTime(toProto(windowingStrategy.getTimestampCombiner())) + .setOutputTime(toProto(windowingStrategy.getOutputTimeFn())) .setAccumulationMode(toProto(windowingStrategy.getMode())) 
.setClosingBehavior(toProto(windowingStrategy.getClosingBehavior())) .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis()) @@ -259,7 +229,7 @@ public class WindowingStrategies implements Serializable { "WindowFn"); WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn; - TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime()); + OutputTimeFn<?> outputTimeFn = OutputTimeFns.fromProto(proto.getOutputTime()); AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode()); Trigger trigger = Triggers.fromProto(proto.getTrigger()); ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior()); @@ -269,7 +239,7 @@ public class WindowingStrategies implements Serializable { .withAllowedLateness(allowedLateness) .withMode(accumulationMode) .withTrigger(trigger) - .withTimestampCombiner(timestampCombiner) + .withOutputTimeFn(outputTimeFn) .withClosingBehavior(closingBehavior); } } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java index 78ac61c..62bba8e 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java @@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableList; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import 
org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -68,14 +68,14 @@ public class WindowingStrategiesTest { .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withTrigger(REPRESENTATIVE_TRIGGER) .withAllowedLateness(Duration.millis(71)) - .withTimestampCombiner(TimestampCombiner.EARLIEST)), + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())), toProtoAndBackSpec( WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN) .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withTrigger(REPRESENTATIVE_TRIGGER) .withAllowedLateness(Duration.millis(93)) - .withTimestampCombiner(TimestampCombiner.LATEST))); + .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))); } @Parameter(0) http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java index 9fb8e3f..55b7fc2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import 
org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -156,10 +156,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, - TimestampCombiner timestampCombiner) { - return new InMemoryWatermarkHold<W>(timestampCombiner); + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> address, + OutputTimeFn<? super W> outputTimeFn) { + return new InMemoryWatermarkHold<W>(outputTimeFn); } @Override @@ -233,19 +233,19 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { * An {@link InMemoryState} implementation of {@link WatermarkHoldState}. */ public static final class InMemoryWatermarkHold<W extends BoundedWindow> - implements WatermarkHoldState, InMemoryState<InMemoryWatermarkHold<W>> { + implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> { - private final TimestampCombiner timestampCombiner; + private final OutputTimeFn<? super W> outputTimeFn; @Nullable private Instant combinedHold = null; - public InMemoryWatermarkHold(TimestampCombiner timestampCombiner) { - this.timestampCombiner = timestampCombiner; + public InMemoryWatermarkHold(OutputTimeFn<? super W> outputTimeFn) { + this.outputTimeFn = outputTimeFn; } @Override - public InMemoryWatermarkHold readLater() { + public InMemoryWatermarkHold<W> readLater() { return this; } @@ -263,10 +263,8 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { @Override public void add(Instant outputTime) { - combinedHold = - combinedHold == null - ? 
outputTime - : timestampCombiner.combine(combinedHold, outputTime); + combinedHold = combinedHold == null ? outputTime + : outputTimeFn.combine(combinedHold, outputTime); } @Override @@ -289,8 +287,8 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public TimestampCombiner getTimestampCombiner() { - return timestampCombiner; + public OutputTimeFn<? super W> getOutputTimeFn() { + return outputTimeFn; } @Override @@ -301,7 +299,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { @Override public InMemoryWatermarkHold<W> copy() { InMemoryWatermarkHold<W> that = - new InMemoryWatermarkHold<>(timestampCombiner); + new InMemoryWatermarkHold<>(outputTimeFn); that.combinedHold = this.combinedHold; return that; } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 4c70c97..34db752 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -47,9 +47,9 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import 
org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SideInputReader; @@ -171,7 +171,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { * <ul> * <li>State: Bag of hold timestamps. * <li>State style: RENAMED - * <li>Merging: Depending on {@link TimestampCombiner}, may need to be recalculated on merging. + * <li>Merging: Depending on {@link OutputTimeFn}, may need to be recalculated on merging. * When a pane fires it may be necessary to add (back) an end-of-window or garbage collection * hold. * <li>Lifetime: Cleared when a pane fires or when the window is garbage collected. http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 5273e86..31d89ee 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -355,10 +355,10 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> * the input watermark when the first {@link DoFn.ProcessElement} call for this element * completes. 
*/ - private static final StateTag<Object, WatermarkHoldState> watermarkHoldTag = + private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag = StateTags.makeSystemTagInternal( StateTags.<GlobalWindow>watermarkStateInternal( - "hold", TimestampCombiner.LATEST)); + "hold", OutputTimeFns.outputAtLatestInputTimestamp())); /** * The state cell containing a copy of the element. Written during the first {@link @@ -480,7 +480,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> stateInternals.state(stateNamespace, elementTag); ValueState<RestrictionT> restrictionState = stateInternals.state(stateNamespace, restrictionTag); - WatermarkHoldState holdState = + WatermarkHoldState<GlobalWindow> holdState = stateInternals.state(stateNamespace, watermarkHoldTag); ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction; http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java index ce37fd3..3410850 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java @@ -218,24 +218,24 @@ public class StateMerging { */ public static <K, W extends BoundedWindow> void prefetchWatermarks( MergingStateAccessor<K, W> context, - StateTag<? super K, WatermarkHoldState> address) { - Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(address); - WatermarkHoldState result = context.access(address); + StateTag<? 
super K, WatermarkHoldState<W>> address) { + Map<W, WatermarkHoldState<W>> map = context.accessInEachMergingWindow(address); + WatermarkHoldState<W> result = context.access(address); if (map.isEmpty()) { // Nothing to prefetch. return; } if (map.size() == 1 && map.values().contains(result) - && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) { + && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) { // Nothing to change. return; } - if (result.getTimestampCombiner().dependsOnlyOnWindow()) { + if (result.getOutputTimeFn().dependsOnlyOnWindow()) { // No need to read existing holds. return; } // Prefetch. - for (WatermarkHoldState source : map.values()) { + for (WatermarkHoldState<W> source : map.values()) { prefetchRead(source); } } @@ -250,7 +250,7 @@ public class StateMerging { */ public static <K, W extends BoundedWindow> void mergeWatermarks( MergingStateAccessor<K, W> context, - StateTag<? super K, WatermarkHoldState> address, + StateTag<? super K, WatermarkHoldState<W>> address, W mergeResult) { mergeWatermarks( context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult); @@ -261,31 +261,31 @@ public class StateMerging { * into {@code result}, where the final merge result window is {@code mergeResult}. */ public static <W extends BoundedWindow> void mergeWatermarks( - Collection<WatermarkHoldState> sources, WatermarkHoldState result, + Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result, W resultWindow) { if (sources.isEmpty()) { // Nothing to merge. return; } if (sources.size() == 1 && sources.contains(result) - && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) { + && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) { // Nothing to merge. return; } - if (result.getTimestampCombiner().dependsOnlyOnWindow()) { + if (result.getOutputTimeFn().dependsOnlyOnWindow()) { // Clear sources. 
- for (WatermarkHoldState source : sources) { + for (WatermarkHoldState<W> source : sources) { source.clear(); } // Update directly from window-derived hold. - Instant hold = - result.getTimestampCombiner().assign(resultWindow, BoundedWindow.TIMESTAMP_MIN_VALUE); + Instant hold = result.getOutputTimeFn().assignOutputTime( + BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow); checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE)); result.add(hold); } else { // Prefetch. List<ReadableState<Instant>> futures = new ArrayList<>(sources.size()); - for (WatermarkHoldState source : sources) { + for (WatermarkHoldState<W> source : sources) { futures.add(source); } // Read. @@ -297,12 +297,12 @@ public class StateMerging { } } // Clear sources. - for (WatermarkHoldState source : sources) { + for (WatermarkHoldState<W> source : sources) { source.clear(); } if (!outputTimesToMerge.isEmpty()) { // Merge and update. - result.add(result.getTimestampCombiner().merge(resultWindow, outputTimesToMerge)); + result.add(result.getOutputTimeFn().merge(resultWindow, outputTimesToMerge)); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java index a5d262a..12c59ad 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; 
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; @@ -115,10 +115,11 @@ public interface StateTag<K, StateT extends State> extends Serializable { /** * Bind to a watermark {@link StateSpec}. * - * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps - * added to the returned {@link WatermarkHoldState} are to be combined. + * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to + * the returned {@link WatermarkHoldState} are to be combined. */ - <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> spec, TimestampCombiner timestampCombiner); + <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> spec, + OutputTimeFn<? super W> outputTimeFn); } } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java index 2b3f4b8..3a45569 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java @@ -29,7 +29,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import 
org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; @@ -110,11 +110,11 @@ public class StateTags { } @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( String id, - StateSpec<? super K, WatermarkHoldState> spec, - TimestampCombiner timestampCombiner) { - return binder.bindWatermark(tagForSpec(id, spec), timestampCombiner); + StateSpec<? super K, WatermarkHoldState<W>> spec, + OutputTimeFn<? super W> outputTimeFn) { + return binder.bindWatermark(tagForSpec(id, spec), outputTimeFn); } }; } @@ -228,10 +228,10 @@ public class StateTags { /** * Create a state tag for holding the watermark. */ - public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState> - watermarkStateInternal(String id, TimestampCombiner timestampCombiner) { + public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>> + watermarkStateInternal(String id, OutputTimeFn<? 
super W> outputTimeFn) { return new SimpleStateTag<>( - new StructuredId(id), StateSpecs.watermarkStateInternal(timestampCombiner)); + new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java index 1dfb85f..0321a33 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java @@ -52,7 +52,7 @@ public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> { Instant minimum = null; for (State storage : inMemoryState.values()) { if (storage instanceof WatermarkHoldState) { - Instant hold = ((WatermarkHoldState) storage).read(); + Instant hold = ((WatermarkHoldState<?>) storage).read(); if (minimum == null || (hold != null && hold.isBefore(minimum))) { minimum = hold; } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 9bb9c62..d3c4bc7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -23,8 +23,9 @@ import com.google.common.annotations.VisibleForTesting; import java.io.Serializable; import javax.annotation.Nullable; 
import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowingStrategy; @@ -54,38 +55,37 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { * used for elements. */ public static <W extends BoundedWindow> - StateTag<Object, WatermarkHoldState> watermarkHoldTagForTimestampCombiner( - TimestampCombiner timestampCombiner) { - return StateTags.<Object, WatermarkHoldState>makeSystemTagInternal( - StateTags.<W>watermarkStateInternal("hold", timestampCombiner)); + StateTag<Object, WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn( + OutputTimeFn<? super W> outputTimeFn) { + return StateTags.<Object, WatermarkHoldState<W>>makeSystemTagInternal( + StateTags.<W>watermarkStateInternal("hold", outputTimeFn)); } /** * Tag for state containing end-of-window and garbage collection output watermark holds. - * (We can't piggy-back on the data hold state since the timestampCombiner may be - * {@link TimestampCombiner#EARLIEST}, in which case every pane will + * (We can't piggy-back on the data hold state since the outputTimeFn may be + * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane will * would take the end-of-window time as its element time.) 
*/ @VisibleForTesting - public static final StateTag<Object, WatermarkHoldState> EXTRA_HOLD_TAG = + public static final StateTag<Object, WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal( - "extra", TimestampCombiner.EARLIEST)); + "extra", OutputTimeFns.outputAtEarliestInputTimestamp())); private final TimerInternals timerInternals; private final WindowingStrategy<?, W> windowingStrategy; - private final StateTag<Object, WatermarkHoldState> elementHoldTag; + private final StateTag<Object, WatermarkHoldState<W>> elementHoldTag; public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) { this.timerInternals = timerInternals; this.windowingStrategy = windowingStrategy; - this.elementHoldTag = - watermarkHoldTagForTimestampCombiner(windowingStrategy.getTimestampCombiner()); + this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn()); } /** * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp - * of the element in {@code context}. We allow the actual hold time to be shifted later by the - * {@link TimestampCombiner}, but no further than the end of the window. The hold will + * of the element in {@code context}. We allow the actual hold time to be shifted later by + * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold * was placed, or {@literal null} if no hold was placed. * @@ -199,18 +199,15 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { * strategy's output time function. 
*/ private Instant shift(Instant timestamp, W window) { - Instant shifted = - windowingStrategy - .getTimestampCombiner() - .assign(window, windowingStrategy.getWindowFn().getOutputTime(timestamp, window)); + Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window); checkState(!shifted.isBefore(timestamp), - "TimestampCombiner moved element from %s to earlier time %s for window %s", + "OutputTimeFn moved element from %s to earlier time %s for window %s", BoundedWindow.formatTimestamp(timestamp), BoundedWindow.formatTimestamp(shifted), window); checkState(timestamp.isAfter(window.maxTimestamp()) || !shifted.isAfter(window.maxTimestamp()), - "TimestampCombiner moved element from %s to %s which is beyond end of " + "OutputTimeFn moved element from %s to %s which is beyond end of " + "window %s", timestamp, shifted, window); @@ -220,7 +217,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { /** * Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was * added (ie the element timestamp plus any forward shift requested by the - * {@link WindowingStrategy#getTimestampCombiner}), or {@literal null} if no hold was added. + * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was added. * The hold is only added if both: * <ol> * <li>The backend will be able to respect it. In other words the output watermark cannot @@ -453,7 +450,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { * Return (a future for) the earliest hold for {@code context}. Clear all the holds after * reading, but add/restore an end-of-window or garbage collection hold if required. 
* - * <p>The returned timestamp is the output timestamp according to the {@link TimestampCombiner} + * <p>The returned timestamp is the output timestamp according to the {@link OutputTimeFn} * from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late * elements in the current pane. If there is no such value the timestamp is the end * of the window. @@ -465,8 +462,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { + "outputWatermark:{}", context.key(), context.window(), timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); - final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag); - final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG); + final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag); + final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG); return new ReadableState<OldAndNewHolds>() { @Override public ReadableState<OldAndNewHolds> readLater() { http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index 81ac5fa..d0a8923 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -43,10 +43,10 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -149,7 +149,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.EARLIEST); + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); List<WindowedValue<KV<String, Iterable<String>>>> result = runGABW( @@ -200,7 +200,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.EARLIEST); + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); List<WindowedValue<KV<String, Long>>> result = runGABW( @@ -348,7 +348,7 @@ public class GroupAlsoByWindowsProperties { /** * Tests that for a simple sequence of elements on the same key, the given GABW implementation * correctly groups them according to fixed windows and also sets the output timestamp according - * to the policy {@link TimestampCombiner#END_OF_WINDOW}. + * to the policy {@link OutputTimeFns#outputAtEndOfWindow()}. 
*/ public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp( GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) @@ -356,7 +356,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW); + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); List<WindowedValue<KV<String, Iterable<String>>>> result = runGABW( @@ -386,7 +386,7 @@ public class GroupAlsoByWindowsProperties { /** * Tests that for a simple sequence of elements on the same key, the given GABW implementation * correctly groups them according to fixed windows and also sets the output timestamp according - * to the policy {@link TimestampCombiner#LATEST}. + * to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}. */ public static void groupsElementsIntoFixedWindowsWithLatestTimestamp( GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) @@ -394,7 +394,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.LATEST); + .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()); List<WindowedValue<KV<String, Iterable<String>>>> result = runGABW( @@ -431,7 +431,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW); + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); List<WindowedValue<KV<String, Iterable<String>>>> result = runGABW( @@ -468,7 +468,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) - 
.withTimestampCombiner(TimestampCombiner.LATEST); + .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()); BoundedWindow unmergedWindow = window(15, 25); List<WindowedValue<KV<String, Iterable<String>>>> result = @@ -508,7 +508,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW); + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); BoundedWindow secondWindow = window(15, 25); List<WindowedValue<KV<String, Long>>> result = http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java index 6248401..34ddae6 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; @@ -71,12 +71,14 @@ public class InMemoryStateInternalsTest { StateTags.set("stringSet", StringUtf8Coder.of()); private static final StateTag<Object, MapState<String, Integer>> STRING_MAP_ADDR = 
StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of()); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); - private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> + WATERMARK_EARLIEST_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> + WATERMARK_LATEST_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow()); InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey"); @@ -440,7 +442,7 @@ public class InMemoryStateInternalsTest { @Test public void testWatermarkEarliestState() throws Exception { - WatermarkHoldState value = + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); // State instances are cached, but depend on the namespace. @@ -464,7 +466,7 @@ public class InMemoryStateInternalsTest { @Test public void testWatermarkLatestState() throws Exception { - WatermarkHoldState value = + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); // State instances are cached, but depend on the namespace. 
@@ -488,7 +490,7 @@ public class InMemoryStateInternalsTest { @Test public void testWatermarkEndOfWindowState() throws Exception { - WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); // State instances are cached, but depend on the namespace. assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); @@ -505,7 +507,7 @@ public class InMemoryStateInternalsTest { @Test public void testWatermarkStateIsEmpty() throws Exception { - WatermarkHoldState value = + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); assertThat(value.isEmpty().read(), Matchers.is(true)); @@ -519,9 +521,9 @@ public class InMemoryStateInternalsTest { @Test public void testMergeEarliestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = + WatermarkHoldState<BoundedWindow> value1 = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - WatermarkHoldState value2 = + WatermarkHoldState<BoundedWindow> value2 = underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); value1.add(new Instant(3000)); @@ -538,11 +540,11 @@ public class InMemoryStateInternalsTest { @Test public void testMergeLatestWatermarkIntoSource() throws Exception { - WatermarkHoldState value1 = + WatermarkHoldState<BoundedWindow> value1 = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - WatermarkHoldState value2 = + WatermarkHoldState<BoundedWindow> value2 = underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); - WatermarkHoldState value3 = + WatermarkHoldState<BoundedWindow> value3 = underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); value1.add(new Instant(3000)); http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 44bc538..0d4d992 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -56,12 +56,12 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Never; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -210,7 +210,7 @@ public class ReduceFnRunnerTest { WindowingStrategy<?, IntervalWindow> strategy = WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)); @@ -284,7 +284,7 @@ public class ReduceFnRunnerTest { WindowingStrategy<?, IntervalWindow> strategy = WindowingStrategy.of((WindowFn<?, IntervalWindow>) windowFn) - .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) 
.withTrigger(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever())) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withAllowedLateness(allowedLateness); @@ -315,7 +315,7 @@ public class ReduceFnRunnerTest { WindowingStrategy<?, IntervalWindow> strategy = WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)); @@ -615,7 +615,7 @@ public class ReduceFnRunnerTest { AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) - .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); tester.advanceInputWatermark(new Instant(0)); @@ -668,7 +668,7 @@ public class ReduceFnRunnerTest { AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) - .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)); tester.advanceInputWatermark(new Instant(0)); @@ -695,7 +695,7 @@ public class ReduceFnRunnerTest { AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) - .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); tester.advanceInputWatermark(new Instant(0)); @@ -724,7 +724,7 @@ public class ReduceFnRunnerTest { AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) - 
.withTimestampCombiner(TimestampCombiner.EARLIEST) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); tester.advanceInputWatermark(new Instant(0)); @@ -1195,7 +1195,7 @@ public class ReduceFnRunnerTest { WindowingStrategy<?, IntervalWindow> strategy = WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withTrigger( AfterEach.<IntervalWindow>inOrder( Repeatedly.forever( @@ -1251,16 +1251,16 @@ public class ReduceFnRunnerTest { WindowingStrategy<?, IntervalWindow> strategy = WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.EARLIEST) - .withTrigger( - AfterEach.<IntervalWindow>inOrder( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(new Duration(5))) - .orFinally(AfterWatermark.pastEndOfWindow()), - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(new Duration(25))))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTrigger(AfterEach.<IntervalWindow>inOrder( + Repeatedly + .forever( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf( + new Duration(5))) + .orFinally(AfterWatermark.pastEndOfWindow()), + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf( + new Duration(25))))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)); http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index b5b5492..549fd8a 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -58,8 +58,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -161,7 +161,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { throws Exception { WindowingStrategy<?, W> strategy = WindowingStrategy.of(windowFn) - .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withMode(mode) .withAllowedLateness(allowedDataLateness) .withClosingBehavior(closingBehavior); @@ -329,10 +329,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { assertHasOnlyGlobalAndAllowedTags( ImmutableSet.copyOf(expectedWindows), ImmutableSet.<StateTag<? 
super String, ?>>of( - TriggerStateMachineRunner.FINISHED_BITS_TAG, - PaneInfoTracker.PANE_INFO_TAG, - WatermarkHold.watermarkHoldTagForTimestampCombiner( - objectStrategy.getTimestampCombiner()), + TriggerStateMachineRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG, + WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()), WatermarkHold.EXTRA_HOLD_TAG)); } @@ -347,8 +345,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { ImmutableSet.copyOf(expectedWindows), ImmutableSet.<StateTag<? super String, ?>>of( PaneInfoTracker.PANE_INFO_TAG, - WatermarkHold.watermarkHoldTagForTimestampCombiner( - objectStrategy.getTimestampCombiner()), + WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()), WatermarkHold.EXTRA_HOLD_TAG)); } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java index 10dcb62..5f5d92d 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.CombineFnUtil; import org.junit.Test; import org.junit.runner.RunWith; @@ -97,11 +97,15 @@ public class StateTagTest { @Test public void testWatermarkBagEquality() { - StateTag<?, ?> foo1 = StateTags.watermarkStateInternal("foo", 
TimestampCombiner.EARLIEST); - StateTag<?, ?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); - StateTag<?, ?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST); - - StateTag<?, ?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST); + StateTag<?, ?> foo1 = StateTags.watermarkStateInternal( + "foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + StateTag<?, ?> foo2 = StateTags.watermarkStateInternal( + "foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + StateTag<?, ?> bar = StateTags.watermarkStateInternal( + "bar", OutputTimeFns.outputAtEarliestInputTimestamp()); + + StateTag<?, ?> bar2 = StateTags.watermarkStateInternal( + "bar", OutputTimeFns.outputAtLatestInputTimestamp()); // Same id, same fn. assertEquals(foo1, foo2); http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java index 068b37f..0665812 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java @@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineFnUtil; import 
org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -213,7 +213,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE; for (State existingState : this.values()) { if (existingState instanceof WatermarkHoldState) { - Instant hold = ((WatermarkHoldState) existingState).read(); + Instant hold = ((WatermarkHoldState<?>) existingState).read(); if (hold != null && hold.isBefore(earliest)) { earliest = hold; } @@ -276,18 +276,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) { return new StateBinder<K>() { @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, - TimestampCombiner timestampCombiner) { + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> address, + OutputTimeFn<? super W> outputTimeFn) { if (containedInUnderlying(namespace, address)) { @SuppressWarnings("unchecked") - InMemoryState<? extends WatermarkHoldState> existingState = - (InMemoryState<? extends WatermarkHoldState>) + InMemoryState<? extends WatermarkHoldState<W>> existingState = + (InMemoryState<? 
extends WatermarkHoldState<W>>) underlying.get().get(namespace, address, c); return existingState.copy(); } else { return new InMemoryWatermarkHold<>( - timestampCombiner); + outputTimeFn); } } @@ -419,7 +419,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> State state = readTo.get(namespace, existingState.getKey(), StateContexts.nullContext()); if (state instanceof WatermarkHoldState) { - Instant hold = ((WatermarkHoldState) state).read(); + Instant hold = ((WatermarkHoldState<?>) state).read(); if (hold != null && hold.isBefore(earliestHold)) { earliestHold = hold; } @@ -434,9 +434,9 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) { return new StateBinder<K>() { @Override - public <W extends BoundedWindow> WatermarkHoldState bindWatermark( - StateTag<? super K, WatermarkHoldState> address, - TimestampCombiner timestampCombiner) { + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> address, + OutputTimeFn<? 
super W> outputTimeFn) { return underlying.get(namespace, address, c); } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 322c995..b08aa8e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -40,8 +40,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -135,14 +135,14 @@ class ParDoMultiOverrideFactory<InputT, OutputT> // to alter the flow of data. 
This entails: // - trigger as fast as possible // - maintain the full timestamps of elements - // - ensure this GBK holds to the minimum of those timestamps (via TimestampCombiner) + // - ensure this GBK holds to the minimum of those timestamps (via OutputTimeFn) // - discard past panes as it is "just a stream" of elements .apply( Window.<KV<K, WindowedValue<KV<K, InputT>>>>configure() .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) .discardingFiredPanes() .withAllowedLateness(inputWindowingStrategy.getAllowedLateness()) - .withTimestampCombiner(TimestampCombiner.EARLIEST)) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())) // A full GBK to group by key _and_ window .apply("Group by key", GroupByKey.<K, WindowedValue<KV<K, InputT>>>create()) http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java index f0aeece..68c6613 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java @@ -43,7 +43,8 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import 
org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; @@ -288,12 +289,13 @@ public class CopyOnAccessInMemoryStateInternalsTest { CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST; + OutputTimeFn<BoundedWindow> outputTimeFn = + OutputTimeFns.outputAtEarliestInputTimestamp(); StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, WatermarkHoldState> stateTag = - StateTags.watermarkStateInternal("wmstate", timestampCombiner); - WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag); + StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag = + StateTags.watermarkStateInternal("wmstate", outputTimeFn); + WatermarkHoldState<?> underlyingValue = underlying.state(namespace, stateTag); assertThat(underlyingValue.read(), nullValue()); underlyingValue.add(new Instant(250L)); @@ -301,7 +303,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); - WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag); + WatermarkHoldState<BoundedWindow> copyOnAccessState = internals.state(namespace, stateTag); assertThat(copyOnAccessState.read(), equalTo(new Instant(250L))); copyOnAccessState.add(new Instant(100L)); @@ -311,7 +313,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { copyOnAccessState.add(new Instant(500L)); assertThat(copyOnAccessState.read(), equalTo(new Instant(100L))); - WatermarkHoldState reReadUnderlyingValue = + WatermarkHoldState<BoundedWindow> reReadUnderlyingValue = underlying.state(namespace, stateTag); assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read())); } @@ -512,15 +514,15 @@ public class 
CopyOnAccessInMemoryStateInternalsTest { CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); - StateTag<Object, WatermarkHoldState> firstHoldAddress = - StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); - WatermarkHoldState firstHold = + StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress = + StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + WatermarkHoldState<BoundedWindow> firstHold = internals.state(StateNamespaces.window(null, first), firstHoldAddress); firstHold.add(new Instant(22L)); - StateTag<Object, WatermarkHoldState> secondHoldAddress = - StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); - WatermarkHoldState secondHold = + StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress = + StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + WatermarkHoldState<BoundedWindow> secondHold = internals.state(StateNamespaces.window(null, second), secondHoldAddress); secondHold.add(new Instant(2L)); @@ -544,18 +546,18 @@ public class CopyOnAccessInMemoryStateInternalsTest { }; CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); - StateTag<Object, WatermarkHoldState> firstHoldAddress = - StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); - WatermarkHoldState firstHold = + StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress = + StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + WatermarkHoldState<BoundedWindow> firstHold = underlying.state(StateNamespaces.window(null, first), firstHoldAddress); firstHold.add(new Instant(22L)); CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit()); - StateTag<Object, WatermarkHoldState> secondHoldAddress = - 
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); - WatermarkHoldState secondHold = + StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress = + StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + WatermarkHoldState<BoundedWindow> secondHold = internals.state(StateNamespaces.window(null, second), secondHoldAddress); secondHold.add(new Instant(244L)); @@ -581,18 +583,18 @@ public class CopyOnAccessInMemoryStateInternalsTest { }; CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); - StateTag<Object, WatermarkHoldState> firstHoldAddress = - StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); - WatermarkHoldState firstHold = + StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress = + StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + WatermarkHoldState<BoundedWindow> firstHold = underlying.state(StateNamespaces.window(null, first), firstHoldAddress); firstHold.add(new Instant(224L)); CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit()); - StateTag<Object, WatermarkHoldState> secondHoldAddress = - StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); - WatermarkHoldState secondHold = + StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress = + StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + WatermarkHoldState<BoundedWindow> secondHold = internals.state(StateNamespaces.window(null, second), secondHoldAddress); secondHold.add(new Instant(24L)); @@ -608,7 +610,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { internals .state( StateNamespaces.global(), - StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST)) + StateTags.watermarkStateInternal("foo", 
OutputTimeFns.outputAtEarliestInputTimestamp())) .add(new Instant(1234L)); thrown.expect(IllegalStateException.class); http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java index 7ee2f69..b904bfe 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java @@ -29,8 +29,8 @@ import java.util.Map; import java.util.Set; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; @@ -60,8 +60,8 @@ public class HashingFlinkCombineRunner< @SuppressWarnings("unchecked") - TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); - WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn(); + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); // Flink Iterable can be iterated over only once. 
List<WindowedValue<KV<K, InputT>>> inputs = new ArrayList<>(); @@ -87,21 +87,14 @@ public class HashingFlinkCombineRunner< AccumT accumT = flinkCombiner.firstInput(key, currentValue.getValue().getValue(), options, sideInputReader, singletonW); Instant windowTimestamp = - timestampCombiner.assign( - mergedWindow, windowFn.getOutputTime(currentValue.getTimestamp(), mergedWindow)); + outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow); accumAndInstant = new Tuple2<>(accumT, windowTimestamp); mapState.put(mergedWindow, accumAndInstant); } else { accumAndInstant.f0 = flinkCombiner.addInput(key, accumAndInstant.f0, currentValue.getValue().getValue(), options, sideInputReader, singletonW); - accumAndInstant.f1 = - timestampCombiner.combine( - accumAndInstant.f1, - timestampCombiner.assign( - mergedWindow, - windowingStrategy - .getWindowFn() - .getOutputTime(currentValue.getTimestamp(), mergedWindow))); + accumAndInstant.f1 = outputTimeFn.combine(accumAndInstant.f1, + outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow)); } } if (iterator.hasNext()) {
