rkhachatryan commented on code in PR #25598:
URL: https://github.com/apache/flink/pull/25598#discussion_r1824687101
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -166,53 +171,57 @@ public SourceCoordinator(
@VisibleForTesting
void announceCombinedWatermark() {
- Set<Integer> subTaskIds = combinedWatermark.keySet();
- if (subTaskIds.isEmpty()) {
- LOG.debug(
- "Skip distributing maxAllowedWatermark of group={} for source {} - no subtasks.",
- watermarkAlignmentParams.getWatermarkGroup(),
- operatorName);
- }
- checkState(
- watermarkAlignmentParams != WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
-
- Watermark globalCombinedWatermark =
- coordinatorStore.apply(
+ try (MdcUtils.MdcCloseable mdcCloseable =
+ MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
Review Comment:
Have you tried adding MDC to the executor that runs this function instead of
wrapping the method body here?
It looks like it would be enough to wrap `coordinatorExecutor` in
`SourceCoordinatorContext`.
That would cover more cases (e.g. if new methods are added) and would be less
invasive and less verbose.
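
Roughly what I have in mind, as a self-contained sketch rather than actual Flink code. Here `CONTEXT` is a hypothetical `ThreadLocal` stand-in for the logging MDC, and `MdcExecutorSketch`, `withContext` and the `job-id` key are illustrative names, not existing API. The point is only that the context is set once in the executor wrapper, so every task submitted through it is covered without a `try` block per method:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;

public class MdcExecutorSketch {
    // Hypothetical stand-in for the logging MDC: a per-thread context map.
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(Collections::emptyMap);

    // Returns an executor that installs contextData around every task it runs
    // and restores the previous context afterwards, even if the task throws.
    static Executor withContext(Executor delegate, Map<String, String> contextData) {
        return task ->
                delegate.execute(
                        () -> {
                            Map<String, String> previous = CONTEXT.get();
                            CONTEXT.set(contextData);
                            try {
                                task.run();
                            } finally {
                                CONTEXT.set(previous);
                            }
                        });
    }

    public static void main(String[] args) {
        // A same-thread executor keeps the demo deterministic.
        Executor direct = Runnable::run;
        Executor scoped = withContext(direct, Map.of("job-id", "job-1234"));

        // Inside the task the context is visible ...
        scoped.execute(() -> System.out.println("in task: " + CONTEXT.get()));
        // ... and outside it the previous (empty) context is back.
        System.out.println("after task: " + CONTEXT.get());
    }
}
```

With this shape, callers such as `announceCombinedWatermark()` and `runInEventLoop(...)` would stay unchanged, assuming all of their work is submitted through the wrapped executor.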
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -535,22 +547,24 @@ private void runInEventLoop(
context.runInCoordinatorThread(
() -> {
- try {
- action.run();
- } catch (Throwable t) {
- // if we have a JVM critical error, promote it immediately, there is a good
- // chance the
- // logging or job failing will not succeed any more
- ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-
- final String actionString =
- String.format(actionName, actionNameFormatParameters);
- LOG.error(
- "Uncaught exception in the SplitEnumerator for Source {} while {}. Triggering job failover.",
- operatorName,
- actionString,
- t);
- context.failJob(t);
+ try (MdcUtils.MdcCloseable mdcCloseable =
+ MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
+ try {
+ action.run();
Review Comment:
Same as above: I think it should be possible to wrap the executor inside the
context.