Roll forward "Replace OutputTimeFn UDF with TimestampCombiner enum"
This reverts commit 83d41fcce0c7b123459e5d26ab9938de49f48dab, which reverted commit f38e4271334fced94e8dc1dc97f47b60fa810586 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1395dce Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1395dce Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1395dce Branch: refs/heads/master Commit: d1395dceae3f166bbe2c1a6d32c2fd3e35c839bd Parents: a3e7383 Author: Kenneth Knowles <[email protected]> Authored: Thu Apr 27 08:56:52 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Fri Apr 28 16:41:43 2017 -0700 ---------------------------------------------------------------------- .../beam/examples/complete/game/GameStats.java | 4 +- .../translation/utils/ApexStateInternals.java | 26 +- .../translation/ApexStateInternalsTest.java | 33 +- .../translation/GroupByKeyTranslatorTest.java | 10 +- .../core/construction/WindowingStrategies.java | 52 ++- .../construction/WindowingStrategiesTest.java | 6 +- .../runners/core/InMemoryStateInternals.java | 32 +- .../beam/runners/core/ReduceFnRunner.java | 4 +- .../beam/runners/core/SplittableParDo.java | 8 +- .../apache/beam/runners/core/StateMerging.java | 32 +- .../org/apache/beam/runners/core/StateTag.java | 11 +- .../org/apache/beam/runners/core/StateTags.java | 16 +- .../core/TestInMemoryStateInternals.java | 2 +- .../apache/beam/runners/core/WatermarkHold.java | 45 +-- .../core/GroupAlsoByWindowsProperties.java | 20 +- .../core/InMemoryStateInternalsTest.java | 34 +- .../beam/runners/core/ReduceFnRunnerTest.java | 38 +-- .../beam/runners/core/ReduceFnTester.java | 13 +- .../apache/beam/runners/core/StateTagTest.java | 16 +- .../CopyOnAccessInMemoryStateInternals.java | 24 +- .../direct/ParDoMultiOverrideFactory.java | 6 +- .../CopyOnAccessInMemoryStateInternalsTest.java | 54 ++-- .../functions/HashingFlinkCombineRunner.java | 19 +- .../functions/SortingFlinkCombineRunner.java | 30 +- 
.../state/FlinkBroadcastStateInternals.java | 8 +- .../state/FlinkKeyGroupStateInternals.java | 8 +- .../state/FlinkSplitStateInternals.java | 8 +- .../streaming/state/FlinkStateInternals.java | 34 +- .../streaming/FlinkStateInternalsTest.java | 34 +- .../spark/stateful/SparkStateInternals.java | 33 +- .../translation/SparkAbstractCombineFn.java | 4 +- .../spark/translation/SparkGlobalCombineFn.java | 37 ++- .../spark/translation/SparkKeyedCombineFn.java | 37 ++- sdks/java/core/pom.xml | 5 - .../beam/sdk/testing/WindowFnTestUtils.java | 53 +++- .../apache/beam/sdk/transforms/GroupByKey.java | 3 +- .../sdk/transforms/windowing/OutputTimeFn.java | 314 ------------------- .../sdk/transforms/windowing/OutputTimeFns.java | 212 ------------- .../transforms/windowing/TimestampCombiner.java | 186 +++++++++++ .../beam/sdk/transforms/windowing/Window.java | 22 +- .../org/apache/beam/sdk/util/Reshuffle.java | 7 +- .../apache/beam/sdk/util/WindowingStrategy.java | 176 +++-------- .../apache/beam/sdk/util/state/StateBinder.java | 12 +- .../apache/beam/sdk/util/state/StateSpecs.java | 23 +- .../beam/sdk/util/state/WatermarkHoldState.java | 19 +- .../org/apache/beam/SdkCoreApiSurfaceTest.java | 1 - .../beam/sdk/transforms/GroupByKeyTest.java | 10 +- .../sdk/transforms/join/CoGroupByKeyTest.java | 6 +- .../transforms/windowing/OutputTimeFnsTest.java | 51 --- .../sdk/transforms/windowing/SessionsTest.java | 6 +- .../sdk/transforms/windowing/WindowTest.java | 23 +- .../sdk/transforms/windowing/WindowingTest.java | 2 +- .../extensions/gcp/GcpCoreApiSurfaceTest.java | 1 - 53 files changed, 740 insertions(+), 1130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java ---------------------------------------------------------------------- diff --git 
a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index b6c05be..e0048b7 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -43,8 +43,8 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -313,7 +313,7 @@ public class GameStats extends LeaderBoard { userEvents .apply("WindowIntoSessions", Window.<KV<String, Integer>>into( Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap()))) - .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())) + .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)) // For this use, we care only about the existence of the session, not any particular // information aggregated over it, so the following is an efficient way to do that. 
.apply(Combine.perKey(x -> 0)) http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java index cfc57cd..ec8f666 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java @@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.state.BagState; @@ -150,10 +150,10 @@ public class ApexStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { - return new ApexWatermarkHoldState<>(namespace, address, outputTimeFn); + public <W extends BoundedWindow> WatermarkHoldState bindWatermark( + StateTag<? 
super K, WatermarkHoldState> address, + TimestampCombiner timestampCombiner) { + return new ApexWatermarkHoldState<>(namespace, address, timestampCombiner); } @Override @@ -269,16 +269,16 @@ public class ApexStateInternals<K> implements StateInternals<K> { } private final class ApexWatermarkHoldState<W extends BoundedWindow> - extends AbstractState<Instant> implements WatermarkHoldState<W> { + extends AbstractState<Instant> implements WatermarkHoldState { - private final OutputTimeFn<? super W> outputTimeFn; + private final TimestampCombiner timestampCombiner; public ApexWatermarkHoldState( StateNamespace namespace, - StateTag<?, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { + StateTag<?, WatermarkHoldState> address, + TimestampCombiner timestampCombiner) { super(namespace, address, InstantCoder.of()); - this.outputTimeFn = outputTimeFn; + this.timestampCombiner = timestampCombiner; } @Override @@ -294,7 +294,7 @@ public class ApexStateInternals<K> implements StateInternals<K> { @Override public void add(Instant outputTime) { Instant combined = read(); - combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime); + combined = (combined == null) ? outputTime : timestampCombiner.combine(combined, outputTime); writeValue(combined); } @@ -313,8 +313,8 @@ public class ApexStateInternals<K> implements StateInternals<K> { } @Override - public OutputTimeFn<? 
super W> getOutputTimeFn() { - return outputTimeFn; + public TimestampCombiner getTimestampCombiner() { + return timestampCombiner; } } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java index 4021c62..091fe3b 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexStateInternalsTest.java @@ -37,7 +37,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; @@ -66,14 +66,13 @@ public class ApexStateInternalsTest { "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> + private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> - WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", 
OutputTimeFns.outputAtLatestInputTimestamp()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow()); + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); + private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); private ApexStateInternals<String> underTest; @@ -228,7 +227,7 @@ public class ApexStateInternalsTest { @Test public void testWatermarkEarliestState() throws Exception { - WatermarkHoldState<BoundedWindow> value = + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); // State instances are cached, but depend on the namespace. @@ -252,7 +251,7 @@ public class ApexStateInternalsTest { @Test public void testWatermarkLatestState() throws Exception { - WatermarkHoldState<BoundedWindow> value = + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); // State instances are cached, but depend on the namespace. @@ -276,7 +275,7 @@ public class ApexStateInternalsTest { @Test public void testWatermarkEndOfWindowState() throws Exception { - WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); // State instances are cached, but depend on the namespace. 
assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); @@ -293,7 +292,7 @@ public class ApexStateInternalsTest { @Test public void testWatermarkStateIsEmpty() throws Exception { - WatermarkHoldState<BoundedWindow> value = + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); assertThat(value.isEmpty().read(), Matchers.is(true)); @@ -307,9 +306,9 @@ public class ApexStateInternalsTest { @Test public void testMergeEarliestWatermarkIntoSource() throws Exception { - WatermarkHoldState<BoundedWindow> value1 = + WatermarkHoldState value1 = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - WatermarkHoldState<BoundedWindow> value2 = + WatermarkHoldState value2 = underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); value1.add(new Instant(3000)); @@ -326,11 +325,11 @@ public class ApexStateInternalsTest { @Test public void testMergeLatestWatermarkIntoSource() throws Exception { - WatermarkHoldState<BoundedWindow> value1 = + WatermarkHoldState value1 = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - WatermarkHoldState<BoundedWindow> value2 = + WatermarkHoldState value2 = underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); - WatermarkHoldState<BoundedWindow> value3 = + WatermarkHoldState value3 = underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); value1.add(new Instant(3000)); http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java index 193de71..9c61b47 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java +++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java @@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.joda.time.Duration; @@ -83,12 +83,12 @@ public class GroupByKeyTranslatorTest { ); p.apply(Read.from(new TestSource(data, new Instant(5000)))) - .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))) - .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())) + .apply( + Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))) + .withTimestampCombiner(TimestampCombiner.LATEST)) .apply(Count.<String>perElement()) .apply(ParDo.of(new KeyedByTimestamp<KV<String, Long>>())) - .apply(ParDo.of(new EmbeddedCollector())) - ; + .apply(ParDo.of(new EmbeddedCollector())); ApexRunnerResult result = (ApexRunnerResult) p.run(); result.getApexDAG(); http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java index 3d7deef..0c400db 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java @@ -28,16 +28,15 @@ 
import java.io.Serializable; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime; import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes; import org.joda.time.Duration; /** Utilities for working with {@link WindowingStrategy WindowingStrategies}. 
*/ @@ -115,11 +114,42 @@ public class WindowingStrategies implements Serializable { } } - public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) { - if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) { - return toProto(((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn()); - } else { - return OutputTimeFns.toProto(outputTimeFn); + public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) { + switch(timestampCombiner) { + case EARLIEST: + return OutputTime.EARLIEST_IN_PANE; + case END_OF_WINDOW: + return OutputTime.END_OF_WINDOW; + case LATEST: + return OutputTime.LATEST_IN_PANE; + default: + throw new IllegalArgumentException( + String.format( + "Unknown %s: %s", + TimestampCombiner.class.getSimpleName(), + timestampCombiner)); + } + } + + public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) { + switch (proto) { + case EARLIEST_IN_PANE: + return TimestampCombiner.EARLIEST; + case END_OF_WINDOW: + return TimestampCombiner.END_OF_WINDOW; + case LATEST_IN_PANE: + return TimestampCombiner.LATEST; + case UNRECOGNIZED: + default: + // Whether or not it is proto that cannot recognize it (due to the version of the + // generated code we link to) or the switch hasn't been updated to handle it, + // the situation is the same: we don't know what this OutputTime means + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + RunnerApi.OutputTime.class.getCanonicalName(), + OutputTime.class.getCanonicalName(), + proto)); } } @@ -177,7 +207,7 @@ public class WindowingStrategies implements Serializable { RunnerApi.WindowingStrategy.Builder windowingStrategyProto = RunnerApi.WindowingStrategy.newBuilder() - .setOutputTime(toProto(windowingStrategy.getOutputTimeFn())) + .setOutputTime(toProto(windowingStrategy.getTimestampCombiner())) .setAccumulationMode(toProto(windowingStrategy.getMode())) 
.setClosingBehavior(toProto(windowingStrategy.getClosingBehavior())) .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis()) @@ -229,7 +259,7 @@ public class WindowingStrategies implements Serializable { "WindowFn"); WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn; - OutputTimeFn<?> outputTimeFn = OutputTimeFns.fromProto(proto.getOutputTime()); + TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime()); AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode()); Trigger trigger = Triggers.fromProto(proto.getTrigger()); ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior()); @@ -239,7 +269,7 @@ public class WindowingStrategies implements Serializable { .withAllowedLateness(allowedLateness) .withMode(accumulationMode) .withTrigger(trigger) - .withOutputTimeFn(outputTimeFn) + .withTimestampCombiner(timestampCombiner) .withClosingBehavior(closingBehavior); } } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java index 62bba8e..78ac61c 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java @@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableList; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import 
org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -68,14 +68,14 @@ public class WindowingStrategiesTest { .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withTrigger(REPRESENTATIVE_TRIGGER) .withAllowedLateness(Duration.millis(71)) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())), + .withTimestampCombiner(TimestampCombiner.EARLIEST)), toProtoAndBackSpec( WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN) .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withTrigger(REPRESENTATIVE_TRIGGER) .withAllowedLateness(Duration.millis(93)) - .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))); + .withTimestampCombiner(TimestampCombiner.LATEST))); } @Parameter(0) http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java index 55b7fc2..9fb8e3f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; 
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -156,10 +156,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { - return new InMemoryWatermarkHold<W>(outputTimeFn); + public <W extends BoundedWindow> WatermarkHoldState bindWatermark( + StateTag<? super K, WatermarkHoldState> address, + TimestampCombiner timestampCombiner) { + return new InMemoryWatermarkHold<W>(timestampCombiner); } @Override @@ -233,19 +233,19 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { * An {@link InMemoryState} implementation of {@link WatermarkHoldState}. */ public static final class InMemoryWatermarkHold<W extends BoundedWindow> - implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> { + implements WatermarkHoldState, InMemoryState<InMemoryWatermarkHold<W>> { - private final OutputTimeFn<? super W> outputTimeFn; + private final TimestampCombiner timestampCombiner; @Nullable private Instant combinedHold = null; - public InMemoryWatermarkHold(OutputTimeFn<? super W> outputTimeFn) { - this.outputTimeFn = outputTimeFn; + public InMemoryWatermarkHold(TimestampCombiner timestampCombiner) { + this.timestampCombiner = timestampCombiner; } @Override - public InMemoryWatermarkHold<W> readLater() { + public InMemoryWatermarkHold readLater() { return this; } @@ -263,8 +263,10 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { @Override public void add(Instant outputTime) { - combinedHold = combinedHold == null ? outputTime - : outputTimeFn.combine(combinedHold, outputTime); + combinedHold = + combinedHold == null + ? 
outputTime + : timestampCombiner.combine(combinedHold, outputTime); } @Override @@ -287,8 +289,8 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public OutputTimeFn<? super W> getOutputTimeFn() { - return outputTimeFn; + public TimestampCombiner getTimestampCombiner() { + return timestampCombiner; } @Override @@ -299,7 +301,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { @Override public InMemoryWatermarkHold<W> copy() { InMemoryWatermarkHold<W> that = - new InMemoryWatermarkHold<>(outputTimeFn); + new InMemoryWatermarkHold<>(timestampCombiner); that.combinedHold = this.combinedHold; return that; } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index c9f6bba..0be7c95 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -47,9 +47,9 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SideInputReader; @@ -171,7 +171,7 @@ public class 
ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { * <ul> * <li>State: Bag of hold timestamps. * <li>State style: RENAMED - * <li>Merging: Depending on {@link OutputTimeFn}, may need to be recalculated on merging. + * <li>Merging: Depending on {@link TimestampCombiner}, may need to be recalculated on merging. * When a pane fires it may be necessary to add (back) an end-of-window or garbage collection * hold. * <li>Lifetime: Cleared when a pane fires or when the window is garbage collected. http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 31d89ee..5273e86 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -45,7 +45,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -355,10 +355,10 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> * the input watermark when the first {@link DoFn.ProcessElement} call for this element * completes. 
*/ - private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag = + private static final StateTag<Object, WatermarkHoldState> watermarkHoldTag = StateTags.makeSystemTagInternal( StateTags.<GlobalWindow>watermarkStateInternal( - "hold", OutputTimeFns.outputAtLatestInputTimestamp())); + "hold", TimestampCombiner.LATEST)); /** * The state cell containing a copy of the element. Written during the first {@link @@ -480,7 +480,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> stateInternals.state(stateNamespace, elementTag); ValueState<RestrictionT> restrictionState = stateInternals.state(stateNamespace, restrictionTag); - WatermarkHoldState<GlobalWindow> holdState = + WatermarkHoldState holdState = stateInternals.state(stateNamespace, watermarkHoldTag); ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction; http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java index 3410850..ce37fd3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java @@ -218,24 +218,24 @@ public class StateMerging { */ public static <K, W extends BoundedWindow> void prefetchWatermarks( MergingStateAccessor<K, W> context, - StateTag<? super K, WatermarkHoldState<W>> address) { - Map<W, WatermarkHoldState<W>> map = context.accessInEachMergingWindow(address); - WatermarkHoldState<W> result = context.access(address); + StateTag<? 
super K, WatermarkHoldState> address) { + Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(address); + WatermarkHoldState result = context.access(address); if (map.isEmpty()) { // Nothing to prefetch. return; } if (map.size() == 1 && map.values().contains(result) - && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) { + && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) { // Nothing to change. return; } - if (result.getOutputTimeFn().dependsOnlyOnWindow()) { + if (result.getTimestampCombiner().dependsOnlyOnWindow()) { // No need to read existing holds. return; } // Prefetch. - for (WatermarkHoldState<W> source : map.values()) { + for (WatermarkHoldState source : map.values()) { prefetchRead(source); } } @@ -250,7 +250,7 @@ public class StateMerging { */ public static <K, W extends BoundedWindow> void mergeWatermarks( MergingStateAccessor<K, W> context, - StateTag<? super K, WatermarkHoldState<W>> address, + StateTag<? super K, WatermarkHoldState> address, W mergeResult) { mergeWatermarks( context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult); @@ -261,31 +261,31 @@ public class StateMerging { * into {@code result}, where the final merge result window is {@code mergeResult}. */ public static <W extends BoundedWindow> void mergeWatermarks( - Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result, + Collection<WatermarkHoldState> sources, WatermarkHoldState result, W resultWindow) { if (sources.isEmpty()) { // Nothing to merge. return; } if (sources.size() == 1 && sources.contains(result) - && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) { + && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) { // Nothing to merge. return; } - if (result.getOutputTimeFn().dependsOnlyOnWindow()) { + if (result.getTimestampCombiner().dependsOnlyOnWindow()) { // Clear sources. 
- for (WatermarkHoldState<W> source : sources) { + for (WatermarkHoldState source : sources) { source.clear(); } // Update directly from window-derived hold. - Instant hold = result.getOutputTimeFn().assignOutputTime( - BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow); + Instant hold = + result.getTimestampCombiner().assign(resultWindow, BoundedWindow.TIMESTAMP_MIN_VALUE); checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE)); result.add(hold); } else { // Prefetch. List<ReadableState<Instant>> futures = new ArrayList<>(sources.size()); - for (WatermarkHoldState<W> source : sources) { + for (WatermarkHoldState source : sources) { futures.add(source); } // Read. @@ -297,12 +297,12 @@ public class StateMerging { } } // Clear sources. - for (WatermarkHoldState<W> source : sources) { + for (WatermarkHoldState source : sources) { source.clear(); } if (!outputTimesToMerge.isEmpty()) { // Merge and update. - result.add(result.getOutputTimeFn().merge(resultWindow, outputTimesToMerge)); + result.add(result.getTimestampCombiner().merge(resultWindow, outputTimesToMerge)); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java index 12c59ad..a5d262a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import 
org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; @@ -115,11 +115,10 @@ public interface StateTag<K, StateT extends State> extends Serializable { /** * Bind to a watermark {@link StateSpec}. * - * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to - * the returned {@link WatermarkHoldState} are to be combined. + * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps + * added to the returned {@link WatermarkHoldState} are to be combined. */ - <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> spec, - OutputTimeFn<? super W> outputTimeFn); + <W extends BoundedWindow> WatermarkHoldState bindWatermark( + StateTag<? super K, WatermarkHoldState> spec, TimestampCombiner timestampCombiner); } } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java index 3a45569..2b3f4b8 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java @@ -29,7 +29,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import 
org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; @@ -110,11 +110,11 @@ public class StateTags { } @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + public <W extends BoundedWindow> WatermarkHoldState bindWatermark( String id, - StateSpec<? super K, WatermarkHoldState<W>> spec, - OutputTimeFn<? super W> outputTimeFn) { - return binder.bindWatermark(tagForSpec(id, spec), outputTimeFn); + StateSpec<? super K, WatermarkHoldState> spec, + TimestampCombiner timestampCombiner) { + return binder.bindWatermark(tagForSpec(id, spec), timestampCombiner); } }; } @@ -228,10 +228,10 @@ public class StateTags { /** * Create a state tag for holding the watermark. */ - public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>> - watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) { + public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState> + watermarkStateInternal(String id, TimestampCombiner timestampCombiner) { return new SimpleStateTag<>( - new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn)); + new StructuredId(id), StateSpecs.watermarkStateInternal(timestampCombiner)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java index 0321a33..1dfb85f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java @@ -52,7 +52,7 @@ public class TestInMemoryStateInternals<K> 
extends InMemoryStateInternals<K> { Instant minimum = null; for (State storage : inMemoryState.values()) { if (storage instanceof WatermarkHoldState) { - Instant hold = ((WatermarkHoldState<?>) storage).read(); + Instant hold = ((WatermarkHoldState) storage).read(); if (minimum == null || (hold != null && hold.isBefore(minimum))) { minimum = hold; } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index d3c4bc7..9bb9c62 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -23,9 +23,8 @@ import com.google.common.annotations.VisibleForTesting; import java.io.Serializable; import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowingStrategy; @@ -55,37 +54,38 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { * used for elements. */ public static <W extends BoundedWindow> - StateTag<Object, WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn( - OutputTimeFn<? 
super W> outputTimeFn) { - return StateTags.<Object, WatermarkHoldState<W>>makeSystemTagInternal( - StateTags.<W>watermarkStateInternal("hold", outputTimeFn)); + StateTag<Object, WatermarkHoldState> watermarkHoldTagForTimestampCombiner( + TimestampCombiner timestampCombiner) { + return StateTags.<Object, WatermarkHoldState>makeSystemTagInternal( + StateTags.<W>watermarkStateInternal("hold", timestampCombiner)); } /** * Tag for state containing end-of-window and garbage collection output watermark holds. - * (We can't piggy-back on the data hold state since the outputTimeFn may be - * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane will + * (We can't piggy-back on the data hold state since the timestampCombiner may be + * {@link TimestampCombiner#LATEST}, in which case every pane * would take the end-of-window time as its element time.) */ @VisibleForTesting - public static final StateTag<Object, WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG = + public static final StateTag<Object, WatermarkHoldState> EXTRA_HOLD_TAG = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal( - "extra", OutputTimeFns.outputAtEarliestInputTimestamp())); + "extra", TimestampCombiner.EARLIEST)); private final TimerInternals timerInternals; private final WindowingStrategy<?, W> windowingStrategy; - private final StateTag<Object, WatermarkHoldState<W>> elementHoldTag; + private final StateTag<Object, WatermarkHoldState> elementHoldTag; public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) { this.timerInternals = timerInternals; this.windowingStrategy = windowingStrategy; - this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn()); + this.elementHoldTag = + watermarkHoldTagForTimestampCombiner(windowingStrategy.getTimestampCombiner()); } /** * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp - * of the element in {@code context}. 
We allow the actual hold time to be shifted later by - * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will + * of the element in {@code
context}. We allow the actual hold time to be shifted later by - * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will + * of the element in {@code context}. We allow the actual hold time to be shifted later by the + * {@link TimestampCombiner}, but no further than the end of the window. The hold will * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold * was placed, or {@literal null} if no hold was placed. * @@ -199,15 +199,18 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { * strategy's output time function. */ private Instant shift(Instant timestamp, W window) { - Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window); + Instant shifted = + windowingStrategy + .getTimestampCombiner() + .assign(window, windowingStrategy.getWindowFn().getOutputTime(timestamp, window)); checkState(!shifted.isBefore(timestamp), - "OutputTimeFn moved element from %s to earlier time %s for window %s", + "TimestampCombiner moved element from %s to earlier time %s for window %s", BoundedWindow.formatTimestamp(timestamp), BoundedWindow.formatTimestamp(shifted), window); checkState(timestamp.isAfter(window.maxTimestamp()) || !shifted.isAfter(window.maxTimestamp()), - "OutputTimeFn moved element from %s to %s which is beyond end of " + "TimestampCombiner moved element from %s to %s which is beyond end of " + "window %s", timestamp, shifted, window); @@ -217,7 +220,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { /** * Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was * added (ie the element timestamp plus any forward shift requested by the - * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was added. + * {@link WindowingStrategy#getTimestampCombiner}), or {@literal null} if no hold was added. 
* The hold is only added if both: * <ol> * <li>The backend will be able to respect it. In other words the output watermark cannot @@ -450,7 +453,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { * Return (a future for) the earliest hold for {@code context}. Clear all the holds after * reading, but add/restore an end-of-window or garbage collection hold if required. * - * <p>The returned timestamp is the output timestamp according to the {@link OutputTimeFn} + * <p>The returned timestamp is the output timestamp according to the {@link TimestampCombiner} * from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late * elements in the current pane. If there is no such value the timestamp is the end * of the window. @@ -462,8 +465,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { + "outputWatermark:{}", context.key(), context.window(), timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); - final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag); - final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG); + final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag); + final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG); return new ReadableState<OldAndNewHolds>() { @Override public ReadableState<OldAndNewHolds> readLater() { http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index d0a8923..81ac5fa 100644 --- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -43,10 +43,10 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -149,7 +149,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); + .withTimestampCombiner(TimestampCombiner.EARLIEST); List<WindowedValue<KV<String, Iterable<String>>>> result = runGABW( @@ -200,7 +200,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); + .withTimestampCombiner(TimestampCombiner.EARLIEST); List<WindowedValue<KV<String, Long>>> result = runGABW( @@ -348,7 +348,7 @@ public class GroupAlsoByWindowsProperties { /** * Tests that for a simple sequence of elements on the same key, the given GABW implementation * correctly groups them according to fixed windows and also sets the output timestamp according - * to the policy {@link 
OutputTimeFns#outputAtEndOfWindow()}. + * to the policy {@link TimestampCombiner#END_OF_WINDOW}. */ public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp( GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) @@ -356,7 +356,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); + .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW); List<WindowedValue<KV<String, Iterable<String>>>> result = runGABW( @@ -386,7 +386,7 @@ public class GroupAlsoByWindowsProperties { /** * Tests that for a simple sequence of elements on the same key, the given GABW implementation * correctly groups them according to fixed windows and also sets the output timestamp according - * to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}. + * to the policy {@link TimestampCombiner#LATEST}. */ public static void groupsElementsIntoFixedWindowsWithLatestTimestamp( GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) @@ -394,7 +394,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()); + .withTimestampCombiner(TimestampCombiner.LATEST); List<WindowedValue<KV<String, Iterable<String>>>> result = runGABW( @@ -431,7 +431,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); + .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW); List<WindowedValue<KV<String, Iterable<String>>>> result = runGABW( @@ -468,7 +468,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> 
windowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()); + .withTimestampCombiner(TimestampCombiner.LATEST); BoundedWindow unmergedWindow = window(15, 25); List<WindowedValue<KV<String, Iterable<String>>>> result = @@ -508,7 +508,7 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); + .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW); BoundedWindow secondWindow = window(15, 25); List<WindowedValue<KV<String, Long>>> result = http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java index 34ddae6..6248401 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; @@ -71,14 +71,12 @@ public class InMemoryStateInternalsTest { StateTags.set("stringSet", StringUtf8Coder.of()); 
private static final StateTag<Object, MapState<String, Integer>> STRING_MAP_ADDR = StateTags.map("stringMap", StringUtf8Coder.of(), VarIntCoder.of()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> - WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> - WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow()); + private static final StateTag<Object, WatermarkHoldState> WATERMARK_EARLIEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST); + private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW); InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey"); @@ -442,7 +440,7 @@ public class InMemoryStateInternalsTest { @Test public void testWatermarkEarliestState() throws Exception { - WatermarkHoldState<BoundedWindow> value = + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); // State instances are cached, but depend on the namespace. @@ -466,7 +464,7 @@ public class InMemoryStateInternalsTest { @Test public void testWatermarkLatestState() throws Exception { - WatermarkHoldState<BoundedWindow> value = + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); // State instances are cached, but depend on the namespace. 
@@ -490,7 +488,7 @@ public class InMemoryStateInternalsTest { @Test public void testWatermarkEndOfWindowState() throws Exception { - WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); // State instances are cached, but depend on the namespace. assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); @@ -507,7 +505,7 @@ public class InMemoryStateInternalsTest { @Test public void testWatermarkStateIsEmpty() throws Exception { - WatermarkHoldState<BoundedWindow> value = + WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); assertThat(value.isEmpty().read(), Matchers.is(true)); @@ -521,9 +519,9 @@ public class InMemoryStateInternalsTest { @Test public void testMergeEarliestWatermarkIntoSource() throws Exception { - WatermarkHoldState<BoundedWindow> value1 = + WatermarkHoldState value1 = underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - WatermarkHoldState<BoundedWindow> value2 = + WatermarkHoldState value2 = underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); value1.add(new Instant(3000)); @@ -540,11 +538,11 @@ public class InMemoryStateInternalsTest { @Test public void testMergeLatestWatermarkIntoSource() throws Exception { - WatermarkHoldState<BoundedWindow> value1 = + WatermarkHoldState value1 = underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - WatermarkHoldState<BoundedWindow> value2 = + WatermarkHoldState value2 = underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); - WatermarkHoldState<BoundedWindow> value3 = + WatermarkHoldState value3 = underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); value1.add(new Instant(3000)); http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 0d4d992..44bc538 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -56,12 +56,12 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Never; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -210,7 +210,7 @@ public class ReduceFnRunnerTest { WindowingStrategy<?, IntervalWindow> strategy = WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTimestampCombiner(TimestampCombiner.EARLIEST) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)); @@ -284,7 +284,7 @@ public class ReduceFnRunnerTest { WindowingStrategy<?, IntervalWindow> strategy = WindowingStrategy.of((WindowFn<?, IntervalWindow>) windowFn) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTimestampCombiner(TimestampCombiner.EARLIEST) 
.withTrigger(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever())) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withAllowedLateness(allowedLateness); @@ -315,7 +315,7 @@ public class ReduceFnRunnerTest { WindowingStrategy<?, IntervalWindow> strategy = WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTimestampCombiner(TimestampCombiner.EARLIEST) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)); @@ -615,7 +615,7 @@ public class ReduceFnRunnerTest { AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTimestampCombiner(TimestampCombiner.EARLIEST) .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); tester.advanceInputWatermark(new Instant(0)); @@ -668,7 +668,7 @@ public class ReduceFnRunnerTest { AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTimestampCombiner(TimestampCombiner.EARLIEST) .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)); tester.advanceInputWatermark(new Instant(0)); @@ -695,7 +695,7 @@ public class ReduceFnRunnerTest { AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTimestampCombiner(TimestampCombiner.EARLIEST) .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); tester.advanceInputWatermark(new Instant(0)); @@ -724,7 +724,7 @@ public class ReduceFnRunnerTest { AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) - 
.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTimestampCombiner(TimestampCombiner.EARLIEST) .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); tester.advanceInputWatermark(new Instant(0)); @@ -1195,7 +1195,7 @@ public class ReduceFnRunnerTest { WindowingStrategy<?, IntervalWindow> strategy = WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTimestampCombiner(TimestampCombiner.EARLIEST) .withTrigger( AfterEach.<IntervalWindow>inOrder( Repeatedly.forever( @@ -1251,16 +1251,16 @@ public class ReduceFnRunnerTest { WindowingStrategy<?, IntervalWindow> strategy = WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) - .withTrigger(AfterEach.<IntervalWindow>inOrder( - Repeatedly - .forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf( - new Duration(5))) - .orFinally(AfterWatermark.pastEndOfWindow()), - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf( - new Duration(25))))) + .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withTrigger( + AfterEach.<IntervalWindow>inOrder( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(new Duration(5))) + .orFinally(AfterWatermark.pastEndOfWindow()), + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(new Duration(25))))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)); http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 549fd8a..b5b5492 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -58,8 +58,8 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -161,7 +161,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { throws Exception { WindowingStrategy<?, W> strategy = WindowingStrategy.of(windowFn) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTimestampCombiner(TimestampCombiner.EARLIEST) .withMode(mode) .withAllowedLateness(allowedDataLateness) .withClosingBehavior(closingBehavior); @@ -329,8 +329,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { assertHasOnlyGlobalAndAllowedTags( ImmutableSet.copyOf(expectedWindows), ImmutableSet.<StateTag<? 
super String, ?>>of( - TriggerStateMachineRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG, - WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()), + TriggerStateMachineRunner.FINISHED_BITS_TAG, + PaneInfoTracker.PANE_INFO_TAG, + WatermarkHold.watermarkHoldTagForTimestampCombiner( + objectStrategy.getTimestampCombiner()), WatermarkHold.EXTRA_HOLD_TAG)); } @@ -345,7 +347,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { ImmutableSet.copyOf(expectedWindows), ImmutableSet.<StateTag<? super String, ?>>of( PaneInfoTracker.PANE_INFO_TAG, - WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()), + WatermarkHold.watermarkHoldTagForTimestampCombiner( + objectStrategy.getTimestampCombiner()), WatermarkHold.EXTRA_HOLD_TAG)); } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java index 5f5d92d..10dcb62 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CombineFnUtil; import org.junit.Test; import org.junit.runner.RunWith; @@ -97,15 +97,11 @@ public class StateTagTest { @Test public void testWatermarkBagEquality() { - StateTag<?, ?> foo1 = StateTags.watermarkStateInternal( - "foo", 
OutputTimeFns.outputAtEarliestInputTimestamp()); - StateTag<?, ?> foo2 = StateTags.watermarkStateInternal( - "foo", OutputTimeFns.outputAtEarliestInputTimestamp()); - StateTag<?, ?> bar = StateTags.watermarkStateInternal( - "bar", OutputTimeFns.outputAtEarliestInputTimestamp()); - - StateTag<?, ?> bar2 = StateTags.watermarkStateInternal( - "bar", OutputTimeFns.outputAtLatestInputTimestamp()); + StateTag<?, ?> foo1 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); + StateTag<?, ?> foo2 = StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); + StateTag<?, ?> bar = StateTags.watermarkStateInternal("bar", TimestampCombiner.EARLIEST); + + StateTag<?, ?> bar2 = StateTags.watermarkStateInternal("bar", TimestampCombiner.LATEST); // Same id, same fn. assertEquals(foo1, foo2); http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java index 0665812..068b37f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java @@ -43,7 +43,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CombineFnUtil; import 
org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; @@ -213,7 +213,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE; for (State existingState : this.values()) { if (existingState instanceof WatermarkHoldState) { - Instant hold = ((WatermarkHoldState<?>) existingState).read(); + Instant hold = ((WatermarkHoldState) existingState).read(); if (hold != null && hold.isBefore(earliest)) { earliest = hold; } @@ -276,18 +276,18 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) { return new StateBinder<K>() { @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { + public <W extends BoundedWindow> WatermarkHoldState bindWatermark( + StateTag<? super K, WatermarkHoldState> address, + TimestampCombiner timestampCombiner) { if (containedInUnderlying(namespace, address)) { @SuppressWarnings("unchecked") - InMemoryState<? extends WatermarkHoldState<W>> existingState = - (InMemoryState<? extends WatermarkHoldState<W>>) + InMemoryState<? extends WatermarkHoldState> existingState = + (InMemoryState<? 
extends WatermarkHoldState>) underlying.get().get(namespace, address, c); return existingState.copy(); } else { return new InMemoryWatermarkHold<>( - outputTimeFn); + timestampCombiner); } } @@ -419,7 +419,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> State state = readTo.get(namespace, existingState.getKey(), StateContexts.nullContext()); if (state instanceof WatermarkHoldState) { - Instant hold = ((WatermarkHoldState<?>) state).read(); + Instant hold = ((WatermarkHoldState) state).read(); if (hold != null && hold.isBefore(earliestHold)) { earliestHold = hold; } @@ -434,9 +434,9 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> public StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) { return new StateBinder<K>() { @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { + public <W extends BoundedWindow> WatermarkHoldState bindWatermark( + StateTag<? 
super K, WatermarkHoldState> address, + TimestampCombiner timestampCombiner) { return underlying.get(namespace, address, c); } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index b08aa8e..322c995 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -40,8 +40,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -135,14 +135,14 @@ class ParDoMultiOverrideFactory<InputT, OutputT> // to alter the flow of data. 
This entails: // - trigger as fast as possible // - maintain the full timestamps of elements - // - ensure this GBK holds to the minimum of those timestamps (via OutputTimeFn) + // - ensure this GBK holds to the minimum of those timestamps (via TimestampCombiner) // - discard past panes as it is "just a stream" of elements .apply( Window.<KV<K, WindowedValue<KV<K, InputT>>>>configure() .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) .discardingFiredPanes() .withAllowedLateness(inputWindowingStrategy.getAllowedLateness()) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())) + .withTimestampCombiner(TimestampCombiner.EARLIEST)) // A full GBK to group by key _and_ window .apply("Group by key", GroupByKey.<K, WindowedValue<KV<K, InputT>>>create()) http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java index 68c6613..f0aeece 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java @@ -43,8 +43,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import 
org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.GroupingState; @@ -289,13 +288,12 @@ public class CopyOnAccessInMemoryStateInternalsTest { CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - OutputTimeFn<BoundedWindow> outputTimeFn = - OutputTimeFns.outputAtEarliestInputTimestamp(); + TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST; StateNamespace namespace = new StateNamespaceForTest("foo"); - StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag = - StateTags.watermarkStateInternal("wmstate", outputTimeFn); - WatermarkHoldState<?> underlyingValue = underlying.state(namespace, stateTag); + StateTag<Object, WatermarkHoldState> stateTag = + StateTags.watermarkStateInternal("wmstate", timestampCombiner); + WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag); assertThat(underlyingValue.read(), nullValue()); underlyingValue.add(new Instant(250L)); @@ -303,7 +301,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); - WatermarkHoldState<BoundedWindow> copyOnAccessState = internals.state(namespace, stateTag); + WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag); assertThat(copyOnAccessState.read(), equalTo(new Instant(250L))); copyOnAccessState.add(new Instant(100L)); @@ -313,7 +311,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { copyOnAccessState.add(new Instant(500L)); assertThat(copyOnAccessState.read(), equalTo(new Instant(100L))); - WatermarkHoldState<BoundedWindow> reReadUnderlyingValue = + WatermarkHoldState reReadUnderlyingValue = underlying.state(namespace, stateTag); assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read())); } @@ -514,15 +512,15 @@ public class 
CopyOnAccessInMemoryStateInternalsTest { CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); - StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress = - StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); - WatermarkHoldState<BoundedWindow> firstHold = + StateTag<Object, WatermarkHoldState> firstHoldAddress = + StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); + WatermarkHoldState firstHold = internals.state(StateNamespaces.window(null, first), firstHoldAddress); firstHold.add(new Instant(22L)); - StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress = - StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); - WatermarkHoldState<BoundedWindow> secondHold = + StateTag<Object, WatermarkHoldState> secondHoldAddress = + StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); + WatermarkHoldState secondHold = internals.state(StateNamespaces.window(null, second), secondHoldAddress); secondHold.add(new Instant(2L)); @@ -546,18 +544,18 @@ public class CopyOnAccessInMemoryStateInternalsTest { }; CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); - StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress = - StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); - WatermarkHoldState<BoundedWindow> firstHold = + StateTag<Object, WatermarkHoldState> firstHoldAddress = + StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); + WatermarkHoldState firstHold = underlying.state(StateNamespaces.window(null, first), firstHoldAddress); firstHold.add(new Instant(22L)); CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit()); - StateTag<Object, WatermarkHoldState<BoundedWindow>> 
secondHoldAddress = - StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); - WatermarkHoldState<BoundedWindow> secondHold = + StateTag<Object, WatermarkHoldState> secondHoldAddress = + StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); + WatermarkHoldState secondHold = internals.state(StateNamespaces.window(null, second), secondHoldAddress); secondHold.add(new Instant(244L)); @@ -583,18 +581,18 @@ public class CopyOnAccessInMemoryStateInternalsTest { }; CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null); - StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress = - StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); - WatermarkHoldState<BoundedWindow> firstHold = + StateTag<Object, WatermarkHoldState> firstHoldAddress = + StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); + WatermarkHoldState firstHold = underlying.state(StateNamespaces.window(null, first), firstHoldAddress); firstHold.add(new Instant(224L)); CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit()); - StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress = - StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()); - WatermarkHoldState<BoundedWindow> secondHold = + StateTag<Object, WatermarkHoldState> secondHoldAddress = + StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST); + WatermarkHoldState secondHold = internals.state(StateNamespaces.window(null, second), secondHoldAddress); secondHold.add(new Instant(24L)); @@ -610,7 +608,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { internals .state( StateNamespaces.global(), - StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp())) + StateTags.watermarkStateInternal("foo", 
TimestampCombiner.EARLIEST)) .add(new Instant(1234L)); thrown.expect(IllegalStateException.class); http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java index b904bfe..7ee2f69 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java @@ -29,8 +29,8 @@ import java.util.Map; import java.util.Set; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; @@ -60,8 +60,8 @@ public class HashingFlinkCombineRunner< @SuppressWarnings("unchecked") - OutputTimeFn<? super BoundedWindow> outputTimeFn = - (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner(); + WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn(); // Flink Iterable can be iterated over only once. 
List<WindowedValue<KV<K, InputT>>> inputs = new ArrayList<>(); @@ -87,14 +87,21 @@ public class HashingFlinkCombineRunner< AccumT accumT = flinkCombiner.firstInput(key, currentValue.getValue().getValue(), options, sideInputReader, singletonW); Instant windowTimestamp = - outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow); + timestampCombiner.assign( + mergedWindow, windowFn.getOutputTime(currentValue.getTimestamp(), mergedWindow)); accumAndInstant = new Tuple2<>(accumT, windowTimestamp); mapState.put(mergedWindow, accumAndInstant); } else { accumAndInstant.f0 = flinkCombiner.addInput(key, accumAndInstant.f0, currentValue.getValue().getValue(), options, sideInputReader, singletonW); - accumAndInstant.f1 = outputTimeFn.combine(accumAndInstant.f1, - outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow)); + accumAndInstant.f1 = + timestampCombiner.combine( + accumAndInstant.f1, + timestampCombiner.assign( + mergedWindow, + windowingStrategy + .getWindowFn() + .getOutputTime(currentValue.getTimestamp(), mergedWindow))); } } if (iterator.hasNext()) {
