This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ab5461cf54b0 [SPARK-50076][FOLLOW-UP] Fix multiple log key names
ab5461cf54b0 is described below
commit ab5461cf54b01d091e8e48aef1c08a2b77b05dea
Author: Michael Zhang <[email protected]>
AuthorDate: Tue Jan 7 14:46:38 2025 -0800
[SPARK-50076][FOLLOW-UP] Fix multiple log key names
### What changes were proposed in this pull request?
Adds more log keys so that the values being logged are identified more specifically. Also fixes
incorrect log key usages.
### Why are the changes needed?
To make structured logging more useful.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests pass. Changes are logging only.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49387 from michaelzhan-db/SPARK-50076-pt2.
Lead-authored-by: Michael Zhang <[email protected]>
Co-authored-by: Sample User <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../scala/org/apache/spark/internal/LogKey.scala | 8 ++++++-
.../spark/shuffle/sort/ShuffleExternalSorter.java | 2 +-
.../org/apache/spark/BarrierTaskContext.scala | 2 +-
.../org/apache/spark/scheduler/DAGScheduler.scala | 26 +++++++++++-----------
.../spark/scheduler/OutputCommitCoordinator.scala | 4 ++--
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
.../scala/org/apache/spark/scheduler/TaskSet.scala | 4 ++--
.../apache/spark/scheduler/TaskSetManager.scala | 2 +-
.../apache/spark/util/collection/Spillable.scala | 2 +-
.../execution/adaptive/ShufflePartitionsUtil.scala | 2 +-
.../datasources/v2/WriteToDataSourceV2Exec.scala | 11 +++++----
.../sql/execution/streaming/state/StateStore.scala | 9 ++++----
.../spark/streaming/receiver/BlockGenerator.scala | 2 +-
13 files changed, 43 insertions(+), 33 deletions(-)
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 9e681448aaaf..c3a1af68d1c8 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -94,6 +94,7 @@ private[spark] object LogKeys {
case object BATCH_TIMESTAMP extends LogKey
case object BATCH_WRITE extends LogKey
case object BIND_ADDRESS extends LogKey
+ case object BLOCK_GENERATOR_STATUS extends LogKey
case object BLOCK_ID extends LogKey
case object BLOCK_IDS extends LogKey
case object BLOCK_MANAGER_ID extends LogKey
@@ -549,7 +550,7 @@ private[spark] object LogKeys {
case object NUM_RULE_OF_RUNS extends LogKey
case object NUM_SEQUENCES extends LogKey
case object NUM_SLOTS extends LogKey
- case object NUM_SPILL_INFOS extends LogKey
+ case object NUM_SPILLS extends LogKey
case object NUM_SPILL_WRITERS extends LogKey
case object NUM_SUB_DIRS extends LogKey
case object NUM_SUCCESSFUL_TASKS extends LogKey
@@ -725,6 +726,7 @@ private[spark] object LogKeys {
case object SHUFFLE_DB_BACKEND_KEY extends LogKey
case object SHUFFLE_DB_BACKEND_NAME extends LogKey
case object SHUFFLE_ID extends LogKey
+ case object SHUFFLE_IDS extends LogKey
case object SHUFFLE_MERGE_ID extends LogKey
case object SHUFFLE_MERGE_RECOVERY_FILE extends LogKey
case object SHUFFLE_SERVICE_CONF_OVERLAY_URL extends LogKey
@@ -755,14 +757,18 @@ private[spark] object LogKeys {
case object STAGE extends LogKey
case object STAGES extends LogKey
case object STAGE_ATTEMPT extends LogKey
+ case object STAGE_ATTEMPT_ID extends LogKey
case object STAGE_ID extends LogKey
case object STAGE_NAME extends LogKey
case object START_INDEX extends LogKey
case object START_TIME extends LogKey
case object STATEMENT_ID extends LogKey
case object STATE_NAME extends LogKey
+ case object STATE_STORE_COORDINATOR extends LogKey
case object STATE_STORE_ID extends LogKey
case object STATE_STORE_PROVIDER extends LogKey
+ case object STATE_STORE_PROVIDER_ID extends LogKey
+ case object STATE_STORE_PROVIDER_IDS extends LogKey
case object STATE_STORE_VERSION extends LogKey
case object STATS extends LogKey
case object STATUS extends LogKey
diff --git
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index f96513f1b109..de3c41a4b526 100644
---
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -165,7 +165,7 @@ final class ShuffleExternalSorter extends MemoryConsumer
implements ShuffleCheck
MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskContext.taskAttemptId()),
MDC.of(LogKeys.THREAD_ID$.MODULE$, Thread.currentThread().getId()),
MDC.of(LogKeys.MEMORY_SIZE$.MODULE$,
Utils.bytesToString(getMemoryUsage())),
- MDC.of(LogKeys.NUM_SPILL_INFOS$.MODULE$, spills.size()),
+ MDC.of(LogKeys.NUM_SPILLS$.MODULE$, spills.size()),
MDC.of(LogKeys.SPILL_TIMES$.MODULE$, spills.size() != 1 ? "times" :
"time"));
}
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index c8d6000cd628..5b18ab95b07e 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -62,7 +62,7 @@ class BarrierTaskContext private[spark] (
log"for ${MDC(TOTAL_TIME, System.currentTimeMillis() - st)} ms,")
logInfo(log"Task ${MDC(TASK_ATTEMPT_ID, taskAttemptId())}" +
log" from Stage ${MDC(STAGE_ID, stageId())}" +
- log"(Attempt ${MDC(STAGE_ATTEMPT, stageAttemptNumber())}) " +
+ log"(Attempt ${MDC(STAGE_ATTEMPT_ID, stageAttemptNumber())}) " +
msg + waitMsg +
log" current barrier epoch is ${MDC(BARRIER_EPOCH, barrierEpoch)}.")
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index e06b7d86e1db..aee92ba928b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1382,9 +1382,9 @@ private[spark] class DAGScheduler(
logInfo(
log"Got job ${MDC(JOB_ID, job.jobId)} (${MDC(CALL_SITE_SHORT_FORM,
callSite.shortForm)}) " +
log"with ${MDC(NUM_PARTITIONS, partitions.length)} output partitions")
- logInfo(log"Final stage: ${MDC(STAGE_ID, finalStage)} " +
+ logInfo(log"Final stage: ${MDC(STAGE, finalStage)} " +
log"(${MDC(STAGE_NAME, finalStage.name)})")
- logInfo(log"Parents of final stage: ${MDC(STAGE_ID, finalStage.parents)}")
+ logInfo(log"Parents of final stage: ${MDC(STAGES, finalStage.parents)}")
logInfo(log"Missing parents: ${MDC(MISSING_PARENT_STAGES,
getMissingParentStages(finalStage))}")
val jobSubmissionTime = clock.getTimeMillis()
@@ -1465,7 +1465,7 @@ private[spark] class DAGScheduler(
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
- logInfo(log"Submitting ${MDC(STAGE_ID, stage)} (${MDC(RDD_ID,
stage.rdd)}), " +
+ logInfo(log"Submitting ${MDC(STAGE, stage)} (${MDC(RDD_ID,
stage.rdd)}), " +
log"which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
@@ -1517,12 +1517,12 @@ private[spark] class DAGScheduler(
val shuffleId = stage.shuffleDep.shuffleId
val shuffleMergeId = stage.shuffleDep.shuffleMergeId
if (stage.shuffleDep.shuffleMergeEnabled) {
- logInfo(log"Shuffle merge enabled before starting the stage for
${MDC(STAGE_ID, stage)}" +
+ logInfo(log"Shuffle merge enabled before starting the stage for
${MDC(STAGE, stage)}" +
log" with shuffle ${MDC(SHUFFLE_ID, shuffleId)} and shuffle merge" +
log" ${MDC(SHUFFLE_MERGE_ID, shuffleMergeId)} with" +
log" ${MDC(NUM_MERGER_LOCATIONS,
stage.shuffleDep.getMergerLocs.size.toString)} merger locations")
} else {
- logInfo(log"Shuffle merge disabled for ${MDC(STAGE_ID, stage)} with " +
+ logInfo(log"Shuffle merge disabled for ${MDC(STAGE, stage)} with " +
log"shuffle ${MDC(SHUFFLE_ID, shuffleId)} and " +
log"shuffle merge ${MDC(SHUFFLE_MERGE_ID, shuffleMergeId)}, " +
log"but can get enabled later adaptively once enough " +
@@ -1583,7 +1583,7 @@ private[spark] class DAGScheduler(
// merger locations but the corresponding shuffle map stage did
not complete
// successfully, we would still enable push for its retry.
s.shuffleDep.setShuffleMergeAllowed(false)
- logInfo(log"Push-based shuffle disabled for ${MDC(STAGE_ID,
stage)} " +
+ logInfo(log"Push-based shuffle disabled for ${MDC(STAGE, stage)} "
+
log"(${MDC(STAGE_NAME, stage.name)}) since it is already shuffle
merge finalized")
}
}
@@ -1707,7 +1707,7 @@ private[spark] class DAGScheduler(
if (tasks.nonEmpty) {
logInfo(log"Submitting ${MDC(NUM_TASKS, tasks.size)} missing tasks from
" +
- log"${MDC(STAGE_ID, stage)} (${MDC(RDD_ID, stage.rdd)}) (first 15
tasks are " +
+ log"${MDC(STAGE, stage)} (${MDC(RDD_ID, stage.rdd)}) (first 15 tasks
are " +
log"for partitions ${MDC(PARTITION_IDS,
tasks.take(15).map(_.partitionId))})")
val shuffleId = stage match {
case s: ShuffleMapStage => Some(s.shuffleDep.shuffleId)
@@ -1964,7 +1964,7 @@ private[spark] class DAGScheduler(
} catch {
case e: UnsupportedOperationException =>
logWarning(log"Could not cancel tasks " +
- log"for stage ${MDC(STAGE_ID, stageId)}", e)
+ log"for stage ${MDC(STAGE, stageId)}", e)
}
listenerBus.post(
SparkListenerJobEnd(job.jobId, clock.getTimeMillis(),
JobSucceeded))
@@ -1996,7 +1996,7 @@ private[spark] class DAGScheduler(
logDebug("ShuffleMapTask finished on " + execId)
if (executorFailureEpoch.contains(execId) &&
smt.epoch <= executorFailureEpoch(execId)) {
- logInfo(log"Ignoring possibly bogus ${MDC(STAGE_ID, smt)}
completion from " +
+ logInfo(log"Ignoring possibly bogus ${MDC(STAGE, smt)}
completion from " +
log"executor ${MDC(EXECUTOR_ID, execId)}")
} else {
// The epoch of the task is acceptable (i.e., the task was
launched after the most
@@ -2026,8 +2026,8 @@ private[spark] class DAGScheduler(
if (failedStage.latestInfo.attemptNumber() != task.stageAttemptId) {
logInfo(log"Ignoring fetch failure from " +
log"${MDC(TASK_ID, task)} as it's from " +
- log"${MDC(STAGE_ID, failedStage)} attempt " +
- log"${MDC(STAGE_ATTEMPT, task.stageAttemptId)} and there is a more
recent attempt for " +
+ log"${MDC(FAILED_STAGE, failedStage)} attempt " +
+ log"${MDC(STAGE_ATTEMPT_ID, task.stageAttemptId)} and there is a
more recent attempt for " +
log"that stage (attempt " +
log"${MDC(NUM_ATTEMPT, failedStage.latestInfo.attemptNumber())})
running")
} else {
@@ -2035,8 +2035,8 @@ private[spark] class DAGScheduler(
isExecutorDecommissioningOrDecommissioned(taskScheduler, bmAddress)
if (ignoreStageFailure) {
logInfo(log"Ignoring fetch failure from ${MDC(TASK_NAME, task)} of
" +
- log"${MDC(STAGE, failedStage)} attempt " +
- log"${MDC(STAGE_ATTEMPT, task.stageAttemptId)} when count " +
+ log"${MDC(FAILED_STAGE, failedStage)} attempt " +
+ log"${MDC(STAGE_ATTEMPT_ID, task.stageAttemptId)} when count " +
log"${MDC(MAX_ATTEMPTS,
config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key)} " +
log"as executor ${MDC(EXECUTOR_ID, bmAddress.executorId)} is
decommissioned and " +
log"${MDC(CONFIG,
config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key)}=true")
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index df28a97a349e..a769c3fa14b6 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -149,7 +149,7 @@ private[spark] class OutputCommitCoordinator(conf:
SparkConf, isDriver: Boolean)
// The task output has been committed successfully
case _: TaskCommitDenied =>
logInfo(log"Task was denied committing, stage: ${MDC(LogKeys.STAGE_ID,
stage)}." +
- log"${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)}, " +
+ log"${MDC(LogKeys.STAGE_ATTEMPT_ID, stageAttempt)}, " +
log"partition: ${MDC(LogKeys.PARTITION_ID, partition)}, " +
log"attempt: ${MDC(LogKeys.NUM_ATTEMPT, attemptNumber)}")
case _ =>
@@ -181,7 +181,7 @@ private[spark] class OutputCommitCoordinator(conf:
SparkConf, isDriver: Boolean)
stageStates.get(stage) match {
case Some(state) if attemptFailed(state, stageAttempt, partition,
attemptNumber) =>
logInfo(log"Commit denied for stage=${MDC(LogKeys.STAGE_ID, stage)}." +
- log"${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)}, partition=" +
+ log"${MDC(LogKeys.STAGE_ATTEMPT_ID, stageAttempt)}, partition=" +
log"${MDC(LogKeys.PARTITION_ID, partition)}: task attempt " +
log"${MDC(LogKeys.NUM_ATTEMPT, attemptNumber)} already marked as
failed.")
false
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8e3cb1379339..43193dc5366a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -325,7 +325,7 @@ private[spark] class TaskSchedulerImpl(
}
tsm.suspend()
logInfo(log"Stage ${MDC(LogKeys.STAGE_ID, stageId)}." +
- log"${MDC(LogKeys.STAGE_ATTEMPT, tsm.taskSet.stageAttemptId)} was
cancelled")
+ log"${MDC(LogKeys.STAGE_ATTEMPT_ID, tsm.taskSet.stageAttemptId)} was
cancelled")
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 2474a1342eb2..3513cb1f9376 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
import java.util.Properties
-import org.apache.spark.internal.LogKeys.{STAGE_ATTEMPT, STAGE_ID}
+import org.apache.spark.internal.LogKeys.{STAGE_ATTEMPT_ID, STAGE_ID}
import org.apache.spark.internal.MessageWithContext
/**
@@ -42,7 +42,7 @@ private[spark] class TaskSet(
lazy val logId: MessageWithContext = {
val hashMap = new java.util.HashMap[String, String]()
hashMap.put(STAGE_ID.name, stageId.toString)
- hashMap.put(STAGE_ATTEMPT.name, stageAttemptId.toString)
+ hashMap.put(STAGE_ATTEMPT_ID.name, stageAttemptId.toString)
MessageWithContext(id, hashMap)
}
}
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index fdc82285b76b..0eaf138d3eb8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1001,7 +1001,7 @@ private[spark] class TaskSetManager(
logError(
log"Task ${MDC(TASK_INDEX, info.index)}.${MDC(TASK_ATTEMPT_ID,
info.attemptNumber)} " +
log"in stage ${MDC(STAGE_ID, taskSet.stageId)}." +
- log"${MDC(STAGE_ATTEMPT, taskSet.stageAttemptId)} (TID
${MDC(TASK_ID, tid)}) " +
+ log"${MDC(STAGE_ATTEMPT_ID, taskSet.stageAttemptId)} (TID
${MDC(TASK_ID, tid)}) " +
log"can not write to output file: ${MDC(ERROR, ef.description)};
not retrying")
emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index),
reason, null,
accumUpdates, metricPeaks)
diff --git
a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index c3d648dccea7..7f2a1a8419a7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -146,6 +146,6 @@ private[spark] abstract class
Spillable[C](taskMemoryManager: TaskMemoryManager)
logInfo(log"Thread ${MDC(LogKeys.THREAD_ID, threadId)} " +
log"spilling in-memory map of ${MDC(LogKeys.BYTE_SIZE,
org.apache.spark.util.Utils.bytesToString(size))} to disk " +
- log"(${MDC(LogKeys.SPILL_TIMES, _spillCount)} times so far)")
+ log"(${MDC(LogKeys.NUM_SPILLS, _spillCount)} times so far)")
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index bb7d904402de..1ea4df025467 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -61,7 +61,7 @@ object ShufflePartitionsUtil extends Logging {
val targetSize =
maxTargetSize.min(advisoryTargetSize).max(minPartitionSize)
val shuffleIds =
mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
- logInfo(log"For shuffle(${MDC(LogKeys.SHUFFLE_ID, shuffleIds)}, advisory
target size: " +
+ logInfo(log"For shuffle(${MDC(LogKeys.SHUFFLE_IDS, shuffleIds)}, advisory
target size: " +
log"${MDC(LogKeys.ADVISORY_TARGET_SIZE, advisoryTargetSize)}, actual
target size " +
log"${MDC(LogKeys.TARGET_SIZE, targetSize)}, minimum partition size: " +
log"${MDC(LogKeys.PARTITION_SIZE, minPartitionSize)}")
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index bdcf7b8260a7..308b1bceca12 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -480,7 +480,7 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]]
extends Logging with Serial
log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " +
log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " +
log"stage ${MDC(LogKeys.STAGE_ID, stageId)}." +
- log"${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})")
+ log"${MDC(LogKeys.STAGE_ATTEMPT_ID, stageAttempt)})")
dataWriter.commit()
} else {
@@ -500,7 +500,8 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]]
extends Logging with Serial
logInfo(log"Committed partition ${MDC(LogKeys.PARTITION_ID, partId)} " +
log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " +
log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " +
- log"stage ${MDC(LogKeys.STAGE_ID,
stageId)}.${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})")
+ log"stage ${MDC(LogKeys.STAGE_ID, stageId)}." +
+ log"${MDC(LogKeys.STAGE_ATTEMPT_ID, stageAttempt)})")
DataWritingSparkTaskResult(iterWithMetrics.count, msg)
@@ -509,12 +510,14 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]]
extends Logging with Serial
logError(log"Aborting commit for partition ${MDC(LogKeys.PARTITION_ID,
partId)} " +
log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " +
log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " +
- log"stage ${MDC(LogKeys.STAGE_ID,
stageId)}.${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})")
+ log"stage ${MDC(LogKeys.STAGE_ID, stageId)}." +
+ log"${MDC(LogKeys.STAGE_ATTEMPT_ID, stageAttempt)})")
dataWriter.abort()
logError(log"Aborted commit for partition ${MDC(LogKeys.PARTITION_ID,
partId)} " +
log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " +
log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " +
- log"stage ${MDC(LogKeys.STAGE_ID,
stageId)}.${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})")
+ log"stage ${MDC(LogKeys.STAGE_ID, stageId)}." +
+ log"${MDC(LogKeys.STAGE_ATTEMPT_ID, stageAttempt)})")
}, finallyBlock = {
dataWriter.close()
})
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 0f945af6ede9..33df8ad42747 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -977,7 +977,8 @@ object StateStore extends Logging {
} finally {
val duration = System.currentTimeMillis() - startTime
val logMsg =
- log"Finished maintenance task for
provider=${MDC(LogKeys.STATE_STORE_PROVIDER, id)}" +
+ log"Finished maintenance task for " +
+ log"provider=${MDC(LogKeys.STATE_STORE_PROVIDER_ID, id)}" +
log" in elapsed_time=${MDC(LogKeys.TIME_UNITS, duration)}\n"
if (duration > 5000) {
logInfo(logMsg)
@@ -1007,9 +1008,9 @@ object StateStore extends Logging {
.map(_.reportActiveInstance(storeProviderId, host, executorId,
otherProviderIds))
.getOrElse(Seq.empty[StateStoreProviderId])
logInfo(log"Reported that the loaded instance " +
- log"${MDC(LogKeys.STATE_STORE_PROVIDER, storeProviderId)} is active")
+ log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, storeProviderId)} is
active")
logDebug(log"The loaded instances are going to unload: " +
- log"${MDC(LogKeys.STATE_STORE_PROVIDER,
providerIdsToUnload.mkString(", "))}")
+ log"${MDC(LogKeys.STATE_STORE_PROVIDER_IDS, providerIdsToUnload)}")
providerIdsToUnload
} else {
Seq.empty[StateStoreProviderId]
@@ -1041,7 +1042,7 @@ object StateStore extends Logging {
_coordRef = StateStoreCoordinatorRef.forExecutor(env)
}
logInfo(log"Retrieved reference to StateStoreCoordinator: " +
- log"${MDC(LogKeys.STATE_STORE_PROVIDER, _coordRef)}")
+ log"${MDC(LogKeys.STATE_STORE_COORDINATOR, _coordRef)}")
Some(_coordRef)
} else {
_coordRef = null
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index e0e85712a230..fae68123773d 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -142,7 +142,7 @@ private[streaming] class BlockGenerator(
state = StoppedAddingData
} else {
logWarning(log"Cannot stop BlockGenerator as its not in the Active
state " +
- log"[state = ${MDC(STATUS, state)}]")
+ log"[state = ${MDC(BLOCK_GENERATOR_STATUS, state)}]")
return
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]