This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2eb158d [BEAM-12459] Ensure that we use the min element timestamp for Watch watermark if no explicit watermark is provided.
new f4430e6 Merge pull request #14968 from lukecwik/beam12459
2eb158d is described below
commit 2eb158d55291372a82a9d4960a8046b1ab4f0a7b
Author: Luke Cwik <[email protected]>
AuthorDate: Tue Jun 8 11:20:36 2021 -0700
[BEAM-12459] Ensure that we use the min element timestamp for Watch
watermark if no explicit watermark is provided.
---
.../java/org/apache/beam/sdk/transforms/Watch.java | 22 +++++--
.../org/apache/beam/sdk/transforms/WatchTest.java | 74 ++++++++++++++++++++++
2 files changed, 90 insertions(+), 6 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index 0729fff..0f96986 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -840,7 +840,8 @@ public class Watch {
}
@UnboundedPerElement
- private static class WatchGrowthFn<InputT, OutputT, KeyT, TerminationStateT>
+ @VisibleForTesting
+ protected static class WatchGrowthFn<InputT, OutputT, KeyT,
TerminationStateT>
extends DoFn<InputT, KV<InputT, List<TimestampedValue<OutputT>>>> {
private final Watch.Growth<InputT, OutputT, KeyT> spec;
private final Coder<OutputT> outputCoder;
@@ -848,7 +849,7 @@ public class Watch {
private final Coder<KeyT> outputKeyCoder;
private final Funnel<OutputT> coderFunnel;
- private WatchGrowthFn(
+ WatchGrowthFn(
Growth<InputT, OutputT, KeyT> spec,
Coder<OutputT> outputCoder,
SerializableFunction<OutputT, KeyT> outputKeyFn,
@@ -899,7 +900,6 @@ public class Watch {
priorPoll.getOutputs().size());
c.output(KV.of(c.element(), priorPoll.getOutputs()));
}
- watermarkEstimator.setWatermark(priorPoll.getWatermark());
}
return stop();
}
@@ -911,7 +911,8 @@ public class Watch {
PollingGrowthState<TerminationStateT> pollingRestriction =
(PollingGrowthState<TerminationStateT>) currentRestriction;
- // Produce a poll result that only contains never seen before results.
+ // Produce a poll result that only contains never seen before results in
timestamp
+ // sorted order.
Growth.PollResult<OutputT> newResults =
computeNeverSeenBeforeResults(pollingRestriction, res);
@@ -941,8 +942,13 @@ public class Watch {
c.output(KV.of(c.element(), newResults.getOutputs()));
}
+ Instant computedWatermark = null;
if (newResults.getWatermark() != null) {
- watermarkEstimator.setWatermark(newResults.getWatermark());
+ computedWatermark = newResults.getWatermark();
+ } else if (!newResults.getOutputs().isEmpty()) {
+ // computeNeverSeenBeforeResults returns the elements in timestamp
sorted order so
+ // we can get the timestamp from the first element.
+ computedWatermark = newResults.getOutputs().get(0).getTimestamp();
}
Instant currentTime = Instant.now();
@@ -955,11 +961,15 @@ public class Watch {
return stop();
}
- if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(newResults.getWatermark()))
{
+ if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(computedWatermark)) {
LOG.info("{} - will stop polling, reached max timestamp.",
c.element());
return stop();
}
+ if (computedWatermark != null) {
+ watermarkEstimator.setWatermark(computedWatermark);
+ }
+
LOG.info(
"{} - will resume polling in {} ms.", c.element(),
spec.getPollInterval().getMillis());
return resume().withResumeDelay(spec.getPollInterval());
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
index fbc0151..81873be 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
@@ -31,6 +31,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.io.Serializable;
@@ -48,6 +49,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.Watch.Growth;
import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
@@ -55,6 +57,10 @@ import org.apache.beam.sdk.transforms.Watch.GrowthState;
import org.apache.beam.sdk.transforms.Watch.GrowthTracker;
import org.apache.beam.sdk.transforms.Watch.NonPollingGrowthState;
import org.apache.beam.sdk.transforms.Watch.PollingGrowthState;
+import org.apache.beam.sdk.transforms.Watch.WatchGrowthFn;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -539,6 +545,38 @@ public class WatchTest implements Serializable {
}
@Test
+ public void
testPollingGrowthTrackerUsesElementTimestampIfNoWatermarkProvided() throws
Exception {
+ Instant now = Instant.now();
+ Watch.Growth<String, String, String> growth =
+ Watch.growthOf(
+ new Watch.Growth.PollFn<String, String>() {
+
+ @Override
+ public PollResult<String> apply(String element, Context c)
throws Exception {
+ // We specifically test an unsorted list.
+ return PollResult.incomplete(
+ Arrays.asList(
+ TimestampedValue.of("d",
now.plus(standardSeconds(4))),
+ TimestampedValue.of("c",
now.plus(standardSeconds(3))),
+ TimestampedValue.of("a",
now.plus(standardSeconds(1))),
+ TimestampedValue.of("b",
now.plus(standardSeconds(2)))));
+ }
+ })
+ .withPollInterval(standardSeconds(10));
+ WatchGrowthFn<String, String, String, Integer> growthFn =
+ new WatchGrowthFn(
+ growth, StringUtf8Coder.of(), SerializableFunctions.identity(),
StringUtf8Coder.of());
+ GrowthTracker<String, Integer> tracker = newPollingGrowthTracker();
+ DoFn.ProcessContext context = mock(DoFn.ProcessContext.class);
+ ManualWatermarkEstimator<Instant> watermarkEstimator =
+ new WatermarkEstimators.Manual(BoundedWindow.TIMESTAMP_MIN_VALUE);
+ ProcessContinuation processContinuation =
+ growthFn.process(context, tracker, watermarkEstimator);
+ assertEquals(now.plus(standardSeconds(1)),
watermarkEstimator.currentWatermark());
+ assertTrue(processContinuation.shouldResume());
+ }
+
+ @Test
public void testPollingGrowthTrackerCheckpointNonEmpty() {
Instant now = Instant.now();
GrowthTracker<String, Integer> tracker = newPollingGrowthTracker();
@@ -612,6 +650,42 @@ public class WatchTest implements Serializable {
}
@Test
+ public void testNonPollingGrowthTrackerIgnoresWatermark() throws Exception {
+ Instant now = Instant.now();
+ PollResult<String> claim =
+ PollResult.incomplete(
+ Arrays.asList(
+ TimestampedValue.of("d", now.plus(standardSeconds(4))),
+ TimestampedValue.of("c", now.plus(standardSeconds(3))),
+ TimestampedValue.of("a", now.plus(standardSeconds(1))),
+ TimestampedValue.of("b", now.plus(standardSeconds(2)))))
+ .withWatermark(now.plus(standardSeconds(7)));
+
+ Watch.Growth<String, String, String> growth =
+ Watch.growthOf(
+ new Watch.Growth.PollFn<String, String>() {
+
+ @Override
+ public PollResult<String> apply(String element, Context c)
throws Exception {
+ fail("Never expected to be invoked for
NonPollingGrowthState.");
+ return null;
+ }
+ })
+ .withPollInterval(standardSeconds(10));
+ GrowthTracker<String, Integer> tracker =
newTracker(NonPollingGrowthState.of(claim));
+ WatchGrowthFn<String, String, String, Integer> growthFn =
+ new WatchGrowthFn(
+ growth, StringUtf8Coder.of(), SerializableFunctions.identity(),
StringUtf8Coder.of());
+ DoFn.ProcessContext context = mock(DoFn.ProcessContext.class);
+ ManualWatermarkEstimator<Instant> watermarkEstimator =
+ new WatermarkEstimators.Manual(BoundedWindow.TIMESTAMP_MIN_VALUE);
+ ProcessContinuation processContinuation =
+ growthFn.process(context, tracker, watermarkEstimator);
+ assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE,
watermarkEstimator.currentWatermark());
+ assertFalse(processContinuation.shouldResume());
+ }
+
+ @Test
public void testNonPollingGrowthTrackerCheckpointNonEmpty() {
Instant now = Instant.now();
PollResult<String> claim =