This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d47f34f3182c [SPARK-48629] Migrate the residual code to structured logging framework
d47f34f3182c is described below

commit d47f34f3182c45f80ae3f364bcc30f06d4838d43
Author: panbingkun <[email protected]>
AuthorDate: Mon Jun 24 22:46:25 2024 -0700

    [SPARK-48629] Migrate the residual code to structured logging framework
    
    ### What changes were proposed in this pull request?
    This PR aims to migrate the `residual code` to the structured logging framework.
    
    ### Why are the changes needed?
    While reviewing the Spark code, I found that some logs in a few modules
    were not fully migrated to the structured logging framework; let's
    complete the unfinished work.
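
    For reference, the before/after pattern applied throughout this diff looks
    roughly like the sketch below (the `shardId` message is adapted from the
    `KinesisCheckpointer` change in this commit; the enclosing object is
    hypothetical, for illustration only):

    ```scala
    import org.apache.spark.internal.{Logging, LogKeys, MDC}

    // Hypothetical wrapper object, used only to illustrate the migration.
    object MigrationSketch extends Logging {
      def reportFailure(shardId: String, e: Throwable): Unit = {
        // Before: plain string interpolation, no structured key/value fields.
        // logWarning(s"Failed to checkpoint shardId $shardId to DynamoDB.", e)

        // After: the log"..." interpolator tags the value with a LogKey, so it
        // can be emitted as a queryable field when structured logging is enabled.
        logWarning(log"Failed to checkpoint shardId ${MDC(LogKeys.SHARD_ID, shardId)} to DynamoDB.", e)
      }
    }
    ```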
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Passed GA.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #46986 from panbingkun/sl_other_followup.
    
    Authored-by: panbingkun <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../scala/org/apache/spark/internal/LogKey.scala   | 19 +++++++++---
 .../scala/org/apache/spark/util/MavenUtils.scala   |  9 +++---
 .../org/apache/spark/util/SparkErrorUtils.scala    |  5 +--
 .../org/apache/spark/util/SparkFileUtils.scala     |  6 ++--
 .../sql/streaming/StreamingQueryListenerBus.scala  |  6 ++--
 ...SparkConnectStreamingQueryListenerHandler.scala | 36 +++++++++++-----------
 .../streaming/kinesis/KinesisCheckpointer.scala    |  2 +-
 .../spark/deploy/history/FsHistoryProvider.scala   |  4 +--
 .../spark/deploy/master/ui/MasterWebUI.scala       |  4 +--
 .../scala/org/apache/spark/executor/Executor.scala |  2 +-
 ...AbortableStreamBasedCheckpointFileManager.scala |  6 ++--
 .../apache/spark/deploy/k8s/KubernetesConf.scala   | 15 ++++-----
 .../spark/deploy/yarn/ApplicationMaster.scala      |  9 +++---
 .../org/apache/spark/deploy/yarn/Client.scala      |  4 +--
 .../scheduler/cluster/YarnSchedulerBackend.scala   |  5 +--
 .../sql/execution/streaming/AsyncCommitLog.scala   |  4 ++-
 .../execution/streaming/AsyncOffsetSeqLog.scala    |  4 ++-
 .../AsyncProgressTrackingMicroBatchExecution.scala | 10 +++---
 .../execution/streaming/MicroBatchExecution.scala  | 12 ++++----
 .../sql/execution/streaming/StreamExecution.scala  | 10 +++---
 .../sql/execution/streaming/StreamMetadata.scala   |  7 +++--
 .../streaming/state/OperatorStateMetadata.scala    |  5 +--
 .../streaming/state/RocksDBFileManager.scala       |  2 +-
 .../state/StateSchemaCompatibilityChecker.scala    |  6 ++--
 .../streaming/state/StateStoreChangelog.scala      |  8 ++---
 .../hive/service/cli/operation/Operation.java      |  2 +-
 .../thriftserver/SparkGetFunctionsOperation.scala  | 12 +++++---
 .../rdd/WriteAheadLogBackedBlockRDD.scala          |  5 +--
 28 files changed, 118 insertions(+), 101 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index f90eb4a77071..c0edebf30c9a 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -80,6 +80,7 @@ private[spark] object LogKeys {
   case object APP_STATE extends LogKey
   case object ARCHIVE_NAME extends LogKey
   case object ARGS extends LogKey
+  case object ARTIFACTS extends LogKey
   case object ARTIFACT_ID extends LogKey
   case object ATTRIBUTE_MAP extends LogKey
   case object AUTH_ENABLED extends LogKey
@@ -282,6 +283,7 @@ private[spark] object LogKeys {
   case object FREE_MEMORY_SIZE extends LogKey
   case object FROM_OFFSET extends LogKey
   case object FROM_TIME extends LogKey
+  case object FS_DATA_OUTPUT_STREAM extends LogKey
   case object FUNCTION_NAME extends LogKey
   case object FUNCTION_PARAM extends LogKey
   case object GLOBAL_INIT_FILE extends LogKey
@@ -299,9 +301,8 @@ private[spark] object LogKeys {
   case object HIVE_OPERATION_STATE extends LogKey
   case object HIVE_OPERATION_TYPE extends LogKey
   case object HOST extends LogKey
+  case object HOSTS extends LogKey
   case object HOST_LOCAL_BLOCKS_SIZE extends LogKey
-  case object HOST_NAME extends LogKey
-  case object HOST_NAMES extends LogKey
   case object HOST_PORT extends LogKey
   case object HOST_PORT2 extends LogKey
   case object HUGE_METHOD_LIMIT extends LogKey
@@ -337,6 +338,7 @@ private[spark] object LogKeys {
   case object KEY2 extends LogKey
   case object KEYTAB extends LogKey
   case object KEYTAB_FILE extends LogKey
+  case object KILL_EXECUTORS extends LogKey
   case object LABEL_COLUMN extends LogKey
   case object LARGEST_CLUSTER_INDEX extends LogKey
   case object LAST_ACCESS_TIME extends LogKey
@@ -357,10 +359,10 @@ private[spark] object LogKeys {
   case object LOCAL_BLOCKS_SIZE extends LogKey
   case object LOCAL_SCRATCH_DIR extends LogKey
   case object LOCATION extends LogKey
+  case object LOGICAL_PLAN extends LogKey
   case object LOGICAL_PLAN_COLUMNS extends LogKey
   case object LOGICAL_PLAN_LEAVES extends LogKey
   case object LOG_ID extends LogKey
-  case object LOG_KEY_FILE extends LogKey
   case object LOG_LEVEL extends LogKey
   case object LOG_OFFSET extends LogKey
   case object LOG_TYPE extends LogKey
@@ -385,6 +387,7 @@ private[spark] object LogKeys {
   case object MAX_NUM_PARTITIONS extends LogKey
   case object MAX_NUM_POSSIBLE_BINS extends LogKey
   case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
+  case object MAX_SERVICE_NAME_LENGTH extends LogKey
   case object MAX_SIZE extends LogKey
   case object MAX_SLOTS extends LogKey
   case object MAX_SPLIT_BYTES extends LogKey
@@ -395,6 +398,7 @@ private[spark] object LogKeys {
   case object MEMORY_THRESHOLD_SIZE extends LogKey
   case object MERGE_DIR_NAME extends LogKey
   case object MESSAGE extends LogKey
+  case object METADATA extends LogKey
   case object METADATA_DIRECTORY extends LogKey
   case object METADATA_JSON extends LogKey
   case object META_FILE extends LogKey
@@ -541,7 +545,8 @@ private[spark] object LogKeys {
   case object OLD_VALUE extends LogKey
   case object OPEN_COST_IN_BYTES extends LogKey
   case object OPERATION_HANDLE extends LogKey
-  case object OPERATION_HANDLE_IDENTIFIER extends LogKey
+  case object OPERATION_HANDLE_ID extends LogKey
+  case object OPERATION_ID extends LogKey
   case object OPTIMIZED_PLAN_COLUMNS extends LogKey
   case object OPTIMIZER_CLASS_NAME extends LogKey
   case object OPTIONS extends LogKey
@@ -583,6 +588,7 @@ private[spark] object LogKeys {
   case object POST_SCAN_FILTERS extends LogKey
   case object PREDICATE extends LogKey
   case object PREDICATES extends LogKey
+  case object PREFERRED_SERVICE_NAME extends LogKey
   case object PREFIX extends LogKey
   case object PRETTY_ID_STRING extends LogKey
   case object PRINCIPAL extends LogKey
@@ -613,6 +619,7 @@ private[spark] object LogKeys {
   case object RANGE extends LogKey
   case object RATE_LIMIT extends LogKey
   case object RATIO extends LogKey
+  case object RDD extends LogKey
   case object RDD_CHECKPOINT_DIR extends LogKey
   case object RDD_DEBUG_STRING extends LogKey
   case object RDD_DESCRIPTION extends LogKey
@@ -641,6 +648,7 @@ private[spark] object LogKeys {
   case object REMOVE_FROM_MASTER extends LogKey
   case object REPORT_DETAILS extends LogKey
   case object REQUESTER_SIZE extends LogKey
+  case object REQUEST_EXECUTORS extends LogKey
   case object REQUEST_ID extends LogKey
   case object RESOURCE extends LogKey
   case object RESOURCE_NAME extends LogKey
@@ -679,6 +687,7 @@ private[spark] object LogKeys {
   case object SESSION_KEY extends LogKey
   case object SET_CLIENT_INFO_REQUEST extends LogKey
   case object SHARD_ID extends LogKey
+  case object SHORTER_SERVICE_NAME extends LogKey
   case object SHORT_USER_NAME extends LogKey
   case object SHUFFLE_BLOCK_INFO extends LogKey
   case object SHUFFLE_DB_BACKEND_KEY extends LogKey
@@ -756,7 +765,6 @@ private[spark] object LogKeys {
   case object TASK_ATTEMPT_ID extends LogKey
   case object TASK_ID extends LogKey
   case object TASK_INDEX extends LogKey
-  case object TASK_INFO_ID extends LogKey
   case object TASK_LOCALITY extends LogKey
   case object TASK_NAME extends LogKey
   case object TASK_REQUIREMENTS extends LogKey
@@ -835,6 +843,7 @@ private[spark] object LogKeys {
   case object WORKER_PORT extends LogKey
   case object WORKER_URL extends LogKey
   case object WRITE_AHEAD_LOG_INFO extends LogKey
+  case object WRITE_AHEAD_LOG_RECORD_HANDLE extends LogKey
   case object WRITE_JOB_UUID extends LogKey
   case object XML_SCHEDULING_MODE extends LogKey
   case object XSD_PATH extends LogKey
diff --git a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
index ae00987cd69f..546981c8b543 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
@@ -36,7 +36,7 @@ import org.apache.ivy.plugins.repository.file.FileRepository
 import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.util.ArrayImplicits._
 
 /** Provides utility functions to be used inside SparkSubmit. */
@@ -215,7 +215,7 @@ private[spark] object MavenUtils extends Logging {
         if (artifactInfo.getExt == "jar") {
           true
         } else {
-          logInfo(s"Skipping non-jar dependency ${artifactInfo.getId}")
+          logInfo(log"Skipping non-jar dependency ${MDC(LogKeys.ARTIFACT_ID, artifactInfo.getId)}")
           false
         }
       }
@@ -515,8 +515,9 @@ private[spark] object MavenUtils extends Logging {
           val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true)
           if (failedReports.nonEmpty && noCacheIvySettings.isDefined) {
             val failedArtifacts = failedReports.map(r => r.getArtifact)
-            logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " +
-              s"attempt to retry while skipping local-m2-cache.")
+            logInfo(log"Download failed: " +
+              log"${MDC(LogKeys.ARTIFACTS, failedArtifacts.mkString("[", ", ", "]"))}, " +
+              log"attempt to retry while skipping local-m2-cache.")
             failedArtifacts.foreach(artifact => {
              clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache)
             })
diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala
index 8194d1e42417..9f604e4bf47f 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets.UTF_8
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 
 private[spark] trait SparkErrorUtils extends Logging {
   /**
@@ -74,7 +74,8 @@ private[spark] trait SparkErrorUtils extends Logging {
       } catch {
         case t: Throwable if (originalThrowable != null && originalThrowable != t) =>
           originalThrowable.addSuppressed(t)
-          logWarning(s"Suppressing exception in finally: ${t.getMessage}", t)
+          logWarning(
+            log"Suppressing exception in finally: ${MDC(LogKeys.MESSAGE, t.getMessage)}", t)
           throw originalThrowable
       }
     }
diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala
index e12f8acdadd3..22f03df1b269 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala
@@ -20,7 +20,7 @@ import java.io.File
 import java.net.{URI, URISyntaxException}
 import java.nio.file.Files
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.network.util.JavaUtils
 
 private[spark] trait SparkFileUtils extends Logging {
@@ -77,12 +77,12 @@ private[spark] trait SparkFileUtils extends Logging {
       // remove the check when we're sure that Files.createDirectories() would never fail silently.
       Files.createDirectories(dir.toPath)
       if ( !dir.exists() || !dir.isDirectory) {
-        logError(s"Failed to create directory " + dir)
+        logError(log"Failed to create directory ${MDC(LogKeys.PATH, dir)}")
       }
       dir.isDirectory
     } catch {
       case e: Exception =>
-        logError(s"Failed to create directory " + dir, e)
+        logError(log"Failed to create directory ${MDC(LogKeys.PATH, dir)}", e)
         false
     }
   }
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
index c3c23740e2fe..c2934bcfa705 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
@@ -145,13 +145,11 @@ class StreamingQueryListenerBus(sparkSession: SparkSession) extends Logging {
           case t: QueryTerminatedEvent =>
             listener.onQueryTerminated(t)
           case _ =>
-            logWarning(
-              log"Unknown StreamingQueryListener event: " +
-                log"${MDC(LogKeys.EVENT, event)}")
+            logWarning(log"Unknown StreamingQueryListener event: ${MDC(LogKeys.EVENT, event)}")
         }
       } catch {
         case e: Exception =>
-          logWarning(s"Listener $listener threw an exception", e)
+          logWarning(log"Listener ${MDC(LogKeys.LISTENER, listener)} threw an exception", e)
       })
   }
 }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
index d072b56e022a..ce5aa0888ca5 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
@@ -57,9 +57,10 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
       case StreamingQueryListenerBusCommand.CommandCase.ADD_LISTENER_BUS_LISTENER =>
         listenerHolder.isServerSideListenerRegistered match {
           case true =>
-            logWarning(
-              s"[SessionId: $sessionId][UserId: $userId][operationId: " +
-                s"${executeHolder.operationId}] Redundant server side listener added. Exiting.")
+            logWarning(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
+              log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
+              log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
+              log"Redundant server side listener added. Exiting.")
             return
           case false =>
             // This transfers sending back the response to the client until
@@ -86,36 +87,35 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
                   log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
                     log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
                     log"[operationId: " +
-                    log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " +
+                    log"${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
                     log"Error sending listener added response.",
                   e)
                 listenerHolder.cleanUp()
                 return
             }
         }
-        logInfo(
-          log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
-            log"${MDC(LogKeys.USER_ID, userId)}][operationId: " +
-            log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " +
-            log"Server side listener added. Now blocking until all client side listeners are " +
-            log"removed or there is error transmitting the event back.")
+        logInfo(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
+          log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
+          log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
+          log"Server side listener added. Now blocking until " +
+          log"all client side listeners are removed or there is error transmitting the event back.")
         // Block the handling thread, and have serverListener continuously send back new events
         listenerHolder.streamingQueryListenerLatch.await()
         logInfo(
-          log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
-            log"${MDC(LogKeys.USER_ID, userId)}]" +
-            log"[operationId: " +
-          log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " +
+          log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
+            log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
+            log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
             log"Server side listener long-running handling thread ended.")
       case StreamingQueryListenerBusCommand.CommandCase.REMOVE_LISTENER_BUS_LISTENER =>
         listenerHolder.isServerSideListenerRegistered match {
           case true =>
             sessionHolder.streamingServersideListenerHolder.cleanUp()
           case false =>
-            logWarning(
-              s"[SessionId: $sessionId][UserId: $userId][operationId: " +
-                s"${executeHolder.operationId}] No active server side listener bus listener " +
-                s"but received remove listener call. Exiting.")
+            logWarning(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
+              log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
+              log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
+              log"No active server side listener bus listener but received remove listener call. " +
+              log"Exiting.")
             return
         }
       case StreamingQueryListenerBusCommand.CommandCase.COMMAND_NOT_SET =>
diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
index 7e5a170a870c..c52eeca1e48a 100644
--- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
+++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
@@ -103,7 +103,7 @@ private[kinesis] class KinesisCheckpointer(
       }
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Failed to checkpoint shardId $shardId to DynamoDB.", e)
+        logWarning(log"Failed to checkpoint shardId ${MDC(SHARD_ID, shardId)} to DynamoDB.", e)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 92ac10ac9fb8..95b23c0f894f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -1390,8 +1390,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         case _: IOException if !retried =>
           // compaction may touch the file(s) which app rebuild wants to read
           // compaction wouldn't run in short interval, so try again...
-          logWarning(s"Exception occurred while rebuilding log path ${attempt.logPath} - " +
-            "trying again...")
+          logWarning(log"Exception occurred while rebuilding log path " +
+            log"${MDC(LogKeys.PATH, attempt.logPath)} - trying again...")
           retried = true
 
         case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 88cda1bd65cc..6c7a8f582d91 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, Maste
 import org.apache.spark.deploy.Utils.addRenderLogHandler
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{HOST_NAMES, NUM_REMOVED_WORKERS}
+import org.apache.spark.internal.LogKeys.{HOSTS, NUM_REMOVED_WORKERS}
 import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.internal.config.UI.MASTER_UI_DECOMMISSION_ALLOW_MODE
 import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
@@ -79,7 +79,7 @@ class MasterWebUI(
           } else {
             val removedWorkers = masterEndpointRef.askSync[Integer](
               DecommissionWorkersOnHosts(hostnames))
-            logInfo(log"Decommissioning of hosts ${MDC(HOST_NAMES, hostnames)}" +
+            logInfo(log"Decommissioning of hosts ${MDC(HOSTS, hostnames)}" +
               log" decommissioned ${MDC(NUM_REMOVED_WORKERS, removedWorkers)} workers")
             if (removedWorkers > 0) {
               resp.setStatus(HttpServletResponse.SC_OK)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 7317d3c47c08..586a8a7db28a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -83,7 +83,7 @@ private[spark] class Executor(
   extends Logging {
 
   logInfo(log"Starting executor ID ${LogMDC(LogKeys.EXECUTOR_ID, executorId)}" +
-    log" on host ${LogMDC(HOST_NAME, executorHostname)}")
+    log" on host ${LogMDC(HOST, executorHostname)}")
   logInfo(log"OS info ${LogMDC(OS_NAME, System.getProperty("os.name"))}," +
     log" ${LogMDC(OS_VERSION, System.getProperty("os.version"))}, " +
     log"${LogMDC(OS_ARCH, System.getProperty("os.arch"))}")
diff --git a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
index 20b3d9444884..599361009fcc 100644
--- a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
+++ b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
@@ -53,7 +53,8 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configur
         fsDataOutputStream.close()
       } catch {
           case NonFatal(e) =>
-            logWarning(s"Error cancelling write to $path (stream: $fsDataOutputStream)", e)
+            logWarning(log"Error cancelling write to ${MDC(LogKeys.PATH, path)} " +
+              log"(stream: ${MDC(LogKeys.FS_DATA_OUTPUT_STREAM, fsDataOutputStream)})", e)
       } finally {
         terminated = true
       }
@@ -71,7 +72,8 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configur
         fsDataOutputStream.close()
       } catch {
           case NonFatal(e) =>
-            logWarning(s"Error closing $path (stream: $fsDataOutputStream)", e)
+            logWarning(log"Error closing ${MDC(LogKeys.PATH, path)} " +
+              log"(stream: ${MDC(LogKeys.FS_DATA_OUTPUT_STREAM, fsDataOutputStream)})", e)
       } finally {
         terminated = true
       }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index f62204a8a9c0..deb178eb90e1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -27,8 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep._
 import org.apache.spark.deploy.k8s.submit._
-import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{CONFIG, EXECUTOR_ENV_REGEX}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -107,9 +106,11 @@ class KubernetesDriverConf(
     } else {
       val randomServiceId = KubernetesUtils.uniqueID(clock)
       val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
-      logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
-        s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
-        s"$shorterServiceName as the driver service's name.")
+      logWarning(log"Driver's hostname would preferably be " +
+        log"${MDC(LogKeys.PREFERRED_SERVICE_NAME, preferredServiceName)}, but this is too long " +
+        log"(must be <= ${MDC(LogKeys.MAX_SERVICE_NAME_LENGTH, MAX_SERVICE_NAME_LENGTH)} " +
+        log"characters). Falling back to use " +
+        log"${MDC(LogKeys.SHORTER_SERVICE_NAME, shorterServiceName)} as the driver service's name.")
       shorterServiceName
     }
   }
@@ -242,10 +243,10 @@ private[spark] class KubernetesExecutorConf(
     if (executorEnvRegex.pattern.matcher(key).matches()) {
       true
     } else {
-      logWarning(log"Invalid key: ${MDC(CONFIG, key)}, " +
+      logWarning(log"Invalid key: ${MDC(LogKeys.CONFIG, key)}, " +
         log"a valid environment variable name must consist of alphabetic characters, " +
         log"digits, '_', '-', or '.', and must not start with a digit. " +
-        log"Regex used for validation is '${MDC(EXECUTOR_ENV_REGEX, executorEnvRegex)}'")
+        log"Regex used for validation is '${MDC(LogKeys.EXECUTOR_ENV_REGEX, executorEnvRegex)}'")
       false
     }
   }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2523941e0bff..11d22a3225d8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -527,10 +527,9 @@ private[spark] class ApplicationMaster(
       userClassThread.join()
     } catch {
       case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
-        logError(
-          log"""SparkContext did not initialize after waiting for
-          |${MDC(LogKeys.TIMEOUT, totalWaitTime)} ms.
-          | Please check earlier log output for errors. Failing the application.""".stripMargin)
+        logError(log"SparkContext did not initialize after waiting for " +
+            log"${MDC(LogKeys.TIMEOUT, totalWaitTime)} ms. " +
+            log"Please check earlier log output for errors. Failing the application.")
         finish(FinalApplicationStatus.FAILED,
           ApplicationMaster.EXIT_SC_NOT_INITED,
           "Timed out waiting for SparkContext.")
@@ -869,7 +868,7 @@ private[spark] class ApplicationMaster(
           }
         } else {
           logError(log"Application Master lost connection with driver! Shutting down. " +
-            log"${MDC(LogKeys.REMOTE_ADDRESS, remoteAddress)}")
+            log"${MDC(LogKeys.HOST_PORT, remoteAddress)}")
           finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_DISCONNECTED)
         }
       }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index bf31e03ba9a8..b2c4d97bc7b0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1211,11 +1211,11 @@ private[spark] class Client(
             cleanupStagingDir()
            return YarnAppReport(YarnApplicationState.KILLED, FinalApplicationStatus.KILLED, None)
           case NonFatal(e) if !e.isInstanceOf[InterruptedIOException] =>
-            val msg = s"Failed to contact YARN for application $appId."
+            val msg = log"Failed to contact YARN for application ${MDC(LogKeys.APP_ID, appId)}."
             logError(msg, e)
             // Don't necessarily clean up staging dir because status is unknown
             return YarnAppReport(YarnApplicationState.FAILED, FinalApplicationStatus.FAILED,
-              Some(msg))
+              Some(msg.message))
         }
       val state = report.getYarnApplicationState
       reportsSinceLastLog += 1
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index e21f1c3268ae..cd81f11510fe 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -366,7 +366,8 @@ private[spark] abstract class YarnSchedulerBackend(
             am.ask[Boolean](r).andThen {
               case Success(b) => context.reply(b)
               case Failure(NonFatal(e)) =>
-                logError(s"Sending $r to AM was unsuccessful", e)
+                logError(
+                  log"Sending ${MDC(LogKeys.REQUEST_EXECUTORS, r)} to AM was unsuccessful", e)
                 context.sendFailure(e)
             }(ThreadUtils.sameThread)
           case None =>
@@ -380,7 +381,7 @@ private[spark] abstract class YarnSchedulerBackend(
             am.ask[Boolean](k).andThen {
               case Success(b) => context.reply(b)
               case Failure(NonFatal(e)) =>
-                logError(s"Sending $k to AM was unsuccessful", e)
+                logError(log"Sending ${MDC(LogKeys.KILL_EXECUTORS, k)} to AM was unsuccessful", e)
                 context.sendFailure(e)
             }(ThreadUtils.sameThread)
           case None =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala
index 686e0bb86886..6db01624fd26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, ThreadPoo
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.internal.{LogKeys, MDC}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
 
@@ -125,7 +126,8 @@ class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService:
             }
           } catch {
             case e: Throwable =>
-              logError(s"Encountered error while writing batch ${batchId} to commit log", e)
+              logError(log"Encountered error while writing batch " +
+                log"${MDC(LogKeys.BATCH_ID, batchId)} to commit log", e)
               future.completeExceptionally(e)
           }
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala
index a89a9132e03e..54a8855b77cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.internal.{LogKeys, MDC}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.util.{Clock, SystemClock}
@@ -159,7 +160,8 @@ class AsyncOffsetSeqLog(
             }
           } catch {
             case e: Throwable =>
-              logError(s"Encountered error while writing batch ${batchId} to offset log", e)
+              logError(log"Encountered error while writing batch " +
+                log"${MDC(LogKeys.BATCH_ID, batchId)} to offset log", e)
               future.completeExceptionally(e)
           }
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
index ce11e9100782..4a7cb5b71a77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicLong
 
-import org.apache.spark.internal.LogKeys.PRETTY_ID_STRING
+import org.apache.spark.internal.LogKeys.{BATCH_ID, PRETTY_ID_STRING}
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.streaming.WriteToStream
@@ -158,8 +158,8 @@ class AsyncProgressTrackingMicroBatchExecution(
         }
       })
       .exceptionally((th: Throwable) => {
-        logError(s"Encountered error while performing" +
-          s" async offset write for batch ${execCtx.batchId}", th)
+        logError(log"Encountered error while performing async offset write for batch " +
+          log"${MDC(BATCH_ID, execCtx.batchId)}", th)
         errorNotifier.markError(th)
         return
       })
@@ -190,8 +190,8 @@ class AsyncProgressTrackingMicroBatchExecution(
         commitLog
           .addAsync(execCtx.batchId, CommitMetadata(watermarkTracker.currentWatermark))
           .exceptionally((th: Throwable) => {
-            logError(s"Got exception during async write to commit log" +
-              s" for batch ${execCtx.batchId}", th)
+            logError(log"Got exception during async write to commit log for batch " +
+              log"${MDC(BATCH_ID, execCtx.batchId)}", th)
             errorNotifier.markError(th)
             return
           })
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index ef49de5e0857..f636413f7c51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -419,12 +419,12 @@ class MicroBatchExecution(
   def validateOffsetLogAndGetPrevOffset(latestBatchId: Long): Option[OffsetSeq] = {
     if (latestBatchId != 0) {
       Some(offsetLog.get(latestBatchId - 1).getOrElse {
-        logError(s"The offset log for batch ${latestBatchId - 1} doesn't exist, " +
-          s"which is required to restart the query from the latest batch $latestBatchId " +
-          "from the offset log. Please ensure there are two subsequent offset logs " +
-          "available for the latest batch via manually deleting the offset file(s). " +
-          "Please also ensure the latest batch for commit log is equal or one batch " +
-          "earlier than the latest batch for offset log.")
+        logError(log"The offset log for batch ${MDC(LogKeys.BATCH_ID, latestBatchId - 1)} " +
+          log"doesn't exist, which is required to restart the query from the latest batch " +
+          log"${MDC(LogKeys.LATEST_BATCH_ID, latestBatchId)} from the offset log. Please ensure " +
+          log"there are two subsequent offset logs available for the latest batch via manually " +
+          log"deleting the offset file(s). Please also ensure the latest batch for commit log is " +
+          log"equal or one batch earlier than the latest batch for offset log.")
         throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
       })
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 4198d7367fe2..a6320a664aa1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -32,8 +32,8 @@ import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{JobArtifactSet, SparkContext, SparkException, SparkThrowable}
-import org.apache.spark.internal.{Logging, LogKeys, MDC}
-import org.apache.spark.internal.LogKeys.{CHECKPOINT_PATH, CHECKPOINT_ROOT, PATH, PRETTY_ID_STRING, SPARK_DATA_STREAM}
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{CHECKPOINT_PATH, CHECKPOINT_ROOT, LOGICAL_PLAN, PATH, PRETTY_ID_STRING, SPARK_DATA_STREAM}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
@@ -322,8 +322,7 @@ abstract class StreamExecution(
         if (state.compareAndSet(INITIALIZING, ACTIVE)) {
           // Log logical plan at the start of the query to help debug issues related to
           // plan changes.
-          logInfo(log"Finish initializing with logical plan:\n" +
-            log"${MDC(LogKeys.QUERY_PLAN, logicalPlan)}")
+          logInfo(log"Finish initializing with logical plan:\n${MDC(LOGICAL_PLAN, logicalPlan)}")
 
           // Unblock `awaitInitialization`
           initializationLatch.countDown()
@@ -373,8 +372,7 @@ abstract class StreamExecution(
           case _ => None
         }
 
-        logError(log"Query ${MDC(LogKeys.PRETTY_ID_STRING, prettyIdString)} " +
-          log"terminated with error", e)
+        logError(log"Query ${MDC(PRETTY_ID_STRING, prettyIdString)} terminated with error", e)
         getLatestExecutionContext().updateStatusMessage(s"Terminated with exception: $message")
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
         // handle them
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index 978cb3c34f60..84519150ca42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 
@@ -60,7 +60,7 @@ object StreamMetadata extends Logging {
         Some(metadata)
       } catch {
         case NonFatal(e) =>
-          logError(s"Error reading stream metadata from $metadataFile", e)
+          logError(log"Error reading stream metadata from ${MDC(LogKeys.PATH, metadataFile)}", e)
           throw e
       } finally {
         IOUtils.closeQuietly(input)
@@ -91,7 +91,8 @@ object StreamMetadata extends Logging {
         if (output != null) {
           output.cancel()
         }
-        logError(s"Error writing stream metadata $metadata to $metadataFile", e)
+        logError(log"Error writing stream metadata ${MDC(LogKeys.METADATA, metadata)} to " +
+          log"${MDC(LogKeys.PATH, metadataFile)}", e)
         throw e
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
index b58c805af9d6..8ce883038401 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FSDataOutputStream, Path}
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MetadataVersionUtil}
 
 /**
@@ -105,7 +105,8 @@ class OperatorStateMetadataWriter(stateCheckpointPath: Path, hadoopConf: Configu
       outputStream.close()
     } catch {
       case e: Throwable =>
-        logError(s"Fail to write state metadata file to $metadataFilePath", e)
+        logError(
+          log"Fail to write state metadata file to ${MDC(LogKeys.META_FILE, metadataFilePath)}", e)
         outputStream.cancel()
         throw e
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 0a460ece2400..b2b0ef8ce712 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -682,7 +682,7 @@ class RocksDBFileManager(
       case e: Exception =>
         // Cancel the actual output stream first, so that zout.close() does not write the file
         out.cancel()
-        logError(s"Error zipping to $filesStr", e)
+        logError(log"Error zipping to ${MDC(LogKeys.FILE_NAME, filesStr)}", e)
         throw e
     } finally {
       // Close everything no matter what happened
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index bc3e4b7e6dbe..374ab52b9110 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
 import org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader, SchemaWriter}
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -75,7 +75,7 @@ class StateSchemaCompatibilityChecker(
       schemaReader.read(inStream)
     } catch {
       case e: Throwable =>
-        logError(s"Fail to read schema file from $schemaFileLocation", e)
+        logError(log"Fail to read schema file from ${MDC(LogKeys.PATH, schemaFileLocation)}", e)
         throw e
     } finally {
       inStream.close()
@@ -97,7 +97,7 @@ class StateSchemaCompatibilityChecker(
       outStream.close()
     } catch {
       case e: Throwable =>
-        logError(s"Fail to write schema file to $schemaFileLocation", e)
+        logError(log"Fail to write schema file to ${MDC(LogKeys.PATH, schemaFileLocation)}", e)
         outStream.cancel()
         throw e
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index 20df67b25bfe..b1860be41ac4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -25,7 +25,7 @@ import com.google.common.io.ByteStreams
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.{FSError, Path}
 
-import org.apache.spark.internal.{Logging, LogKeys, MDC}
+import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -176,8 +176,7 @@ class StateStoreChangelogWriterV1(
     } catch {
       case e: Throwable =>
         abort()
-        logError(log"Fail to commit changelog file ${MDC(LogKeys.FILE_NAME, file)} " +
-          log"because of exception ${MDC(LogKeys.EXCEPTION, e)}")
+        logError(log"Fail to commit changelog file ${MDC(PATH, file)} because of exception", e)
         throw e
     } finally {
       backingFileStream = null
@@ -256,8 +255,7 @@ class StateStoreChangelogWriterV2(
     } catch {
       case e: Throwable =>
         abort()
-        logError(log"Fail to commit changelog file ${MDC(LogKeys.FILE_NAME, file)} " +
-          log"because of exception ${MDC(LogKeys.EXCEPTION, e)}")
+        logError(log"Fail to commit changelog file ${MDC(PATH, file)} because of exception", e)
         throw e
     } finally {
       backingFileStream = null
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java
index f0c1985ce58a..f488a411c31f 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java
@@ -290,7 +290,7 @@ public abstract class Operation {
       if (operationLog == null) {
         LOG.error("Operation [ {} ] logging is enabled, " +
           "but its OperationLog object cannot be found.",
-          MDC.of(LogKeys.OPERATION_HANDLE_IDENTIFIER$.MODULE$, opHandle.getHandleIdentifier()));
+          MDC.of(LogKeys.OPERATION_HANDLE_ID$.MODULE$, opHandle.getHandleIdentifier()));
       } else {
         operationLog.close();
       }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
index 9cf31d99ccfa..b060bf3d4ec8 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
@@ -51,9 +51,11 @@ private[hive] class SparkGetFunctionsOperation(
 
   override def runInternal(): Unit = {
     // Do not change cmdStr. It's used for Hive auditing and authorization.
-    val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
-    val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'"
-    logInfo(log"${MDC(LogKeys.MESSAGE, logMsg)} with ${MDC(LogKeys.STATEMENT_ID, statementId)}")
+    val cmdMDC = log"catalog : ${MDC(LogKeys.CATALOG_NAME, catalogName)}, " +
+      log"schemaPattern : ${MDC(LogKeys.DATABASE_NAME, schemaName)}"
+    val logMDC = log"Listing functions '" + cmdMDC +
+      log", functionName : ${MDC(LogKeys.FUNCTION_NAME, functionName)}'"
+    logInfo(logMDC + log" with ${MDC(LogKeys.STATEMENT_ID, statementId)}")
     setState(OperationState.RUNNING)
     // Always use the latest class loader provided by executionHive's state.
     val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
@@ -69,13 +71,13 @@ private[hive] class SparkGetFunctionsOperation(
       // authorize this call on the schema objects
       val privObjs =
         HivePrivilegeObjectUtils.getHivePrivDbObjects(matchingDbs.asJava)
-      authorizeMetaGets(HiveOperationType.GET_FUNCTIONS, privObjs, cmdStr)
+      authorizeMetaGets(HiveOperationType.GET_FUNCTIONS, privObjs, cmdMDC.message)
     }
 
     HiveThriftServer2.eventManager.onStatementStart(
       statementId,
       parentSession.getSessionHandle.getSessionId.toString,
-      logMsg,
+      logMDC.message,
       statementId,
       parentSession.getUsername)
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 9c461f0d4270..12c6c95f7d8d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
 import org.apache.spark._
+import org.apache.spark.internal.{LogKeys, MDC}
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.streaming.util._
@@ -156,8 +157,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
           s"Could not read data from write ahead log record 
${partition.walRecordHandle}, " +
             s"read returned null")
       }
-      logInfo(s"Read partition data of $this from write ahead log, record handle " +
-        partition.walRecordHandle)
+      logInfo(log"Read partition data of ${MDC(LogKeys.RDD, this)} from write ahead log, " +
+        log"record handle ${MDC(LogKeys.WRITE_AHEAD_LOG_RECORD_HANDLE, partition.walRecordHandle)}")
       if (storeInBlockManager) {
         blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel)
         logDebug(s"Stored partition data of $this into block manager with level $storageLevel")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
