zentol commented on a change in pull request #12472:
URL: https://github.com/apache/flink/pull/12472#discussion_r537503947
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -530,8 +539,10 @@ public void startTaskThread() {
@Override
public void run() {
try {
+ MDC.put("jobName", this.jobName);
Review comment:
We may want to introduce an opt-in configuration option. I cannot assess
how many threads are being created in existing jobs, and the copy of the
context map makes me a bit wary.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
##########
@@ -69,10 +71,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
Review comment:
We don't allow `*` imports; please keep the explicit imports.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -530,8 +539,10 @@ public void startTaskThread() {
@Override
public void run() {
try {
+ MDC.put("jobName", this.jobName);
doRun();
} finally {
+ MDC.remove("jobName");
terminationFuture.complete(executionState);
Review comment:
Depends a bit. From what I saw in the log4j code, by default the thread
context is not inherited by child threads, so we would actually need to set up
the MDC whenever any thread is created, which is naturally terrible.
If the `isThreadContextMapInheritable` property is set to `true`, then all
threads created within a thread inherit the MDC (as a copy), and then we
shouldn't need to do anything.
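To illustrate the "inherit as a copy" behaviour, here is a minimal sketch that
uses only the JDK's `InheritableThreadLocal` as a stand-in for the logging
context. It is not log4j code; the class `InheritableContextSketch` and its
methods are hypothetical names, and it assumes the inheritable mode behaves
like a per-thread map that is copied when a child thread is constructed.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-JDK stand-in for a logging context map; NOT log4j's actual classes.
final class InheritableContextSketch {

    // Each thread has its own map; a child gets a copy when it is constructed.
    private static final InheritableThreadLocal<Map<String, String>> CONTEXT =
            new InheritableThreadLocal<Map<String, String>>() {
                @Override
                protected Map<String, String> initialValue() {
                    return new HashMap<>();
                }

                @Override
                protected Map<String, String> childValue(Map<String, String> parent) {
                    // Copy, so later changes in the parent do not leak into the child.
                    return new HashMap<>(parent);
                }
            };

    static void put(String key, String value) {
        CONTEXT.get().put(key, value);
    }

    static void remove(String key) {
        CONTEXT.get().remove(key);
    }

    static String get(String key) {
        return CONTEXT.get().get(key);
    }

    // Starts a new thread and returns the value that thread sees for the key.
    static String readInChild(String key) {
        String[] seen = new String[1];
        Thread child = new Thread(() -> seen[0] = get(key));
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        put("jobName", "example-job");
        System.out.println(readInChild("jobName"));
        remove("jobName");
        System.out.println(readInChild("jobName"));
    }
}
```

One caveat with this mechanism: the copy happens when the child thread is
created, so pooled threads that were created earlier would not pick up values
set afterwards.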
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
##########
@@ -106,6 +109,8 @@ public ExecutorThreadFactory(String poolName,
UncaughtExceptionHandler exception
@Override
public Thread newThread(Runnable runnable) {
+ runnable = new RunnableMDCWrapper(runnable);
+
Review comment:
I think so, because the wrapper resets the MDC after it has finished.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
##########
@@ -106,6 +109,8 @@ public ExecutorThreadFactory(String poolName,
UncaughtExceptionHandler exception
@Override
public Thread newThread(Runnable runnable) {
+ runnable = new RunnableMDCWrapper(runnable);
+
Review comment:
That said, we shouldn't just override the MDC of the thread; if the
owner of the pool (e.g., TM) has something in the MDC then it should be
preserved. (i.e., merge the MDCs instead)
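A rough sketch of what I mean by merging, using a plain `ThreadLocal` map as a
stand-in for the MDC so that it runs without any logging dependency. The class
name `MergingContextRunnable` and its constructor are hypothetical; the
`RunnableMDCWrapper` in this PR may well be structured differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the PR's RunnableMDCWrapper.
final class MergingContextRunnable implements Runnable {

    // Stand-in for the per-thread MDC map.
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    private final Map<String, String> captured;
    private final Runnable delegate;

    MergingContextRunnable(Map<String, String> captured, Runnable delegate) {
        // Defensive copy of the context handed over by the submitting side.
        this.captured = new HashMap<>(captured);
        this.delegate = delegate;
    }

    @Override
    public void run() {
        // Remember what the executing thread (e.g. a pool thread) already had.
        Map<String, String> previous = new HashMap<>(CONTEXT.get());

        // Merge instead of overriding: keep existing entries, add captured ones.
        Map<String, String> merged = new HashMap<>(previous);
        merged.putAll(captured); // captured entries win on key conflicts
        CONTEXT.set(merged);
        try {
            delegate.run();
        } finally {
            // Restore the thread's own context once the task has finished.
            CONTEXT.set(previous);
        }
    }
}
```

With SLF4J the same shape should be possible via `MDC.getCopyOfContextMap()`
and `MDC.setContextMap(...)`, keeping in mind that the former may return
`null` when nothing has been set.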