Repository: incubator-beam Updated Branches: refs/heads/master 69ec223e5 -> 34e05954d
Use CommittedResult in InMemoryWatermarkManager This enables unprocessed elements to be handled in the watermark manager after they are added to the CommittedResult structure. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dff82cae Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dff82cae Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dff82cae Branch: refs/heads/master Commit: dff82cae2f9d6393e4bdbb7fd527f58eb2cdaa01 Parents: 659cf2e Author: Thomas Groh <[email protected]> Authored: Thu Apr 28 13:42:36 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Mon May 2 12:54:02 2016 -0700 ---------------------------------------------------------------------- .../direct/InMemoryWatermarkManager.java | 19 +- .../direct/InProcessEvaluationContext.java | 6 +- .../direct/InMemoryWatermarkManagerTest.java | 368 ++++++++++++------ 3 files changed, 252 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dff82cae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java index 769457a..4d5a3a1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java @@ -800,18 +800,19 @@ public class InMemoryWatermarkManager { * </pre>. 
* * @param completed the input that has completed - * @param transform the transform that has completed processing the input - * @param outputs the bundles the transform has output + * @param timerUpdate the timers that were added, removed, and completed as part of producing + * this update + * @param result the result that was produced by processing the input * @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there * is no hold */ public void updateWatermarks( @Nullable CommittedBundle<?> completed, - AppliedPTransform<?, ?, ?> transform, TimerUpdate timerUpdate, - Iterable<? extends CommittedBundle<?>> outputs, + CommittedResult result, @Nullable Instant earliestHold) { - updatePending(completed, transform, timerUpdate, outputs); + AppliedPTransform<?, ?, ?> transform = result.getTransform(); + updatePending(completed, timerUpdate, result); TransformWatermarks transformWms = transformToWatermarks.get(transform); transformWms.setEventTimeHold(completed == null ? null : completed.getKey(), earliestHold); refreshWatermarks(transform); @@ -846,15 +847,14 @@ public class InMemoryWatermarkManager { */ private void updatePending( CommittedBundle<?> input, - AppliedPTransform<?, ?, ?> transform, TimerUpdate timerUpdate, - Iterable<? extends CommittedBundle<?>> outputs) { - TransformWatermarks completedTransform = transformToWatermarks.get(transform); + CommittedResult result) { + TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform()); // Newly pending elements must be added before completed elements are removed, as the two // do not share a Mutex within this call and thus can be interleaved with external calls to // refresh. 
- for (CommittedBundle<?> bundle : outputs) { + for (CommittedBundle<?> bundle : result.getOutputs()) { for (AppliedPTransform<?, ?, ?> consumer : consumers.get(bundle.getPCollection())) { TransformWatermarks watermarks = transformToWatermarks.get(consumer); watermarks.addPending(bundle); @@ -865,7 +865,6 @@ public class InMemoryWatermarkManager { if (input != null) { completedTransform.removePending(input); } - } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dff82cae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java index d9a7ff0..d4f891e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java @@ -152,11 +152,11 @@ class InProcessEvaluationContext { Iterable<? extends CommittedBundle<?>> committedBundles = commitBundles(result.getOutputBundles()); // Update watermarks and timers + CommittedResult committedResult = CommittedResult.create(result, committedBundles); watermarkManager.updateWatermarks( completedBundle, - result.getTransform(), result.getTimerUpdate().withCompletedTimers(completedTimers), - committedBundles, + committedResult, result.getWatermarkHold()); fireAllAvailableCallbacks(); // Update counters @@ -176,7 +176,7 @@ class InProcessEvaluationContext { applicationStateInternals.remove(stepAndKey); } } - return CommittedResult.create(result, committedBundles); + return committedResult; } private Iterable<? 
extends CommittedBundle<?>> commitBundles( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dff82cae/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java index 2880ade..15cdf8a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java @@ -159,8 +159,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { @Test public void getWatermarkForUpdatedSourceTransform() { CommittedBundle<Integer> output = multiWindowedBundle(createdInts, 1); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(output), new Instant(8000L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(output)), + new Instant(8000L)); TransformWatermarks updatedSourceWatermark = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -175,8 +178,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { public void getWatermarkForMultiInputTransform() { CommittedBundle<Integer> secondPcollectionBundle = multiWindowedBundle(intsToFlatten, -1); - manager.updateWatermarks(null, intsToFlatten.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle), + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(intsToFlatten.getProducingTransformInternal(), + 
Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); // We didn't do anything for the first source, so we shouldn't have progressed the watermark @@ -205,13 +210,17 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<Integer> flattenedBundleSecondCreate = multiWindowedBundle(flattened, -1); // We have finished processing the bundle from the second PCollection, but we haven't consumed // anything from the first PCollection yet; so our watermark shouldn't advance - manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate), + manager.updateWatermarks(secondPcollectionBundle, + TimerUpdate.empty(), + result(flattened.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)), null); TransformWatermarks transformAfterProcessing = manager.getWatermarks(flattened.getProducingTransformInternal()); - manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate), + manager.updateWatermarks(secondPcollectionBundle, + TimerUpdate.empty(), + result(flattened.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)), null); assertThat( transformAfterProcessing.getInputWatermark(), @@ -225,8 +234,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { timestampedBundle(createdInts, TimestampedValue.<Integer>of(5, firstCollectionTimestamp)); // the source is done, but elements are still buffered. 
The source output watermark should be // past the end of the global window - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle), + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle)), new Instant(Long.MAX_VALUE)); TransformWatermarks firstSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -253,8 +264,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<?> completedFlattenBundle = bundleFactory.createRootBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); - manager.updateWatermarks(firstPcollectionBundle, flattened.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(completedFlattenBundle), + manager.updateWatermarks(firstPcollectionBundle, + TimerUpdate.empty(), + result(flattened.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(completedFlattenBundle)), null); TransformWatermarks afterConsumingAllInput = manager.getWatermarks(flattened.getProducingTransformInternal()); @@ -275,8 +288,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts, TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), TimestampedValue.of(3, new Instant(-1000L))); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(Long.MAX_VALUE)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(createdBundle)), + new Instant(Long.MAX_VALUE)); TransformWatermarks 
createdAfterProducing = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat( @@ -287,8 +303,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); - manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle), null); + manager.updateWatermarks(createdBundle, + TimerUpdate.empty(), + result(keyed.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(keyBundle)), + null); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); assertThat( @@ -303,8 +322,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<Integer> filteredBundle = timestampedBundle(filtered, TimestampedValue.of(2, new Instant(1234L))); - manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(filteredBundle), null); + manager.updateWatermarks(createdBundle, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(filteredBundle)), + null); TransformWatermarks filteredProcessedWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); assertThat( @@ -322,17 +344,23 @@ public class InMemoryWatermarkManagerTest implements Serializable { @Test public void updateWatermarkWithWatermarkHolds() { CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts, - TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), + TimestampedValue.of(1, new Instant(1_000_000L)), + TimestampedValue.of(2, new Instant(1234L)), TimestampedValue.of(3, new 
Instant(-1000L))); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(Long.MAX_VALUE)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(createdBundle)), + new Instant(Long.MAX_VALUE)); - CommittedBundle<KV<String, Integer>> keyBundle = - timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)), - TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), - TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); - manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle), + CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed, + TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)), + TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), + TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); + manager.updateWatermarks(createdBundle, + TimerUpdate.empty(), + result(keyed.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(keyBundle)), new Instant(500L)); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); @@ -358,40 +386,54 @@ public class InMemoryWatermarkManagerTest implements Serializable { .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L))) .commit(clock.now()); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - ImmutableList.of(firstKeyBundle, secondKeyBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + ImmutableList.of(firstKeyBundle, secondKeyBundle)), + BoundedWindow.TIMESTAMP_MAX_VALUE); - 
manager.updateWatermarks(firstKeyBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(-1000L)); - manager.updateWatermarks(secondKeyBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(1234L)); + manager.updateWatermarks(firstKeyBundle, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>emptyList()), + new Instant(-1000L)); + manager.updateWatermarks(secondKeyBundle, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>emptyList()), + new Instant(1234L)); TransformWatermarks filteredWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); - assertThat( - filteredWatermarks.getInputWatermark(), + assertThat(filteredWatermarks.getInputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE))); assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L)))); CommittedBundle<Integer> fauxFirstKeyTimerBundle = bundleFactory.createKeyedBundle(null, "Odd", createdInts).commit(clock.now()); - manager.updateWatermarks(fauxFirstKeyTimerBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), + manager.updateWatermarks(fauxFirstKeyTimerBundle, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>emptyList()), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L))); CommittedBundle<Integer> fauxSecondKeyTimerBundle = bundleFactory.createKeyedBundle(null, "Even", createdInts).commit(clock.now()); - manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(5678L)); + 
manager.updateWatermarks(fauxSecondKeyTimerBundle, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>emptyList()), + new Instant(5678L)); assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L))); - manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), + manager.updateWatermarks(fauxSecondKeyTimerBundle, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>emptyList()), BoundedWindow.TIMESTAMP_MAX_VALUE); - assertThat( - filteredWatermarks.getOutputWatermark(), + assertThat(filteredWatermarks.getOutputWatermark(), not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE))); } @@ -403,16 +445,21 @@ public class InMemoryWatermarkManagerTest implements Serializable { public void updateOutputWatermarkShouldBeMonotonic() { CommittedBundle<?> firstInput = bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(firstInput), new Instant(0L)); + manager.updateWatermarks(null, TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(firstInput)), + new Instant(0L)); TransformWatermarks firstWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L))); CommittedBundle<?> secondInput = bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(secondInput), new Instant(-250L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + 
result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(secondInput)), + new Instant(-250L)); TransformWatermarks secondWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(secondWatermarks.getOutputWatermark(), not(earlierThan(new Instant(0L)))); @@ -425,17 +472,22 @@ public class InMemoryWatermarkManagerTest implements Serializable { @Test public void updateWatermarkWithHoldsShouldBeMonotonic() { CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts, - TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), - TimestampedValue.of(3, new Instant(-1000L))); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(Long.MAX_VALUE)); + TimestampedValue.of(1, new Instant(1_000_000L)), + TimestampedValue.of(2, new Instant(1234L)), + TimestampedValue.of(3, new Instant(-1000L))); manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(createdBundle)), + new Instant(Long.MAX_VALUE)); CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); - manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle), + manager.updateWatermarks(createdBundle, + TimerUpdate.empty(), + result(keyed.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(keyBundle)), new Instant(500L)); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); @@ -462,16 +514,22 @@ public class 
InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts, TimestampedValue.of(1, sourceWatermark), TimestampedValue.of(2, new Instant(1234L))); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(createdBundle), sourceWatermark); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(createdBundle)), + sourceWatermark); CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), sourceWatermark), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L))); // Finish processing the on-time data. The watermarks should progress to be equal to the source - manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(keyBundle), null); + manager.updateWatermarks(createdBundle, + TimerUpdate.empty(), + result(keyed.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(keyBundle)), + null); TransformWatermarks onTimeWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark)); @@ -481,8 +539,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { timestampedBundle(createdInts, TimestampedValue.of(3, new Instant(-1000L))); // the late data arrives in a downstream PCollection after its watermark has advanced past it; // we don't advance the watermark past the current watermark until we've consumed the late data - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(lateDataBundle), new Instant(2_000_000L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + 
result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(lateDataBundle)), + new Instant(2_000_000L)); TransformWatermarks bufferedLateWm = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new Instant(2_000_000L))); @@ -496,30 +557,31 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<KV<String, Integer>> lateKeyedBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); - manager.updateWatermarks(lateDataBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(lateKeyedBundle), null); + manager.updateWatermarks(lateDataBundle, + TimerUpdate.empty(), + result(keyed.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(lateKeyedBundle)), + null); } public void updateWatermarkWithDifferentWindowedValueInstances() { manager.updateWatermarks( null, - createdInts.getProducingTransformInternal(), TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), Collections.<CommittedBundle<?>>singleton( bundleFactory .createRootBundle(createdInts) .add(WindowedValue.valueInGlobalWindow(1)) - .commit(Instant.now())), + .commit(Instant.now()))), BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks( - bundleFactory - .createRootBundle(createdInts) + bundleFactory.createRootBundle(createdInts) .add(WindowedValue.valueInGlobalWindow(1)) .commit(Instant.now()), - keyed.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>emptyList(), + result(keyed.getProducingTransformInternal(), Collections.<CommittedBundle<?>>emptyList()), null); TransformWatermarks onTimeWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); @@ -533,8 +595,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { @Test public void 
getWatermarksAfterOnlyEmptyOutput() { CommittedBundle<Integer> emptyCreateOutput = multiWindowedBundle(createdInts); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(emptyCreateOutput), + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -560,12 +624,17 @@ public class InMemoryWatermarkManagerTest implements Serializable { @Test public void getWatermarksAfterHoldAndEmptyOutput() { CommittedBundle<Integer> firstCreateOutput = multiWindowedBundle(createdInts, 1, 2); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(firstCreateOutput), new Instant(12_000L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(firstCreateOutput)), + new Instant(12_000L)); CommittedBundle<Integer> firstFilterOutput = multiWindowedBundle(filtered); - manager.updateWatermarks(firstCreateOutput, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(firstFilterOutput), + manager.updateWatermarks(firstCreateOutput, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(firstFilterOutput)), new Instant(10_000L)); TransformWatermarks firstFilterWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -573,8 +642,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { assertThat(firstFilterWatermarks.getOutputWatermark(), not(laterThan(new Instant(10_000L)))); CommittedBundle<Integer> 
emptyCreateOutput = multiWindowedBundle(createdInts); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(emptyCreateOutput), + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -613,8 +684,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<Integer> createOutput = bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(createOutput)), + BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks createAfterUpdate = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now())); @@ -639,8 +713,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<?> filterOutputBundle = bundleFactory.createRootBundle(intsToFlatten).commit(new Instant(1250L)); - manager.updateWatermarks(createOutput, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(filterOutputBundle), + manager.updateWatermarks(createOutput, + TimerUpdate.empty(), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(filterOutputBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks filterAfterConsumed = 
manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -661,8 +737,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { // @Test public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(createdBundle), new Instant(1248L)); + manager.updateWatermarks(null, TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(createdBundle)), + new Instant(1248L)); TransformWatermarks filteredWms = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -678,8 +756,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { TimerData.of(StateNamespaces.global(), new Instant(4096L), TimeDomain.PROCESSING_TIME); TimerUpdate timers = TimerUpdate.builder("key").setTimer(pastTimer).setTimer(futureTimer).build(); - manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), timers, - Collections.<CommittedBundle<?>>singleton(filteredBundle), + manager.updateWatermarks(createdBundle, + timers, + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(filteredBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); Instant startTime = clock.now(); clock.set(startTime.plus(250L)); @@ -712,11 +792,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { bundleFactory.createKeyedBundle(null, "key", filteredTimesTwo) .commit(filteredWms.getSynchronizedProcessingOutputTime()); // Complete the processing time timer - manager.updateWatermarks(filteredTimerBundle, filtered.getProducingTransformInternal(), + manager.updateWatermarks(filteredTimerBundle, TimerUpdate.builder("key") - .withCompletedTimers(Collections.<TimerData>singleton(pastTimer)) - .build(), - 
Collections.<CommittedBundle<?>>singleton(filteredTimerResult), + .withCompletedTimers(Collections.<TimerData>singleton(pastTimer)).build(), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(filteredTimerResult)), BoundedWindow.TIMESTAMP_MAX_VALUE); clock.set(startTime.plus(500L)); @@ -726,8 +806,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(earlierThan(filteredTimerResult.getSynchronizedProcessingOutputWatermark()))); - manager.updateWatermarks(filteredTimerResult, filteredTimesTwo.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), + manager.updateWatermarks(filteredTimerResult, + TimerUpdate.empty(), + result(filteredTimesTwo.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>emptyList()), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now())); @@ -761,18 +843,23 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<Integer> createOutput = bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(createOutput)), + BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks createAfterUpdate = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), not(laterThan(clock.now()))); - assertThat( - createAfterUpdate.getSynchronizedProcessingOutputTime(), not(laterThan(clock.now()))); + 
assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), + not(laterThan(clock.now()))); CommittedBundle<Integer> createSecondOutput = bundleFactory.createRootBundle(createdInts).commit(new Instant(750L)); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(createSecondOutput), + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(createSecondOutput)), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now())); @@ -781,16 +868,20 @@ public class InMemoryWatermarkManagerTest implements Serializable { @Test public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() { CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(created), new Instant(40_900L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(created)), + new Instant(40_900L)); CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 4); Instant upstreamHold = new Instant(2048L); TimerData upstreamProcessingTimer = TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME); - manager.updateWatermarks(created, filtered.getProducingTransformInternal(), + manager.updateWatermarks(created, TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(), - Collections.<CommittedBundle<?>>singleton(filteredBundle), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(filteredBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks downstreamWms = @@ -806,11 +897,12 @@ 
public class InMemoryWatermarkManagerTest implements Serializable { assertThat(downstreamWms.getSynchronizedProcessingInputTime(), equalTo(upstreamHold)); CommittedBundle<Integer> otherCreated = multiWindowedBundle(createdInts, 4, 8, 12); - manager.updateWatermarks(otherCreated, filtered.getProducingTransformInternal(), + manager.updateWatermarks(otherCreated, TimerUpdate.builder("key") - .withCompletedTimers(Collections.singleton(upstreamProcessingTimer)) - .build(), - Collections.<CommittedBundle<?>>emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE); + .withCompletedTimers(Collections.singleton(upstreamProcessingTimer)).build(), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>emptyList()), + BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(earlierThan(clock.now()))); } @@ -820,9 +912,9 @@ public class InMemoryWatermarkManagerTest implements Serializable { CommittedBundle<Integer> created = multiWindowedBundle(createdInts, 1, 2, 3); manager.updateWatermarks( null, - createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(created), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(created)), new Instant(29_919_235L)); Instant upstreamHold = new Instant(2048L); @@ -830,9 +922,9 @@ public class InMemoryWatermarkManagerTest implements Serializable { bundleFactory.createKeyedBundle(created, "key", filtered).commit(upstreamHold); manager.updateWatermarks( created, - filtered.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(filteredBundle), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(filteredBundle)), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks downstreamWms = @@ -852,8 +944,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { // Advance WM of keyed past 
the first timer, but ahead of the second and third CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.singleton(createdBundle), new Instant(1500L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)), + new Instant(1500L)); TimerData earliestTimer = TimerData.of(StateNamespaces.global(), new Instant(1000), TimeDomain.EVENT_TIME); @@ -869,11 +963,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { .setTimer(lastTimer) .build(); - manager.updateWatermarks( - createdBundle, - filtered.getProducingTransformInternal(), + manager.updateWatermarks(createdBundle, update, - Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))), new Instant(1000L)); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers = @@ -886,8 +979,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { FiredTimers firstFired = firstFilteredTimers.get(key); assertThat(firstFired.getTimers(TimeDomain.EVENT_TIME), contains(earliestTimer)); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>emptyList()), + new Instant(50_000L)); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -909,8 +1005,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { // Advance WM of keyed past the first timer, but ahead of 
the second and third CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.singleton(createdBundle), new Instant(1500L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)), + new Instant(1500L)); TimerData earliestTimer = TimerData.of(StateNamespaces.global(), new Instant(999L), TimeDomain.PROCESSING_TIME); @@ -928,9 +1026,9 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks( createdBundle, - filtered.getProducingTransformInternal(), update, - Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))), new Instant(1000L)); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers = @@ -944,8 +1042,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { assertThat(firstFired.getTimers(TimeDomain.PROCESSING_TIME), contains(earliestTimer)); clock.set(new Instant(50_000L)); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>emptyList()), + new Instant(50_000L)); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -967,8 +1068,10 @@ public class InMemoryWatermarkManagerTest implements Serializable { // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered); - 
manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.singleton(createdBundle), new Instant(1500L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)), + new Instant(1500L)); TimerData earliestTimer = TimerData.of( StateNamespaces.global(), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); @@ -986,9 +1089,9 @@ public class InMemoryWatermarkManagerTest implements Serializable { manager.updateWatermarks( createdBundle, - filtered.getProducingTransformInternal(), update, - Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten)), + result(filtered.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))), new Instant(1000L)); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers = @@ -1003,8 +1106,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { firstFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), contains(earliestTimer)); clock.set(new Instant(50_000L)); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L)); + manager.updateWatermarks(null, + TimerUpdate.empty(), + result(createdInts.getProducingTransformInternal(), + Collections.<CommittedBundle<?>>emptyList()), + new Instant(50_000L)); Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -1133,7 +1239,6 @@ public class InMemoryWatermarkManagerTest implements Serializable { ReadableInstant instant = (ReadableInstant) item; return instant.isAfter(shouldBeEarlier); } - @Override public void describeTo(Description description) { description.appendText("later than ").appendValue(shouldBeEarlier); @@ -1165,4 +1270,11 @@ public 
class InMemoryWatermarkManagerTest implements Serializable { } return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE); } + + private final CommittedResult result( + AppliedPTransform<?, ?, ?> transform, + Iterable<? extends CommittedBundle<?>> bundles) { + return CommittedResult.create(StepTransformResult.withoutHold(transform) + .build(), bundles); + } }
