This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 298fd32b128 [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow
when the source is idle
298fd32b128 is described below
commit 298fd32b1286e1e5f47fcedbb56d4fee7cfc4eb1
Author: haishui <[email protected]>
AuthorDate: Wed Mar 29 11:06:40 2023 +0800
[FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source
is idle
This closes #22291.
---
.../runtime/source/coordinator/SourceCoordinator.java | 15 ++++++++++++---
.../coordinator/SourceCoordinatorAlignmentTest.java | 16 ++++++++++++++++
.../api/operators/SourceOperatorAlignmentTest.java | 6 ++++++
3 files changed, 34 insertions(+), 3 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 842b768050d..d1333bb4499 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -178,9 +178,18 @@ public class SourceCoordinator<SplitT extends SourceSplit,
EnumChkT>
aggregator.getAggregatedWatermark().getTimestamp());
});
- long maxAllowedWatermark =
- globalCombinedWatermark.getTimestamp()
- +
watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+ long maxAllowedWatermark;
+ try {
+ maxAllowedWatermark =
+ Math.addExact(
+ globalCombinedWatermark.getTimestamp(),
+
watermarkAlignmentParams.getMaxAllowedWatermarkDrift());
+ } catch (ArithmeticException e) {
+ // When the source is idle, globalCombinedWatermark.getTimestamp() is Long.MAX_VALUE,
+ // so adding the max allowed drift overflows; fall back to the maximum watermark.
+ maxAllowedWatermark = Watermark.MAX_WATERMARK.getTimestamp();
+ }
+
Set<Integer> subTaskIds = combinedWatermark.keySet();
LOG.info(
"Distributing maxAllowedWatermark={} to subTaskIds={}",
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
index 66c34f09973..3453e0ad53b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
@@ -83,6 +83,22 @@ class SourceCoordinatorAlignmentTest extends
SourceCoordinatorTestBase {
reportWatermarkEvent(sourceCoordinator1, subtask0, 42);
assertLatestWatermarkAlignmentEvent(subtask0, 1042);
assertLatestWatermarkAlignmentEvent(subtask1, 1042);
+
+ // all subtasks become idle
+ reportWatermarkEvent(sourceCoordinator1, subtask0, Long.MAX_VALUE);
+ reportWatermarkEvent(sourceCoordinator1, subtask1, Long.MAX_VALUE);
+ assertLatestWatermarkAlignmentEvent(subtask0, Long.MAX_VALUE);
+ assertLatestWatermarkAlignmentEvent(subtask1, Long.MAX_VALUE);
+
+ // subtask0 becomes active again
+ reportWatermarkEvent(sourceCoordinator1, subtask0, 42);
+ assertLatestWatermarkAlignmentEvent(subtask0, 1042);
+ assertLatestWatermarkAlignmentEvent(subtask1, 1042);
+
+ // subtask1 becomes active again
+ reportWatermarkEvent(sourceCoordinator1, subtask1, 46);
+ assertLatestWatermarkAlignmentEvent(subtask0, 1042);
+ assertLatestWatermarkAlignmentEvent(subtask1, 1042);
}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
index a0d0c00d579..f8da5632859 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
@@ -160,6 +160,8 @@ public class SourceOperatorAlignmentTest {
expectedOutput.add(record1);
context.getTimeService().advance(1);
assertLatestReportedWatermarkEvent(context, record1);
+ // mock WatermarkAlignmentEvent from SourceCoordinator
+ operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 +
100));
assertOutput(actualOutput, expectedOutput);
assertTrue(operator.isAvailable());
@@ -167,6 +169,9 @@ public class SourceOperatorAlignmentTest {
assertThat(operator.emitNext(actualOutput),
is(DataInputStatus.NOTHING_AVAILABLE));
context.getTimeService().advance(1);
assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
+ // If all source subtasks of the watermark group are idle,
+ // then the coordinator will report Long.MAX_VALUE
+ operator.handleOperatorEvent(new
WatermarkAlignmentEvent(Long.MAX_VALUE));
// it is easier to create a new split than add records the old
one. The old one is
// serialized, when sending the AddSplitEvent, so it is not as
easy as
@@ -185,6 +190,7 @@ public class SourceOperatorAlignmentTest {
// becomes active again, should go back to the previously emitted
// watermark, as the record2 does not emit watermarks
assertLatestReportedWatermarkEvent(context, record1);
+ operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 +
100));
assertOutput(actualOutput, expectedOutput);
assertTrue(operator.isAvailable());
}