pnowojski commented on a change in pull request #12472:
URL: https://github.com/apache/flink/pull/12472#discussion_r512001282



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
##########
@@ -106,6 +109,8 @@ public ExecutorThreadFactory(String poolName, 
UncaughtExceptionHandler exception
 
        @Override
        public Thread newThread(Runnable runnable) {
+               runnable = new RunnableMDCWrapper(runnable);
+

Review comment:
       Will this work correctly if the `ExecutorThreadFactory` is used in some 
thread pool that is shared between different jobs? 
   
   I think at best, we would loose the `MDC`. At worst, if new pooled worker is 
spawned when in the scope of Job1's MDC, this MDC would stick with this worker 
forever, regardless if it's executing code belonging to some another job?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -530,8 +539,10 @@ public void startTaskThread() {
        @Override
        public void run() {
                try {
+                       MDC.put("jobName", this.jobName);
                        doRun();
                } finally {
+                       MDC.remove("jobName");
                        terminationFuture.complete(executionState);

Review comment:
       Don't we want to have the same changes in `LegacySourceFunctionThread` 
as well?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to