pnowojski commented on code in PR #22806:
URL: https://github.com/apache/flink/pull/22806#discussion_r1245507575
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -195,9 +195,19 @@ void announceCombinedWatermark() {
"Distributing maxAllowedWatermark={} to subTaskIds={}",
maxAllowedWatermark,
subTaskIds);
- for (Integer subtaskId : subTaskIds) {
- context.sendEventToSourceOperator(
- subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+ // Because of Java-ThreadPoolExecutor will not schedule the period task
+ // if it throws an exception, so we should handle the potential exception like
+ // "subtask xx is not ready yet to receive events" to increase robustness.
+ try {
+ for (Integer subtaskId : subTaskIds) {
+ context.sendEventToSourceOperator(
+ subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
+ }
Review Comment:
Huh. I have a couple of remarks here:
- logging a warning here for an expected case (some subtask might not be running yet) is not ideal. But I'm not sure how best to handle that. Maybe only start printing it after the 5th consecutive failure? Can we somehow determine whether an exception from sending an event is intermittent or not?
- should we actually fail the job at some point due to this kind of error?
- I think we need a safety net when scheduling something in the `coordinatorExecutor`. This is independent of properly handling the exception in this ticket; it is just to make sure that in the future we won't have a similar bug due to unnoticed and unhandled exceptions. I would either:
  - remove the `context.getCoordinatorExecutor()` getter, and allow scheduling runnables only through the context, like `SourceCoordinatorContext#runInCoordinatorThread`
  - or replace the returned executor with some wrapper that would actually fail the job on all unexpected failures
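Why a safety net matters can be shown with a small plain-JDK sketch (no Flink classes involved). The `ScheduledExecutorService.scheduleAtFixedRate` Javadoc states that if any execution of the task encounters an exception, subsequent executions are suppressed, so a single uncaught "not ready yet" error silently stops the periodic task. The `guarded` wrapper below illustrates the second option: every scheduled task is wrapped so that no exception reaches the executor and each failure is routed to a handler instead. `guarded`, `runDemo` and the logging handler are hypothetical names; a real coordinator would presumably fail the job in that handler rather than just log.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class GuardedSchedulingSketch {

    /** Hypothetical wrapper: no exception from the task ever escapes into the executor. */
    static Runnable guarded(Runnable task, Consumer<Throwable> onUnexpectedFailure) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                // Hypothetical hook: a real coordinator might fail the job here.
                onUnexpectedFailure.accept(t);
            }
        };
    }

    /** Returns {unguarded executions, guarded executions}. */
    static int[] runDemo() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger unguardedRuns = new AtomicInteger();
        AtomicInteger guardedRuns = new AtomicInteger();
        CountDownLatch threeGuardedRuns = new CountDownLatch(3);
        try {
            // Unguarded: the first exception silently suppresses all later executions.
            executor.scheduleAtFixedRate(
                    () -> {
                        unguardedRuns.incrementAndGet();
                        throw new IllegalStateException("subtask is not ready yet");
                    },
                    0,
                    10,
                    TimeUnit.MILLISECONDS);

            // Guarded: the failure goes to the handler and the task keeps being rescheduled.
            executor.scheduleAtFixedRate(
                    guarded(
                            () -> {
                                guardedRuns.incrementAndGet();
                                threeGuardedRuns.countDown();
                                throw new IllegalStateException("subtask is not ready yet");
                            },
                            t -> System.err.println("unexpected failure: " + t.getMessage())),
                    0,
                    10,
                    TimeUnit.MILLISECONDS);

            threeGuardedRuns.await();
            return new int[] {unguardedRuns.get(), guardedRuns.get()};
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] runs = runDemo();
        // prints "unguarded=1, guarded>=3: true"
        System.out.println("unguarded=" + runs[0] + ", guarded>=3: " + (runs[1] >= 3));
    }
}
```

The unguarded task runs exactly once, while the guarded one keeps running despite failing every time. The first option (only exposing `runInCoordinatorThread`-style methods) would achieve the same effect by applying such a wrapper in one central place.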
##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##########
@@ -131,6 +133,34 @@ void testWatermarkAlignmentWithTwoGroups() throws Exception {
}
}
+ @Test
+ @Timeout(value = 10)
+ void testSendWatermarkAlignmentEventFailed() throws Exception {
+ long maxDrift = 1000L;
+
+ WatermarkAlignmentParams params =
+ new WatermarkAlignmentParams(maxDrift, "group1", maxDrift);
+ sourceCoordinator = getNewSourceCoordinator(params);
+ sourceCoordinator.start();
+
+ final int subtask = 0;
+ int attemptNumber = 0;
+ sourceCoordinator.handleEventFromOperator(
+ subtask,
+ attemptNumber,
+ new ReaderRegistrationEvent(subtask, createLocationFor(subtask, attemptNumber)));
+ // SubTask ReportedWatermarkEvent before setReaderTaskReady to simulate task failover
+ sourceCoordinator.handleEventFromOperator(
+ subtask, attemptNumber, new ReportedWatermarkEvent(1000));
+ Thread.sleep(3000);
Review Comment:
This should be converted to a test without busy waiting and without a timeout. I'm not sure what you are waiting for, but this should be changed to a latch or some other waiting condition.
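A latch-based wait could look roughly like the following plain-JDK sketch. It does not use Flink's test utilities: `startAnnouncing` and `awaitDeliveryAfterFailures` are hypothetical stand-ins, and "subtask not ready" is simulated by failing the first N sends. Instead of sleeping for a fixed 3 seconds, the test blocks on a `CountDownLatch` that is released exactly when the condition of interest (an event actually being delivered) holds, and it adds no per-call timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchInsteadOfSleepSketch {

    /** Hypothetical stand-in for the coordinator's periodic watermark announcement. */
    static void startAnnouncing(ScheduledExecutorService executor, Runnable sendEvent) {
        executor.scheduleAtFixedRate(
                () -> {
                    try {
                        sendEvent.run();
                    } catch (RuntimeException e) {
                        // Tolerate "not ready yet" failures so later executions are not suppressed.
                    }
                },
                0,
                10,
                TimeUnit.MILLISECONDS);
    }

    /** Fails the first {@code failuresBeforeReady} sends, then waits for a successful delivery. */
    static int awaitDeliveryAfterFailures(int failuresBeforeReady) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger failedAttempts = new AtomicInteger();
        CountDownLatch delivered = new CountDownLatch(1);
        try {
            startAnnouncing(
                    executor,
                    () -> {
                        if (failedAttempts.get() < failuresBeforeReady) {
                            failedAttempts.incrementAndGet();
                            throw new IllegalStateException(
                                    "subtask is not ready yet to receive events");
                        }
                        delivered.countDown();
                    });
            // Block until the expected condition holds, instead of Thread.sleep(3000).
            delivered.await();
            return failedAttempts.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(awaitDeliveryAfterFailures(3)); // prints 3
    }
}
```

The test finishes as soon as the first event is delivered, which is both faster and more robust on slow CI machines than a fixed sleep.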
##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##########
@@ -131,6 +133,34 @@ void testWatermarkAlignmentWithTwoGroups() throws Exception {
}
}
+ @Test
+ @Timeout(value = 10)
Review Comment:
Also, as a general rule, we never add timeouts on a per-test basis:
- we have a watchdog implemented in the CI that, after some period of inactivity OR a global ~4h timeout, times out all running tests, with a thread dump and some other extras
- timeouts such as this one tend to fail on the CI infrastructure, as sooner or later some crappy machine running this particular test will freeze for whatever reason, even for minutes
- it is annoying when such a test fails while debugging from the IDE
--
This is an automated message from the Apache Git Service.