This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ab5461cf54b0 [SPARK-50076][FOLLOW-UP] Fix multiple log key names
ab5461cf54b0 is described below

commit ab5461cf54b01d091e8e48aef1c08a2b77b05dea
Author: Michael Zhang <[email protected]>
AuthorDate: Tue Jan 7 14:46:38 2025 -0800

    [SPARK-50076][FOLLOW-UP] Fix multiple log key names
    
    ### What changes were proposed in this pull request?
    
    Adds more logkeys to be more specific about values being logged. Also fixes 
incorrect logkey usages.
    
    ### Why are the changes needed?
    
    To make structured logging more useful.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests pass. Changes are logging only.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #49387 from michaelzhan-db/SPARK-50076-pt2.
    
    Lead-authored-by: Michael Zhang <[email protected]>
    Co-authored-by: Sample User <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  8 ++++++-
 .../spark/shuffle/sort/ShuffleExternalSorter.java  |  2 +-
 .../org/apache/spark/BarrierTaskContext.scala      |  2 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 26 +++++++++++-----------
 .../spark/scheduler/OutputCommitCoordinator.scala  |  4 ++--
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../scala/org/apache/spark/scheduler/TaskSet.scala |  4 ++--
 .../apache/spark/scheduler/TaskSetManager.scala    |  2 +-
 .../apache/spark/util/collection/Spillable.scala   |  2 +-
 .../execution/adaptive/ShufflePartitionsUtil.scala |  2 +-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 11 +++++----
 .../sql/execution/streaming/state/StateStore.scala |  9 ++++----
 .../spark/streaming/receiver/BlockGenerator.scala  |  2 +-
 13 files changed, 43 insertions(+), 33 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala 
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 9e681448aaaf..c3a1af68d1c8 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -94,6 +94,7 @@ private[spark] object LogKeys {
   case object BATCH_TIMESTAMP extends LogKey
   case object BATCH_WRITE extends LogKey
   case object BIND_ADDRESS extends LogKey
+  case object BLOCK_GENERATOR_STATUS extends LogKey
   case object BLOCK_ID extends LogKey
   case object BLOCK_IDS extends LogKey
   case object BLOCK_MANAGER_ID extends LogKey
@@ -549,7 +550,7 @@ private[spark] object LogKeys {
   case object NUM_RULE_OF_RUNS extends LogKey
   case object NUM_SEQUENCES extends LogKey
   case object NUM_SLOTS extends LogKey
-  case object NUM_SPILL_INFOS extends LogKey
+  case object NUM_SPILLS extends LogKey
   case object NUM_SPILL_WRITERS extends LogKey
   case object NUM_SUB_DIRS extends LogKey
   case object NUM_SUCCESSFUL_TASKS extends LogKey
@@ -725,6 +726,7 @@ private[spark] object LogKeys {
   case object SHUFFLE_DB_BACKEND_KEY extends LogKey
   case object SHUFFLE_DB_BACKEND_NAME extends LogKey
   case object SHUFFLE_ID extends LogKey
+  case object SHUFFLE_IDS extends LogKey
   case object SHUFFLE_MERGE_ID extends LogKey
   case object SHUFFLE_MERGE_RECOVERY_FILE extends LogKey
   case object SHUFFLE_SERVICE_CONF_OVERLAY_URL extends LogKey
@@ -755,14 +757,18 @@ private[spark] object LogKeys {
   case object STAGE extends LogKey
   case object STAGES extends LogKey
   case object STAGE_ATTEMPT extends LogKey
+  case object STAGE_ATTEMPT_ID extends LogKey
   case object STAGE_ID extends LogKey
   case object STAGE_NAME extends LogKey
   case object START_INDEX extends LogKey
   case object START_TIME extends LogKey
   case object STATEMENT_ID extends LogKey
   case object STATE_NAME extends LogKey
+  case object STATE_STORE_COORDINATOR extends LogKey
   case object STATE_STORE_ID extends LogKey
   case object STATE_STORE_PROVIDER extends LogKey
+  case object STATE_STORE_PROVIDER_ID extends LogKey
+  case object STATE_STORE_PROVIDER_IDS extends LogKey
   case object STATE_STORE_VERSION extends LogKey
   case object STATS extends LogKey
   case object STATUS extends LogKey
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index f96513f1b109..de3c41a4b526 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -165,7 +165,7 @@ final class ShuffleExternalSorter extends MemoryConsumer 
implements ShuffleCheck
         MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskContext.taskAttemptId()),
         MDC.of(LogKeys.THREAD_ID$.MODULE$, Thread.currentThread().getId()),
         MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, 
Utils.bytesToString(getMemoryUsage())),
-        MDC.of(LogKeys.NUM_SPILL_INFOS$.MODULE$, spills.size()),
+        MDC.of(LogKeys.NUM_SPILLS$.MODULE$, spills.size()),
         MDC.of(LogKeys.SPILL_TIMES$.MODULE$, spills.size() != 1 ? "times" : 
"time"));
     }
 
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala 
b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index c8d6000cd628..5b18ab95b07e 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -62,7 +62,7 @@ class BarrierTaskContext private[spark] (
       log"for ${MDC(TOTAL_TIME, System.currentTimeMillis() - st)} ms,")
     logInfo(log"Task ${MDC(TASK_ATTEMPT_ID, taskAttemptId())}" +
       log" from Stage ${MDC(STAGE_ID, stageId())}" +
-      log"(Attempt ${MDC(STAGE_ATTEMPT, stageAttemptNumber())}) " +
+      log"(Attempt ${MDC(STAGE_ATTEMPT_ID, stageAttemptNumber())}) " +
       msg + waitMsg +
       log" current barrier epoch is ${MDC(BARRIER_EPOCH, barrierEpoch)}.")
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index e06b7d86e1db..aee92ba928b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1382,9 +1382,9 @@ private[spark] class DAGScheduler(
     logInfo(
       log"Got job ${MDC(JOB_ID, job.jobId)} (${MDC(CALL_SITE_SHORT_FORM, 
callSite.shortForm)}) " +
       log"with ${MDC(NUM_PARTITIONS, partitions.length)} output partitions")
-    logInfo(log"Final stage: ${MDC(STAGE_ID, finalStage)} " +
+    logInfo(log"Final stage: ${MDC(STAGE, finalStage)} " +
       log"(${MDC(STAGE_NAME, finalStage.name)})")
-    logInfo(log"Parents of final stage: ${MDC(STAGE_ID, finalStage.parents)}")
+    logInfo(log"Parents of final stage: ${MDC(STAGES, finalStage.parents)}")
     logInfo(log"Missing parents: ${MDC(MISSING_PARENT_STAGES, 
getMissingParentStages(finalStage))}")
 
     val jobSubmissionTime = clock.getTimeMillis()
@@ -1465,7 +1465,7 @@ private[spark] class DAGScheduler(
           val missing = getMissingParentStages(stage).sortBy(_.id)
           logDebug("missing: " + missing)
           if (missing.isEmpty) {
-            logInfo(log"Submitting ${MDC(STAGE_ID, stage)} (${MDC(RDD_ID, 
stage.rdd)}), " +
+            logInfo(log"Submitting ${MDC(STAGE, stage)} (${MDC(RDD_ID, 
stage.rdd)}), " +
                     log"which has no missing parents")
             submitMissingTasks(stage, jobId.get)
           } else {
@@ -1517,12 +1517,12 @@ private[spark] class DAGScheduler(
     val shuffleId = stage.shuffleDep.shuffleId
     val shuffleMergeId = stage.shuffleDep.shuffleMergeId
     if (stage.shuffleDep.shuffleMergeEnabled) {
-      logInfo(log"Shuffle merge enabled before starting the stage for 
${MDC(STAGE_ID, stage)}" +
+      logInfo(log"Shuffle merge enabled before starting the stage for 
${MDC(STAGE, stage)}" +
         log" with shuffle ${MDC(SHUFFLE_ID, shuffleId)} and shuffle merge" +
         log" ${MDC(SHUFFLE_MERGE_ID, shuffleMergeId)} with" +
         log" ${MDC(NUM_MERGER_LOCATIONS, 
stage.shuffleDep.getMergerLocs.size.toString)} merger locations")
     } else {
-      logInfo(log"Shuffle merge disabled for ${MDC(STAGE_ID, stage)} with " +
+      logInfo(log"Shuffle merge disabled for ${MDC(STAGE, stage)} with " +
         log"shuffle ${MDC(SHUFFLE_ID, shuffleId)} and " +
         log"shuffle merge ${MDC(SHUFFLE_MERGE_ID, shuffleMergeId)}, " +
         log"but can get enabled later adaptively once enough " +
@@ -1583,7 +1583,7 @@ private[spark] class DAGScheduler(
             // merger locations but the corresponding shuffle map stage did 
not complete
             // successfully, we would still enable push for its retry.
             s.shuffleDep.setShuffleMergeAllowed(false)
-            logInfo(log"Push-based shuffle disabled for ${MDC(STAGE_ID, 
stage)} " +
+            logInfo(log"Push-based shuffle disabled for ${MDC(STAGE, stage)} " 
+
               log"(${MDC(STAGE_NAME, stage.name)}) since it is already shuffle 
merge finalized")
           }
         }
@@ -1707,7 +1707,7 @@ private[spark] class DAGScheduler(
 
     if (tasks.nonEmpty) {
       logInfo(log"Submitting ${MDC(NUM_TASKS, tasks.size)} missing tasks from 
" +
-        log"${MDC(STAGE_ID, stage)} (${MDC(RDD_ID, stage.rdd)}) (first 15 
tasks are " +
+        log"${MDC(STAGE, stage)} (${MDC(RDD_ID, stage.rdd)}) (first 15 tasks 
are " +
         log"for partitions ${MDC(PARTITION_IDS, 
tasks.take(15).map(_.partitionId))})")
       val shuffleId = stage match {
         case s: ShuffleMapStage => Some(s.shuffleDep.shuffleId)
@@ -1964,7 +1964,7 @@ private[spark] class DAGScheduler(
                     } catch {
                       case e: UnsupportedOperationException =>
                         logWarning(log"Could not cancel tasks " +
-                          log"for stage ${MDC(STAGE_ID, stageId)}", e)
+                          log"for stage ${MDC(STAGE, stageId)}", e)
                     }
                     listenerBus.post(
                       SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), 
JobSucceeded))
@@ -1996,7 +1996,7 @@ private[spark] class DAGScheduler(
               logDebug("ShuffleMapTask finished on " + execId)
               if (executorFailureEpoch.contains(execId) &&
                 smt.epoch <= executorFailureEpoch(execId)) {
-                logInfo(log"Ignoring possibly bogus ${MDC(STAGE_ID, smt)} 
completion from " +
+                logInfo(log"Ignoring possibly bogus ${MDC(STAGE, smt)} 
completion from " +
                   log"executor ${MDC(EXECUTOR_ID, execId)}")
               } else {
                 // The epoch of the task is acceptable (i.e., the task was 
launched after the most
@@ -2026,8 +2026,8 @@ private[spark] class DAGScheduler(
         if (failedStage.latestInfo.attemptNumber() != task.stageAttemptId) {
           logInfo(log"Ignoring fetch failure from " +
             log"${MDC(TASK_ID, task)} as it's from " +
-            log"${MDC(STAGE_ID, failedStage)} attempt " +
-            log"${MDC(STAGE_ATTEMPT, task.stageAttemptId)} and there is a more 
recent attempt for " +
+            log"${MDC(FAILED_STAGE, failedStage)} attempt " +
+            log"${MDC(STAGE_ATTEMPT_ID, task.stageAttemptId)} and there is a 
more recent attempt for " +
             log"that stage (attempt " +
             log"${MDC(NUM_ATTEMPT, failedStage.latestInfo.attemptNumber())}) 
running")
         } else {
@@ -2035,8 +2035,8 @@ private[spark] class DAGScheduler(
             isExecutorDecommissioningOrDecommissioned(taskScheduler, bmAddress)
           if (ignoreStageFailure) {
             logInfo(log"Ignoring fetch failure from ${MDC(TASK_NAME, task)} of 
" +
-              log"${MDC(STAGE, failedStage)} attempt " +
-              log"${MDC(STAGE_ATTEMPT, task.stageAttemptId)} when count " +
+              log"${MDC(FAILED_STAGE, failedStage)} attempt " +
+              log"${MDC(STAGE_ATTEMPT_ID, task.stageAttemptId)} when count " +
               log"${MDC(MAX_ATTEMPTS, 
config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key)} " +
               log"as executor ${MDC(EXECUTOR_ID, bmAddress.executorId)} is 
decommissioned and " +
               log"${MDC(CONFIG, 
config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key)}=true")
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index df28a97a349e..a769c3fa14b6 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -149,7 +149,7 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
       // The task output has been committed successfully
       case _: TaskCommitDenied =>
         logInfo(log"Task was denied committing, stage: ${MDC(LogKeys.STAGE_ID, 
stage)}." +
-          log"${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)}, " +
+          log"${MDC(LogKeys.STAGE_ATTEMPT_ID, stageAttempt)}, " +
           log"partition: ${MDC(LogKeys.PARTITION_ID, partition)}, " +
           log"attempt: ${MDC(LogKeys.NUM_ATTEMPT, attemptNumber)}")
       case _ =>
@@ -181,7 +181,7 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
     stageStates.get(stage) match {
       case Some(state) if attemptFailed(state, stageAttempt, partition, 
attemptNumber) =>
         logInfo(log"Commit denied for stage=${MDC(LogKeys.STAGE_ID, stage)}." +
-          log"${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)}, partition=" +
+          log"${MDC(LogKeys.STAGE_ATTEMPT_ID, stageAttempt)}, partition=" +
           log"${MDC(LogKeys.PARTITION_ID, partition)}: task attempt " +
           log"${MDC(LogKeys.NUM_ATTEMPT, attemptNumber)} already marked as 
failed.")
         false
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8e3cb1379339..43193dc5366a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -325,7 +325,7 @@ private[spark] class TaskSchedulerImpl(
         }
         tsm.suspend()
         logInfo(log"Stage ${MDC(LogKeys.STAGE_ID, stageId)}." +
-          log"${MDC(LogKeys.STAGE_ATTEMPT, tsm.taskSet.stageAttemptId)} was 
cancelled")
+          log"${MDC(LogKeys.STAGE_ATTEMPT_ID, tsm.taskSet.stageAttemptId)} was 
cancelled")
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 2474a1342eb2..3513cb1f9376 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.util.Properties
 
-import org.apache.spark.internal.LogKeys.{STAGE_ATTEMPT, STAGE_ID}
+import org.apache.spark.internal.LogKeys.{STAGE_ATTEMPT_ID, STAGE_ID}
 import org.apache.spark.internal.MessageWithContext
 
 /**
@@ -42,7 +42,7 @@ private[spark] class TaskSet(
   lazy val logId: MessageWithContext = {
     val hashMap = new java.util.HashMap[String, String]()
     hashMap.put(STAGE_ID.name, stageId.toString)
-    hashMap.put(STAGE_ATTEMPT.name, stageAttemptId.toString)
+    hashMap.put(STAGE_ATTEMPT_ID.name, stageAttemptId.toString)
     MessageWithContext(id, hashMap)
   }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index fdc82285b76b..0eaf138d3eb8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1001,7 +1001,7 @@ private[spark] class TaskSetManager(
           logError(
             log"Task ${MDC(TASK_INDEX, info.index)}.${MDC(TASK_ATTEMPT_ID, 
info.attemptNumber)} " +
               log"in stage ${MDC(STAGE_ID, taskSet.stageId)}." +
-            log"${MDC(STAGE_ATTEMPT, taskSet.stageAttemptId)} (TID 
${MDC(TASK_ID, tid)}) " +
+            log"${MDC(STAGE_ATTEMPT_ID, taskSet.stageAttemptId)} (TID 
${MDC(TASK_ID, tid)}) " +
             log"can not write to output file: ${MDC(ERROR, ef.description)}; 
not retrying")
           emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), 
reason, null,
             accumUpdates, metricPeaks)
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala 
b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index c3d648dccea7..7f2a1a8419a7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -146,6 +146,6 @@ private[spark] abstract class 
Spillable[C](taskMemoryManager: TaskMemoryManager)
     logInfo(log"Thread ${MDC(LogKeys.THREAD_ID, threadId)} " +
       log"spilling in-memory map of ${MDC(LogKeys.BYTE_SIZE,
         org.apache.spark.util.Utils.bytesToString(size))} to disk " +
-      log"(${MDC(LogKeys.SPILL_TIMES, _spillCount)} times so far)")
+      log"(${MDC(LogKeys.NUM_SPILLS, _spillCount)} times so far)")
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index bb7d904402de..1ea4df025467 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -61,7 +61,7 @@ object ShufflePartitionsUtil extends Logging {
     val targetSize = 
maxTargetSize.min(advisoryTargetSize).max(minPartitionSize)
 
     val shuffleIds = 
mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
-    logInfo(log"For shuffle(${MDC(LogKeys.SHUFFLE_ID, shuffleIds)}, advisory 
target size: " +
+    logInfo(log"For shuffle(${MDC(LogKeys.SHUFFLE_IDS, shuffleIds)}, advisory 
target size: " +
       log"${MDC(LogKeys.ADVISORY_TARGET_SIZE, advisoryTargetSize)}, actual 
target size " +
       log"${MDC(LogKeys.TARGET_SIZE, targetSize)}, minimum partition size: " +
       log"${MDC(LogKeys.PARTITION_SIZE, minPartitionSize)}")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index bdcf7b8260a7..308b1bceca12 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -480,7 +480,7 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]] 
extends Logging with Serial
             log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " +
             log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " +
             log"stage ${MDC(LogKeys.STAGE_ID, stageId)}." +
-            log"${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})")
+            log"${MDC(LogKeys.STAGE_ATTEMPT_ID, stageAttempt)})")
 
           dataWriter.commit()
         } else {
@@ -500,7 +500,8 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]] 
extends Logging with Serial
       logInfo(log"Committed partition ${MDC(LogKeys.PARTITION_ID, partId)} " +
         log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " +
         log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " +
-        log"stage ${MDC(LogKeys.STAGE_ID, 
stageId)}.${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})")
+        log"stage ${MDC(LogKeys.STAGE_ID, stageId)}." +
+        log"${MDC(LogKeys.STAGE_ATTEMPT_ID, stageAttempt)})")
 
       DataWritingSparkTaskResult(iterWithMetrics.count, msg)
 
@@ -509,12 +510,14 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]] 
extends Logging with Serial
       logError(log"Aborting commit for partition ${MDC(LogKeys.PARTITION_ID, 
partId)} " +
         log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " +
         log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " +
-        log"stage ${MDC(LogKeys.STAGE_ID, 
stageId)}.${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})")
+        log"stage ${MDC(LogKeys.STAGE_ID, stageId)}." +
+        log"${MDC(LogKeys.STAGE_ATTEMPT_ID, stageAttempt)})")
       dataWriter.abort()
       logError(log"Aborted commit for partition ${MDC(LogKeys.PARTITION_ID, 
partId)} " +
         log"(task ${MDC(LogKeys.TASK_ID, taskId)}, " +
         log"attempt ${MDC(LogKeys.TASK_ATTEMPT_ID, attemptId)}, " +
-        log"stage ${MDC(LogKeys.STAGE_ID, 
stageId)}.${MDC(LogKeys.STAGE_ATTEMPT, stageAttempt)})")
+        log"stage ${MDC(LogKeys.STAGE_ID, stageId)}." +
+        log"${MDC(LogKeys.STAGE_ATTEMPT_ID, stageAttempt)})")
     }, finallyBlock = {
       dataWriter.close()
     })
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 0f945af6ede9..33df8ad42747 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -977,7 +977,8 @@ object StateStore extends Logging {
           } finally {
             val duration = System.currentTimeMillis() - startTime
             val logMsg =
-              log"Finished maintenance task for 
provider=${MDC(LogKeys.STATE_STORE_PROVIDER, id)}" +
+              log"Finished maintenance task for " +
+                log"provider=${MDC(LogKeys.STATE_STORE_PROVIDER_ID, id)}" +
                 log" in elapsed_time=${MDC(LogKeys.TIME_UNITS, duration)}\n"
             if (duration > 5000) {
               logInfo(logMsg)
@@ -1007,9 +1008,9 @@ object StateStore extends Logging {
         .map(_.reportActiveInstance(storeProviderId, host, executorId, 
otherProviderIds))
         .getOrElse(Seq.empty[StateStoreProviderId])
       logInfo(log"Reported that the loaded instance " +
-        log"${MDC(LogKeys.STATE_STORE_PROVIDER, storeProviderId)} is active")
+        log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, storeProviderId)} is 
active")
       logDebug(log"The loaded instances are going to unload: " +
-        log"${MDC(LogKeys.STATE_STORE_PROVIDER, 
providerIdsToUnload.mkString(", "))}")
+        log"${MDC(LogKeys.STATE_STORE_PROVIDER_IDS, providerIdsToUnload)}")
       providerIdsToUnload
     } else {
       Seq.empty[StateStoreProviderId]
@@ -1041,7 +1042,7 @@ object StateStore extends Logging {
         _coordRef = StateStoreCoordinatorRef.forExecutor(env)
       }
       logInfo(log"Retrieved reference to StateStoreCoordinator: " +
-        log"${MDC(LogKeys.STATE_STORE_PROVIDER, _coordRef)}")
+        log"${MDC(LogKeys.STATE_STORE_COORDINATOR, _coordRef)}")
       Some(_coordRef)
     } else {
       _coordRef = null
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index e0e85712a230..fae68123773d 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -142,7 +142,7 @@ private[streaming] class BlockGenerator(
         state = StoppedAddingData
       } else {
         logWarning(log"Cannot stop BlockGenerator as its not in the Active 
state " +
-          log"[state = ${MDC(STATUS, state)}]")
+          log"[state = ${MDC(BLOCK_GENERATOR_STATUS, state)}]")
         return
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to