This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 298fd32b128 [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle
298fd32b128 is described below

commit 298fd32b1286e1e5f47fcedbb56d4fee7cfc4eb1
Author: haishui <[email protected]>
AuthorDate: Wed Mar 29 11:06:40 2023 +0800

    [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle
    
    This closes #22291.
---
 .../runtime/source/coordinator/SourceCoordinator.java    | 15 ++++++++++++---
 .../coordinator/SourceCoordinatorAlignmentTest.java      | 16 ++++++++++++++++
 .../api/operators/SourceOperatorAlignmentTest.java       |  6 ++++++
 3 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 842b768050d..d1333bb4499 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -178,9 +178,18 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
                                     aggregator.getAggregatedWatermark().getTimestamp());
                         });
 
-        long maxAllowedWatermark =
-                globalCombinedWatermark.getTimestamp()
-                        + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+        long maxAllowedWatermark;
+        try {
+            maxAllowedWatermark =
+                    Math.addExact(
+                            globalCombinedWatermark.getTimestamp(),
+                            watermarkAlignmentParams.getMaxAllowedWatermarkDrift());
+        } catch (ArithmeticException e) {
+            // when the source is idle, globalCombinedWatermark.getTimestamp() is Long.MAX_VALUE,
+            // and maxAllowedWatermark arithmetic overflow
+            maxAllowedWatermark = Watermark.MAX_WATERMARK.getTimestamp();
+        }
+
         Set<Integer> subTaskIds = combinedWatermark.keySet();
         LOG.info(
                 "Distributing maxAllowedWatermark={} to subTaskIds={}",
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
index 66c34f09973..3453e0ad53b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
@@ -83,6 +83,22 @@ class SourceCoordinatorAlignmentTest extends SourceCoordinatorTestBase {
             reportWatermarkEvent(sourceCoordinator1, subtask0, 42);
             assertLatestWatermarkAlignmentEvent(subtask0, 1042);
             assertLatestWatermarkAlignmentEvent(subtask1, 1042);
+
+            // all subtask becomes idle
+            reportWatermarkEvent(sourceCoordinator1, subtask0, Long.MAX_VALUE);
+            reportWatermarkEvent(sourceCoordinator1, subtask1, Long.MAX_VALUE);
+            assertLatestWatermarkAlignmentEvent(subtask0, Long.MAX_VALUE);
+            assertLatestWatermarkAlignmentEvent(subtask1, Long.MAX_VALUE);
+
+            // subtask0 becomes active again
+            reportWatermarkEvent(sourceCoordinator1, subtask0, 42);
+            assertLatestWatermarkAlignmentEvent(subtask0, 1042);
+            assertLatestWatermarkAlignmentEvent(subtask1, 1042);
+
+            // subtask1 becomes active again
+            reportWatermarkEvent(sourceCoordinator1, subtask1, 46);
+            assertLatestWatermarkAlignmentEvent(subtask0, 1042);
+            assertLatestWatermarkAlignmentEvent(subtask1, 1042);
         }
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
index a0d0c00d579..f8da5632859 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
@@ -160,6 +160,8 @@ public class SourceOperatorAlignmentTest {
             expectedOutput.add(record1);
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, record1);
+            // mock WatermarkAlignmentEvent from SourceCoordinator
+            operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 100));
             assertOutput(actualOutput, expectedOutput);
             assertTrue(operator.isAvailable());
 
@@ -167,6 +169,9 @@ public class SourceOperatorAlignmentTest {
             assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE));
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
+            // If all source subtasks of the watermark group are idle,
+            // then the coordinator will report Long.MAX_VALUE
+            operator.handleOperatorEvent(new WatermarkAlignmentEvent(Long.MAX_VALUE));
 
             // it is easier to create a new split than add records the old one. The old one is
             // serialized, when sending the AddSplitEvent, so it is not as easy as
@@ -185,6 +190,7 @@ public class SourceOperatorAlignmentTest {
             // becomes active again, should go back to the previously emitted
             // watermark, as the record2 does not emit watermarks
             assertLatestReportedWatermarkEvent(context, record1);
+            operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 100));
             assertOutput(actualOutput, expectedOutput);
             assertTrue(operator.isAvailable());
         }

Reply via email to