pnowojski commented on code in PR #22806:
URL: https://github.com/apache/flink/pull/22806#discussion_r1245507575


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -195,9 +195,19 @@ void announceCombinedWatermark() {
                 "Distributing maxAllowedWatermark={} to subTaskIds={}",
                 maxAllowedWatermark,
                 subTaskIds);
-        for (Integer subtaskId : subTaskIds) {
-            context.sendEventToSourceOperator(
-                    subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+        // Because of Java-ThreadPoolExecutor will not schedule the period task
+        // if it throws an exception, so we should handle the potential exception like
+        // "subtask xx is not ready yet to receive events" to increase robustness.
+        try {
+            for (Integer subtaskId : subTaskIds) {
+                context.sendEventToSourceOperator(
+                        subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+            }

Review Comment:
   Huh. I have a couple of remarks here:
   - Logging a warning here, for the expected case that some subtask might not
     be running yet, is not ideal, but I'm not sure how best to handle that.
     Maybe only start printing it after the 5th consecutive failure? Can we
     somehow determine whether an exception from sending an event is
     intermittent or not?
   - Should we actually fail the job at some point because of this kind of
     error?
   - I think we need a safety net when scheduling something in the
     `coordinatorExecutor`. This is independent of properly handling the
     exception in this ticket; it just makes sure that in the future we won't
     have a similar bug due to unnoticed and unhandled exceptions. I would
     either:
     - remove the `context.getCoordinatorExecutor()` getter and allow
       scheduling runnables only through the context, like
       `SourceCoordinatorContext#runInCoordinatorThread`
     - or replace the returned executor with some wrapper that would actually
       fail the job on all unexpected failures
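
   To illustrate the wrapper option, here is a minimal sketch in plain Java.
   `GuardedScheduler` and its `failureHandler` are hypothetical names, not
   existing Flink classes; in the coordinator the handler would presumably be
   whatever fails the job. The sketch relies on the documented behaviour of
   `ScheduledExecutorService#scheduleAtFixedRate`: if any execution of the
   task throws, subsequent executions are suppressed.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical wrapper (not a Flink class): reports every task failure to a handler. */
final class GuardedScheduler {
    private final ScheduledExecutorService delegate;
    private final Consumer<Throwable> failureHandler;

    GuardedScheduler(ScheduledExecutorService delegate, Consumer<Throwable> failureHandler) {
        this.delegate = delegate;
        this.failureHandler = failureHandler;
    }

    /** Schedules a periodic task whose failures are reported instead of silently dropped. */
    ScheduledFuture<?> scheduleAtFixedRate(
            Runnable task, long initialDelay, long period, TimeUnit unit) {
        return delegate.scheduleAtFixedRate(guard(task), initialDelay, period, unit);
    }

    /** Runs a one-shot task with the same failure reporting. */
    void execute(Runnable task) {
        delegate.execute(guard(task));
    }

    private Runnable guard(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                // Without this catch, the executor would quietly stop rescheduling
                // the periodic task after the first exception.
                failureHandler.accept(t);
            }
        };
    }
}
```

   With something like this, no caller can schedule a runnable whose
   exception goes unnoticed, and whether a failure is fatal becomes a single
   decision inside the handler.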



##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##########
@@ -131,6 +133,34 @@ void testWatermarkAlignmentWithTwoGroups() throws Exception {
         }
     }
 
+    @Test
+    @Timeout(value = 10)
+    void testSendWatermarkAlignmentEventFailed() throws Exception {
+        long maxDrift = 1000L;
+
+        WatermarkAlignmentParams params =
+                new WatermarkAlignmentParams(maxDrift, "group1", maxDrift);
+        sourceCoordinator = getNewSourceCoordinator(params);
+        sourceCoordinator.start();
+
+        final int subtask = 0;
+        int attemptNumber = 0;
+        sourceCoordinator.handleEventFromOperator(
+                subtask,
+                attemptNumber,
+                new ReaderRegistrationEvent(subtask, createLocationFor(subtask, attemptNumber)));
+        // SubTask ReportedWatermarkEvent before setReaderTaskReady to simulate task failover
+        sourceCoordinator.handleEventFromOperator(
+                subtask, attemptNumber, new ReportedWatermarkEvent(1000));
+        Thread.sleep(3000);

Review Comment:
   This should be converted to a test without busy waiting and without a
   timeout. I'm not sure what you are waiting for, but this should be changed
   to some latch or other waiting condition.
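
   As a rough sketch of the latch pattern (plain Java, not tied to the
   coordinator): `LatchWaitExample` and `runAndAwait` are hypothetical names,
   and `asyncAction` stands in for whatever asynchronous step the test
   actually depends on, which I am assuming can signal its own completion.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper showing how to wait on a condition instead of sleeping. */
final class LatchWaitExample {

    /** Runs the action on another thread and blocks until it has finished. */
    static void runAndAwait(Runnable asyncAction) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> {
                try {
                    asyncAction.run();
                } finally {
                    // Signal completion even if the action throws.
                    done.countDown();
                }
            });
            // No sleep and no per-test timeout: returns as soon as the latch opens.
            done.await();
        } finally {
            executor.shutdown();
        }
    }
}
```

   The test then proceeds exactly when the awaited step has happened, rather
   than after a fixed 3 seconds that is too long on fast machines and may be
   too short on slow ones.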



##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##########
@@ -131,6 +133,34 @@ void testWatermarkAlignmentWithTwoGroups() throws Exception {
         }
     }
 
+    @Test
+    @Timeout(value = 10)

Review Comment:
   Also, as a general rule, we never add timeouts on a per-test basis:
   
   - we have a watchdog implemented in the CI that, after some inactivity OR a
     global ~4h timeout, times out all running tests with a thread dump and
     some other extra things
   - timeouts such as this tend to fail in the CI infrastructure, as sooner or
     later some crappy machine running this particular test will freeze for
     whatever reason, even for minutes
   - it is annoying when such a test fails while debugging from the IDE



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
