This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch release-2.39.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.39.0 by this push:
new 276ac1d4569 Merge pull request #17504: [BEAM-14196] add test verifying
output watermark propagation in bundle (#17561)
276ac1d4569 is described below
commit 276ac1d45690c7d3f6bb5175b5b483ef6f05ce1b
Author: Yichi Zhang <[email protected]>
AuthorDate: Thu May 5 11:19:33 2022 -0700
Merge pull request #17504: [BEAM-14196] add test verifying output watermark
propagation in bundle (#17561)
Co-authored-by: Jan Lukavský <[email protected]>
---
.../wrappers/streaming/DoFnOperator.java | 46 ++++++++++++++---
.../streaming/ExecutableStageDoFnOperator.java | 12 ++---
.../beam/runners/flink/FlinkSavepointTest.java | 2 +-
.../wrappers/streaming/DoFnOperatorTest.java | 35 ++++++-------
.../streaming/ExecutableStageDoFnOperatorTest.java | 3 +-
.../wrappers/streaming/WindowDoFnOperatorTest.java | 9 ++++
.../org/apache/beam/sdk/transforms/ParDoTest.java | 58 +++++++++++++++++++++-
7 files changed, 129 insertions(+), 36 deletions(-)
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index bc82b7be42b..b1016f073d6 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -118,6 +118,8 @@ import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.OutputTag;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -678,7 +680,7 @@ public class DoFnOperator<InputT, OutputT>
public final void processElement2(StreamRecord<RawUnionValue> streamRecord)
throws Exception {
// we finish the bundle because the newly arrived side-input might
// make a view available that was previously not ready.
- // The PushbackSideInputRunner will only reset it's cache of non-ready windows when
+ // The PushbackSideInputRunner will only reset its cache of non-ready windows when
// finishing a bundle.
invokeFinishBundle();
checkInvokeStartBundle();
@@ -791,6 +793,12 @@ public class DoFnOperator<InputT, OutputT>
invokeFinishBundle();
}
+ if (bundleStarted) {
+ // do not update watermark in the middle of bundle, because it might cause
+ // user-buffered data to be emitted past watermark
+ return;
+ }
+
LOG.debug("Emitting watermark {}", watermark);
currentOutputWatermark = watermark;
output.emitWatermark(new Watermark(watermark));
@@ -867,13 +875,14 @@ public class DoFnOperator<InputT, OutputT>
@SuppressWarnings("NonAtomicVolatileUpdate")
@SuppressFBWarnings("VO_VOLATILE_INCREMENT")
private void checkInvokeFinishBundleByCount() {
- // We do not access this statement concurrently but we want to make sure that each thread
+ // We do not access this statement concurrently, but we want to make sure that each thread
// sees the latest value, which is why we use volatile. See the class field section above
// for more information.
//noinspection NonAtomicOperationOnVolatileField
elementCount++;
if (elementCount >= maxBundleSize) {
invokeFinishBundle();
+ updateOutputWatermark();
}
}
@@ -882,6 +891,24 @@ public class DoFnOperator<InputT, OutputT>
long now = getProcessingTimeService().getCurrentProcessingTime();
if (now - lastFinishBundleTime >= maxBundleTimeMills) {
invokeFinishBundle();
+ scheduleForCurrentProcessingTime(ts -> updateOutputWatermark());
+ }
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ protected void scheduleForCurrentProcessingTime(ProcessingTimeCallback callback) {
+ // We are scheduling a timer for advancing the watermark, to not delay finishing the bundle
+ // and temporarily release the checkpoint lock. Otherwise, we could potentially loop when a
+ // timer keeps scheduling a timer for the same timestamp.
+ ProcessingTimeService timeService = getProcessingTimeService();
+ timeService.registerTimer(timeService.getCurrentProcessingTime(), callback);
+ }
+
+ private void updateOutputWatermark() {
+ try {
+ processInputWatermark(false);
+ } catch (Exception ex) {
+ failBundleFinalization(ex);
}
}
@@ -910,6 +937,7 @@ public class DoFnOperator<InputT, OutputT>
while (bundleStarted) {
invokeFinishBundle();
}
+ updateOutputWatermark();
}
}
@@ -942,16 +970,20 @@ public class DoFnOperator<InputT, OutputT>
}
outputManager.closeBuffer();
} catch (Exception e) {
- // https://jira.apache.org/jira/browse/FLINK-14653
- // Any regular exception during checkpointing will be tolerated by Flink because those
- // typically do not affect the execution flow. We need to fail hard here because errors
- // in bundle execution are application errors which are not related to checkpointing.
- throw new Error("Checkpointing failed because bundle failed to finalize.", e);
+ failBundleFinalization(e);
}
super.snapshotState(context);
}
+ private void failBundleFinalization(Exception e) {
+ // https://jira.apache.org/jira/browse/FLINK-14653
+ // Any regular exception during checkpointing will be tolerated by Flink because those
+ // typically do not affect the execution flow. We need to fail hard here because errors
+ // in bundle execution are application errors which are not related to checkpointing.
+ throw new Error("Checkpointing failed because bundle failed to finalize.", e);
+ }
+
public BundleFinalizer getBundleFinalizer() {
return bundleFinalizer;
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 6b6ebbfc729..6de413aa363 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -115,7 +115,6 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -700,7 +699,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
@Override
public void flushData() throws Exception {
closed = true;
- // We might still holding back the watermark and Flink does not trigger the timer
+ // We might still hold back the watermark and Flink does not trigger the timer
// callback for watermark advancement anymore.
processWatermark1(Watermark.MAX_WATERMARK);
while (getCurrentOutputWatermark() <
Watermark.MAX_WATERMARK.getTimestamp()) {
@@ -835,7 +834,6 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
inputWatermarkBeforeBundleStart = getEffectiveInputWatermark();
}
- @SuppressWarnings("FutureReturnValueIgnored")
private void finishBundleCallback() {
minEventTimeTimerTimestampInLastBundle =
minEventTimeTimerTimestampInCurrentBundle;
minEventTimeTimerTimestampInCurrentBundle = Long.MAX_VALUE;
@@ -843,12 +841,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
if (!closed
&& minEventTimeTimerTimestampInLastBundle < Long.MAX_VALUE
&& minEventTimeTimerTimestampInLastBundle <=
getEffectiveInputWatermark()) {
- ProcessingTimeService processingTimeService =
getProcessingTimeService();
- // We are scheduling a timer for advancing the watermark, to not delay
finishing the bundle
- // and temporarily release the checkpoint lock. Otherwise, we could
potentially loop when a
- // timer keeps scheduling a timer for the same timestamp.
- processingTimeService.registerTimer(
- processingTimeService.getCurrentProcessingTime(),
+
+ scheduleForCurrentProcessingTime(
ts -> processWatermark1(new
Watermark(getEffectiveInputWatermark())));
} else {
processWatermark1(new Watermark(getEffectiveInputWatermark()));
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
index d957b4e5c13..a5949a6965f 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
@@ -316,7 +316,7 @@ public class FlinkSavepointTest implements Serializable {
MapElements.via(
new InferableFunction<byte[], KV<String, Void>>() {
@Override
- public KV<String, Void> apply(byte[] input) throws Exception {
+ public KV<String, Void> apply(byte[] input) {
// This only writes data to one of the two initial
partitions.
// We want to test this due to
// https://jira.apache.org/jira/browse/BEAM-7144
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index acfede826e7..3ce0862b38c 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -27,7 +27,9 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.databind.util.LRUMap;
@@ -93,6 +95,7 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIt
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -326,12 +329,10 @@ public class DoFnOperatorTest {
@OnTimer(processingTimerId)
public void onProcessingTime(OnTimerContext context) {
- assertEquals(
- // Timestamps in processing timer context are defined to be
the input watermark
- // See SimpleDoFnRunner#onTimer
- "Timer timestamp must match current input watermark",
- timerTimestamp.plus(Duration.millis(1)),
- context.timestamp());
+ assertTrue(
+ // Timestamps in processing timer context are defined to be
the output watermark
+ "Timer timestamp must be at most current input watermark",
+
!timerTimestamp.plus(Duration.millis(1)).isBefore(context.timestamp()));
context.outputWithTimestamp(processingTimeMessage,
context.timestamp());
}
};
@@ -422,17 +423,12 @@ public class DoFnOperatorTest {
// this must fire the processing timer
testHarness.setProcessingTime(timerTimestamp.getMillis() + 1);
- assertThat(
- stripStreamRecordFromWindowedValue(testHarness.getOutput()),
- contains(
- WindowedValue.of(
- // Timestamps in processing timer context are defined to be
the input watermark
- // See SimpleDoFnRunner#onTimer
- processingTimeMessage,
- timerTimestamp.plus(Duration.millis(1)),
- window1,
- PaneInfo.NO_FIRING)));
-
+ ArrayList<WindowedValue<?>> outputs =
+
Lists.newArrayList(stripStreamRecordFromWindowedValue(testHarness.getOutput()));
+ assertEquals(1, outputs.size());
+ assertEquals(processingTimeMessage, outputs.get(0).getValue());
+
assertFalse(timerTimestamp.plus(Duration.millis(1)).isBefore(outputs.get(0).getTimestamp()));
+ assertEquals(Collections.singletonList(window1),
outputs.get(0).getWindows());
testHarness.close();
}
@@ -794,6 +790,11 @@ public class DoFnOperatorTest {
assertThat(testHarness.numEventTimeTimers(), is(0));
assertThat(testHarness.numKeyedStateEntries(), is(2));
+ // enforce closing of bundle
+ testHarness.setProcessingTime(
+ testHarness.getProcessingTime()
+ + 2 * FlinkPipelineOptions.defaults().getMaxBundleTimeMills());
+
// Should not trigger garbage collection yet
testHarness.processWatermark(
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)).getMillis());
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 78e240931b0..a82b269a525 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -27,6 +27,7 @@ import static
org.hamcrest.collection.IsIterableContainingInOrder.contains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -537,7 +538,7 @@ public class ExecutableStageDoFnOperatorTest {
assertThat(testHarness.numEventTimeTimers(), is(3));
testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
assertThat(testHarness.numEventTimeTimers(), is(0));
- assertThat(operator.getCurrentOutputWatermark(), is(5L));
+ assertTrue(operator.getCurrentOutputWatermark() <= 5L);
// Output watermark is advanced synchronously when the bundle finishes,
// no more timers are scheduled
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
index ab45072ab58..3e0f028e1f4 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
@@ -155,6 +155,10 @@ public class WindowDoFnOperatorTest {
assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(1));
assertThat(testHarness.numKeyedStateEntries(), is(6));
+ // close bundle
+ testHarness.setProcessingTime(
+ testHarness.getProcessingTime()
+ + 2 * FlinkPipelineOptions.defaults().getMaxBundleTimeMills());
assertThat(windowDoFnOperator.getCurrentOutputWatermark(), is(1L));
// close window
@@ -164,6 +168,11 @@ public class WindowDoFnOperatorTest {
assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(0));
assertThat(testHarness.numKeyedStateEntries(), is(3));
+
+ // close bundle
+ testHarness.setProcessingTime(
+ testHarness.getProcessingTime()
+ + 2 * FlinkPipelineOptions.defaults().getMaxBundleTimeMills());
assertThat(windowDoFnOperator.getCurrentOutputWatermark(), is(100L));
testHarness.processWatermark(200L);
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 9846f51343b..3207295a755 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -126,6 +126,7 @@ import org.apache.beam.sdk.transforms.Mean.CountSum;
import org.apache.beam.sdk.transforms.ParDo.SingleOutput;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -6515,7 +6516,6 @@ public class ParDoTest implements Serializable {
Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
// verify state
assertEquals(1, (int) currentValue);
- System.err.println("KEY " + key + " VALUE " + currentValue);
// To check output is received from OnWindowExpiration
r.output(currentValue);
}
@@ -6674,4 +6674,60 @@ public class ParDoTest implements Serializable {
fieldAccessDescriptor.getNestedFieldsAccessed().isEmpty());
}
}
+
+ @RunWith(JUnit4.class)
+ public static class BundleInvariantsTests extends SharedTestBase implements
Serializable {
+
+ @Test
+ @Category({ValidatesRunner.class, UsesUnboundedPCollections.class,
UsesTestStream.class})
+ public void testWatermarkUpdateMidBundle() {
+ DoFn<String, String> bufferDoFn =
+ new DoFn<String, String>() {
+ private final Set<KV<String, Instant>> buffer = new HashSet<>();
+
+ @ProcessElement
+ public void process(@Element String in, @Timestamp Instant ts) {
+ buffer.add(KV.of(in, ts));
+ }
+
+ @FinishBundle
+ public void finish(FinishBundleContext context) {
+ buffer.forEach(k -> context.output(k.getKey(), k.getValue(),
GlobalWindow.INSTANCE));
+ buffer.clear();
+ }
+ };
+ int numBundles = 200;
+ TestStream.Builder<String> builder =
+ TestStream.create(StringUtf8Coder.of()).advanceWatermarkTo(new
Instant(0));
+ List<List<TimestampedValue<String>>> bundles =
+ IntStream.range(0, numBundles)
+ .mapToObj(
+ r ->
+ IntStream.range(0, r + 1)
+ .mapToObj(v ->
TimestampedValue.of(String.valueOf(v), new Instant(r)))
+ .collect(Collectors.toList()))
+ .collect(Collectors.toList());
+ for (List<TimestampedValue<String>> b : bundles) {
+ builder =
+ builder
+ .addElements(b.get(0), b.subList(1, b.size()).toArray(new
TimestampedValue[] {}))
+ .advanceWatermarkTo(new Instant(b.size()));
+ }
+ PCollection<Long> result =
+ pipeline
+ .apply(builder.advanceWatermarkToInfinity())
+ .apply(ParDo.of(bufferDoFn))
+ .apply("milliWindow",
Window.into(FixedWindows.of(Duration.millis(1))))
+ .apply("count",
Combine.globally(Count.<String>combineFn()).withoutDefaults())
+ .apply(
+ "globalWindow",
+ Window.<Long>into(new GlobalWindows())
+ .triggering(AfterWatermark.pastEndOfWindow())
+ .withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes())
+ .apply("sum", Sum.longsGlobally());
+ PAssert.that(result).containsInAnyOrder((numBundles * numBundles +
numBundles) / 2L);
+ pipeline.run();
+ }
+ }
}