rkhachatryan commented on code in PR #25598:
URL: https://github.com/apache/flink/pull/25598#discussion_r1824687101


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -166,53 +171,57 @@ public SourceCoordinator(
 
     @VisibleForTesting
     void announceCombinedWatermark() {
-        Set<Integer> subTaskIds = combinedWatermark.keySet();
-        if (subTaskIds.isEmpty()) {
-            LOG.debug(
-                    "Skip distributing maxAllowedWatermark of group={} for 
source {} - no subtasks.",
-                    watermarkAlignmentParams.getWatermarkGroup(),
-                    operatorName);
-        }
-        checkState(
-                watermarkAlignmentParams != 
WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
-
-        Watermark globalCombinedWatermark =
-                coordinatorStore.apply(
+        try (MdcUtils.MdcCloseable mdcCloseable =
+                MdcUtils.withContext(MdcUtils.asContextData(jobID))) {

Review Comment:
   Have you tried to add MDC to the executor that runs this function instead of 
wrapping here?
   It looks like it would be enough to wrap `coordinatorExecutor` in 
`SourceCoordinatorContext`.
   
   That would cover more cases (i.e. if new methods are added) and less 
invasive and verbose.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -535,22 +547,24 @@ private void runInEventLoop(
 
         context.runInCoordinatorThread(
                 () -> {
-                    try {
-                        action.run();
-                    } catch (Throwable t) {
-                        // if we have a JVM critical error, promote it 
immediately, there is a good
-                        // chance the
-                        // logging or job failing will not succeed any more
-                        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-
-                        final String actionString =
-                                String.format(actionName, 
actionNameFormatParameters);
-                        LOG.error(
-                                "Uncaught exception in the SplitEnumerator for 
Source {} while {}. Triggering job failover.",
-                                operatorName,
-                                actionString,
-                                t);
-                        context.failJob(t);
+                    try (MdcUtils.MdcCloseable mdcCloseable =
+                            
MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
+                        try {
+                            action.run();

Review Comment:
   Same as above, I think it should be possible to wrap the executor inside the 
context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to