tzulitai commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1167082068


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java:
##########
@@ -160,13 +160,18 @@ public void testWatermarkAlignmentWithIdleness() throws Exception {
             expectedOutput.add(record1);
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, record1);
+            // mock WatermarkAlignmentEvent from SourceCoordinator
+            operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 100));
             assertOutput(actualOutput, expectedOutput);
             assertTrue(operator.isAvailable());
 
             // source becomes idle, it should report Long.MAX_VALUE as the watermark
             assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE));
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
+            // receive Long.MAX_VALUE as WatermarkAlignmentEvent
+            // because reported Long.MAX_VALUE watermark + maxAllowedWatermarkDrift will overflow

Review Comment:
   We probably don't need this comment; it's too much of an implementation
detail.
   
   Or maybe we just make note of the general contract between coordinator <--> 
subtasks: If all source subtasks of the watermark group are idle, then the 
coordinator will report `Long.MAX_VALUE`.
   
   Whether or not there was arithmetic overflow isn't really a concern here, so 
I would like to avoid excessive comments.
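   
   To illustrate the contract I have in mind, here is a rough standalone sketch. It is not the coordinator's actual aggregation code: `CombinedWatermarkSketch` and `combine` are hypothetical names, and it assumes that idle subtasks report `Long.MAX_VALUE` and that the combined watermark is the minimum across subtasks.

   ```java
   import java.util.List;

   public class CombinedWatermarkSketch {
       // Hypothetical helper: combine per-subtask watermarks by taking the minimum.
       // Assumption: an idle subtask reports Long.MAX_VALUE.
       static long combine(List<Long> reportedWatermarks) {
           long combined = Long.MAX_VALUE;
           for (long watermark : reportedWatermarks) {
               combined = Math.min(combined, watermark);
           }
           return combined;
       }

       public static void main(String[] args) {
           // All subtasks idle: the combined watermark is Long.MAX_VALUE.
           System.out.println(combine(List.of(Long.MAX_VALUE, Long.MAX_VALUE)) == Long.MAX_VALUE);
           // One active subtask at 42: the combined watermark follows it.
           System.out.println(combine(List.of(42L, Long.MAX_VALUE)));
       }
   }
   ```

   A comment in the test that states this contract would stay valid regardless of how the coordinator computes the value internally.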



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -178,9 +178,18 @@ void announceCombinedWatermark() {
                                     aggregator.getAggregatedWatermark().getTimestamp());
                         });
 
-        long maxAllowedWatermark =
-                globalCombinedWatermark.getTimestamp()
-                        + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+        long maxAllowedWatermark;
+        try {
+            maxAllowedWatermark =
+                    Math.addExact(
+                            globalCombinedWatermark.getTimestamp(),
+                            watermarkAlignmentParams.getMaxAllowedWatermarkDrift());
+        } catch (ArithmeticException e) {

Review Comment:
   Could `ArithmeticException` be thrown for any reason other than overflow?
If so, it could be a bit dangerous to treat every such exception as an overflow.
   
   Would it be sufficient to handle it like this?
   ```
   long maxAllowedWatermark =
           (globalCombinedWatermark.getTimestamp() != Watermark.MAX_WATERMARK.getTimestamp())
                   ? globalCombinedWatermark.getTimestamp()
                           + watermarkAlignmentParams.getMaxAllowedWatermarkDrift()
                   : Watermark.MAX_WATERMARK.getTimestamp();
   ```
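   
   For comparison, a small standalone sketch of the two approaches. It uses plain `long` values in place of the Flink `Watermark` and `WatermarkAlignmentParams` types, the class and method names are made up for illustration, and it assumes the drift is non-negative. As far as I know, `Math.addExact(long, long)` is only documented to throw `ArithmeticException` when the result overflows a `long`, so both variants should agree on the idle case:

   ```java
   public class MaxAllowedWatermarkSketch {
       // Variant from the PR: clamp to Long.MAX_VALUE when the addition overflows.
       // Assumption: drift is non-negative, so any overflow is a positive one.
       static long withAddExact(long combinedWatermark, long drift) {
           try {
               return Math.addExact(combinedWatermark, drift);
           } catch (ArithmeticException e) {
               return Long.MAX_VALUE;
           }
       }

       // Suggested variant: special-case the "all idle" value explicitly.
       static long withExplicitCheck(long combinedWatermark, long drift) {
           return combinedWatermark != Long.MAX_VALUE
                   ? combinedWatermark + drift
                   : Long.MAX_VALUE;
       }

       public static void main(String[] args) {
           // Idle case: both variants report Long.MAX_VALUE.
           System.out.println(withAddExact(Long.MAX_VALUE, 100L) == Long.MAX_VALUE);
           System.out.println(withExplicitCheck(Long.MAX_VALUE, 100L) == Long.MAX_VALUE);
           // Regular case: both variants add the drift.
           System.out.println(withAddExact(1_000L, 100L));
           System.out.println(withExplicitCheck(1_000L, 100L));
       }
   }
   ```

   One difference worth keeping in mind: the explicit check only covers the exact `Long.MAX_VALUE` value, so a combined watermark within `drift` of the maximum would still wrap around with plain `+`.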


