zentol commented on a change in pull request #12472:
URL: https://github.com/apache/flink/pull/12472#discussion_r537503947



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -530,8 +539,10 @@ public void startTaskThread() {
        @Override
        public void run() {
                try {
+                       MDC.put("jobName", this.jobName);

Review comment:
       We may want to introduce an opt-in configuration option. I cannot assess 
how many threads are being created in existing jobs, and the copy of the 
context map makes be a bit wary.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
##########
@@ -69,10 +71,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;

Review comment:
       we don't allow `*` imports

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -530,8 +539,10 @@ public void startTaskThread() {
        @Override
        public void run() {
                try {
+                       MDC.put("jobName", this.jobName);
                        doRun();
                } finally {
+                       MDC.remove("jobName");
                        terminationFuture.complete(executionState);

Review comment:
       Depends a bit. From what I saw in the log4j code, by default the thread 
context is not inherited to child threads, so we would actually need to setup 
the MDC whenever any thread is created, which is naturally terrible.
   If the `isThreadContextMapInheritable` property is set to `true`, then all 
threads created within a thread inherit the MDC (as a copy), and then we 
shouldn't need to do anything.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
##########
@@ -106,6 +109,8 @@ public ExecutorThreadFactory(String poolName, 
UncaughtExceptionHandler exception
 
        @Override
        public Thread newThread(Runnable runnable) {
+               runnable = new RunnableMDCWrapper(runnable);
+

Review comment:
       I think so, because the wrapper resets the MDC after it has finished.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
##########
@@ -106,6 +109,8 @@ public ExecutorThreadFactory(String poolName, 
UncaughtExceptionHandler exception
 
        @Override
        public Thread newThread(Runnable runnable) {
+               runnable = new RunnableMDCWrapper(runnable);
+

Review comment:
       That said, we shouldn't just override the MDC of the thread; if the 
owner of the pool (e.g., TM) has something in the MDC then it should be 
preserved. (i.e., merge the MDCs instead)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to