This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dbc88d8c46c [SPARK-40234][CORE] Clean only MDC items set by Spark
dbc88d8c46c is described below
commit dbc88d8c46c52efea221a5e51eb410035896f8ee
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat Aug 27 12:54:10 2022 -0700
[SPARK-40234][CORE] Clean only MDC items set by Spark
### What changes were proposed in this pull request?
This patch narrows the scope of MDC cleanup from all MDC items
to only the MDC items set by Spark via `setLocalProperty`.
### Why are the changes needed?
Since [SPARK-8981](https://issues.apache.org/jira/browse/SPARK-8981), the Spark
executor has supported MDC. Before setting its MDC items, the executor clears all
MDC items. This causes a problem for MDC items that were not set by Spark but by
users elsewhere: these custom MDC items are wiped and are not shown in the
executor log.
### Does this PR introduce _any_ user-facing change?
Yes. MDC items that were previously cleared by the executor will now be shown.
### How was this patch tested?
Tested manually.
Closes #37680 from viirya/remove_mdc_clean.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
.../src/main/scala/org/apache/spark/executor/Executor.scala | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d01de3b9ed0..ab2bd1b7801 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -772,6 +772,7 @@ private[spark] class Executor(
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),
t)
}
} finally {
+ cleanMDCForTask(taskName, mdcProperties)
runningTasks.remove(taskId)
if (taskStarted) {
// This means the task was successfully deserialized, its stageId
and stageAttemptId
@@ -788,8 +789,6 @@ private[spark] class Executor(
private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]):
Unit = {
try {
- // make sure we run the task with the user-specified mdc properties only
- MDC.clear()
mdc.foreach { case (key, value) => MDC.put(key, value) }
// avoid overriding the takName by the user
MDC.put("mdc.taskName", taskName)
@@ -798,6 +797,15 @@ private[spark] class Executor(
}
}
+ private def cleanMDCForTask(taskName: String, mdc: Seq[(String, String)]):
Unit = {
+ try {
+ mdc.foreach { case (key, _) => MDC.remove(key) }
+ MDC.remove("mdc.taskName")
+ } catch {
+ case _: NoSuchFieldError => logInfo("MDC is not supported.")
+ }
+ }
+
/**
* Supervises the killing / cancellation of a task by sending the
interrupted flag, optionally
* sending a Thread.interrupt(), and monitoring the task until it finishes.
@@ -897,6 +905,7 @@ private[spark] class Executor(
}
}
} finally {
+ cleanMDCForTask(taskRunner.taskName, taskRunner.mdcProperties)
// Clean up entries in the taskReaperForTask map.
taskReaperForTask.synchronized {
taskReaperForTask.get(taskId).foreach { taskReaperInMap =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]