This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4957a40d6e6b [SPARK-47584][SQL] SQL core: Migrate logWarn with variables to structured logging framework
4957a40d6e6b is described below

commit 4957a40d6e6bf68226c8047687e8f30c93adb8ce
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Wed Apr 17 11:59:09 2024 -0700

    [SPARK-47584][SQL] SQL core: Migrate logWarn with variables to structured logging framework
    
    ### What changes were proposed in this pull request?
    This PR aims to migrate `logWarning` calls with variables in the `SQL core` module to the structured logging framework.
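    
    Schematically, call sites that previously used Scala `s"..."` interpolation now use the `log"..."` interpolator, with each variable wrapped in `MDC` and tagged by a `LogKey`. A minimal sketch of the pattern follows; the `MigrationExample` object and its methods are illustrative only and not code added by this PR, while `CONFIG` and `PATH` are existing `LogKey` entries:
    
    ```scala
    // Illustrative sketch of the migration pattern (not part of this commit).
    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.{CONFIG, PATH}
    
    object MigrationExample extends Logging {
      // Before: variables are flattened into plain text.
      def oldStyle(key: String, path: String): Unit =
        logWarning(s"Config $key was ignored for path $path")
    
      // After: each variable is tagged with a LogKey via MDC, so the structured
      // logging framework can emit it as a named field alongside the message.
      def newStyle(key: String, path: String): Unit =
        logWarning(log"Config ${MDC(CONFIG, key)} was ignored for path ${MDC(PATH, path)}")
    }
    ```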
    
    ### Why are the changes needed?
    To enhance Apache Spark's logging system by implementing structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Passed GitHub Actions (GA).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #46057 from panbingkun/SPARK-47584.
    
    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   | 65 +++++++++++++++++++++-
 .../catalyst/analysis/StreamingJoinHelper.scala    |  4 +-
 .../ReplaceNullWithFalseInPredicate.scala          |  4 +-
 .../main/scala/org/apache/spark/sql/Column.scala   | 13 +++--
 .../scala/org/apache/spark/sql/SparkSession.scala  | 20 ++++---
 .../spark/sql/api/python/PythonSQLUtils.scala      |  7 ++-
 .../org/apache/spark/sql/api/r/SQLUtils.scala      |  9 +--
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  9 ++-
 .../apache/spark/sql/execution/ExistingRDD.scala   | 12 ++--
 .../spark/sql/execution/QueryExecution.scala       |  6 +-
 .../spark/sql/execution/SparkStrategies.scala      | 10 ++--
 .../sql/execution/WholeStageCodegenExec.scala      |  8 ++-
 .../adaptive/InsertAdaptiveSparkPlan.scala         |  6 +-
 .../execution/command/AnalyzeTablesCommand.scala   |  6 +-
 .../spark/sql/execution/command/CommandUtils.scala |  9 +--
 .../spark/sql/execution/command/SetCommand.scala   | 28 ++++++----
 .../apache/spark/sql/execution/command/ddl.scala   |  8 ++-
 .../datasources/BasicWriteStatsTracker.scala       |  9 +--
 .../sql/execution/datasources/DataSource.scala     | 10 ++--
 .../execution/datasources/DataSourceManager.scala  |  6 +-
 .../sql/execution/datasources/FilePartition.scala  | 11 ++--
 .../sql/execution/datasources/FileScanRDD.scala    |  8 ++-
 .../execution/datasources/FileStatusCache.scala    | 14 +++--
 .../execution/datasources/csv/CSVDataSource.scala  |  9 +--
 .../execution/datasources/jdbc/JDBCRelation.scala  | 15 ++---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 11 ++--
 .../datasources/json/JsonOutputWriter.scala        |  8 ++-
 .../sql/execution/datasources/orc/OrcUtils.scala   |  5 +-
 .../datasources/parquet/ParquetFileFormat.scala    | 13 +++--
 .../datasources/parquet/ParquetUtils.scala         |  9 +--
 .../execution/datasources/v2/CacheTableExec.scala  |  4 +-
 .../execution/datasources/v2/CreateIndexExec.scala |  5 +-
 .../datasources/v2/CreateNamespaceExec.scala       |  5 +-
 .../execution/datasources/v2/CreateTableExec.scala |  5 +-
 .../datasources/v2/DataSourceV2Strategy.scala      |  5 +-
 .../execution/datasources/v2/DropIndexExec.scala   |  4 +-
 .../datasources/v2/FilePartitionReader.scala       |  7 ++-
 .../sql/execution/datasources/v2/FileScan.scala    |  7 ++-
 .../v2/V2ScanPartitioningAndOrdering.scala         |  8 ++-
 .../ApplyInPandasWithStatePythonRunner.scala       |  8 ++-
 .../python/AttachDistributedSequenceExec.scala     |  5 +-
 .../streaming/AvailableNowDataStreamWrapper.scala  | 15 ++---
 .../streaming/CheckpointFileManager.scala          | 24 ++++----
 .../streaming/CompactibleFileStreamLog.scala       |  5 +-
 .../sql/execution/streaming/FileStreamSink.scala   |  9 +--
 .../sql/execution/streaming/FileStreamSource.scala | 16 ++++--
 .../execution/streaming/IncrementalExecution.scala |  9 +--
 .../streaming/ManifestFileCommitProtocol.scala     |  6 +-
 .../execution/streaming/MicroBatchExecution.scala  | 24 ++++----
 .../spark/sql/execution/streaming/OffsetSeq.scala  | 15 +++--
 .../sql/execution/streaming/ProgressReporter.scala | 19 ++++---
 .../execution/streaming/ResolveWriteToStream.scala | 15 +++--
 .../sql/execution/streaming/StreamExecution.scala  |  8 ++-
 .../sql/execution/streaming/TimerStateImpl.scala   | 11 ++--
 .../sql/execution/streaming/TriggerExecutor.scala  |  8 ++-
 .../continuous/ContinuousTextSocketSource.scala    |  5 +-
 .../sources/TextSocketMicroBatchStream.scala       |  5 +-
 .../state/HDFSBackedStateStoreProvider.scala       | 25 +++++----
 .../sql/execution/streaming/state/RocksDB.scala    |  6 +-
 .../streaming/state/RocksDBFileManager.scala       | 14 +++--
 .../sql/execution/streaming/state/StateStore.scala | 12 ++--
 .../state/SymmetricHashJoinStateManager.scala      |  9 +--
 .../sql/execution/ui/SQLAppStatusListener.scala    |  9 +--
 .../apache/spark/sql/internal/SharedState.scala    |  8 ++-
 .../apache/spark/sql/jdbc/PostgresDialect.scala    |  5 +-
 .../sql/streaming/StreamingQueryManager.scala      |  7 ++-
 66 files changed, 445 insertions(+), 259 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 838ef0355e3a..46e993410ffc 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -22,6 +22,8 @@ package org.apache.spark.internal
  */
 object LogKey extends Enumeration {
   val ACCUMULATOR_ID = Value
+  val ACTUAL_NUM_FILES = Value
+  val ACTUAL_PARTITION_COLUMN = Value
   val ANALYSIS_ERROR = Value
   val APP_DESC = Value
   val APP_ID = Value
@@ -32,15 +34,18 @@ object LogKey extends Enumeration {
   val BROADCAST_ID = Value
   val BUCKET = Value
   val BYTECODE_SIZE = Value
+  val CACHED_TABLE_PARTITION_METADATA_SIZE = Value
   val CACHE_AUTO_REMOVED_SIZE = Value
   val CACHE_UNTIL_HIGHEST_CONSUMED_SIZE = Value
   val CACHE_UNTIL_LAST_PRODUCED_SIZE = Value
+  val CALL_SITE_LONG_FORM = Value
   val CATEGORICAL_FEATURES = Value
   val CLASS_LOADER = Value
   val CLASS_NAME = Value
   val CLUSTER_ID = Value
   val CODEC_LEVEL = Value
   val CODEC_NAME = Value
+  val CODEGEN_STAGE_ID = Value
   val COLUMN_DATA_TYPE_SOURCE = Value
   val COLUMN_DATA_TYPE_TARGET = Value
   val COLUMN_DEFAULT_VALUE = Value
@@ -63,14 +68,25 @@ object LogKey extends Enumeration {
   val CSV_SCHEMA_FIELD_NAME = Value
   val CSV_SCHEMA_FIELD_NAMES = Value
   val CSV_SOURCE = Value
+  val CURRENT_PATH = Value
   val DATA = Value
   val DATABASE_NAME = Value
   val DATAFRAME_CACHE_ENTRY = Value
   val DATAFRAME_ID = Value
+  val DATA_SOURCE = Value
+  val DATA_SOURCES = Value
+  val DATA_SOURCE_PROVIDER = Value
+  val DEFAULT_ISOLATION_LEVEL = Value
+  val DEFAULT_VALUE = Value
+  val DELEGATE = Value
   val DESCRIPTION = Value
+  val DESIRED_PARTITIONS_SIZE = Value
   val DRIVER_ID = Value
   val DROPPED_PARTITIONS = Value
   val DURATION = Value
+  val ELAPSED_TIME = Value
+  val ENCODING = Value
+  val END_INDEX = Value
   val END_POINT = Value
   val ENGINE = Value
   val ERROR = Value
@@ -78,22 +94,31 @@ object LogKey extends Enumeration {
   val EVENT_QUEUE = Value
   val EXECUTE_INFO = Value
   val EXECUTE_KEY = Value
+  val EXECUTION_PLAN_LEAVES = Value
   val EXECUTOR_ENV_REGEX = Value
   val EXECUTOR_ID = Value
   val EXECUTOR_IDS = Value
   val EXECUTOR_STATE = Value
   val EXIT_CODE = Value
-  val EXPRESSION_TERMS = Value
+  val EXPECTED_NUM_FILES = Value
+  val EXPECTED_PARTITION_COLUMN = Value
+  val EXPIRY_TIMESTAMP = Value
+  val EXPR = Value
+  val EXPR_TERMS = Value
+  val EXTENDED_EXPLAIN_GENERATOR = Value
   val FAILURES = Value
   val FALLBACK_VERSION = Value
   val FIELD_NAME = Value
   val FILE_FORMAT = Value
   val FILE_FORMAT2 = Value
+  val FILE_VERSION = Value
+  val FINISH_TRIGGER_DURATION = Value
   val FROM_OFFSET = Value
   val FUNCTION_NAME = Value
   val FUNCTION_PARAMETER = Value
   val GROUP_ID = Value
   val HADOOP_VERSION = Value
+  val HASH_JOIN_KEYS = Value
   val HISTORY_DIR = Value
   val HIVE_CLIENT_VERSION = Value
   val HIVE_METASTORE_VERSION = Value
@@ -103,22 +128,31 @@ object LogKey extends Enumeration {
   val HOST_PORT = Value
   val INCOMPATIBLE_TYPES = Value
   val INDEX = Value
+  val INDEX_NAME = Value
   val INFERENCE_MODE = Value
   val INITIAL_CAPACITY = Value
   val INTERVAL = Value
+  val ISOLATION_LEVEL = Value
   val JOB_ID = Value
   val JOIN_CONDITION = Value
-  val JOIN_CONDITION_SUB_EXPRESSION = Value
+  val JOIN_CONDITION_SUB_EXPR = Value
   val KAFKA_PULLS_COUNT = Value
   val KAFKA_RECORDS_PULLED_COUNT = Value
   val KEY = Value
   val LAST_ACCESS_TIME = Value
+  val LATEST_BATCH_ID = Value
+  val LATEST_COMMITTED_BATCH_ID = Value
   val LEARNING_RATE = Value
+  val LEFT_EXPR = Value
   val LINE = Value
   val LINE_NUM = Value
   val LISTENER = Value
   val LOAD_FACTOR = Value
+  val LOAD_TIME = Value
+  val LOGICAL_PLAN_COLUMNS = Value
+  val LOGICAL_PLAN_LEAVES = Value
   val LOG_TYPE = Value
+  val LOWER_BOUND = Value
   val MASTER_URL = Value
   val MAX_ATTEMPTS = Value
   val MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE = Value
@@ -126,23 +160,33 @@ object LogKey extends Enumeration {
   val MAX_CAPACITY = Value
   val MAX_CATEGORIES = Value
   val MAX_EXECUTOR_FAILURES = Value
+  val MAX_FILE_VERSION = Value
+  val MAX_PARTITIONS_SIZE = Value
   val MAX_SIZE = Value
+  val MAX_TABLE_PARTITION_METADATA_SIZE = Value
   val MERGE_DIR_NAME = Value
   val MESSAGE = Value
   val METHOD_NAME = Value
   val MIN_SIZE = Value
+  val NAMESPACE = Value
+  val NEW_PATH = Value
   val NEW_VALUE = Value
   val NUM_COLUMNS = Value
+  val NUM_FILES = Value
   val NUM_ITERATIONS = Value
   val OBJECT_ID = Value
   val OFFSET = Value
   val OFFSETS = Value
   val OLD_BLOCK_MANAGER_ID = Value
   val OLD_VALUE = Value
+  val OPTIMIZED_PLAN_COLUMNS = Value
   val OPTIMIZER_CLASS_NAME = Value
+  val OPTIONS = Value
   val OP_ID = Value
   val OP_TYPE = Value
   val PARSE_MODE = Value
+  val PARTITIONED_FILE_READER = Value
+  val PARTITIONS_SIZE = Value
   val PARTITION_ID = Value
   val PARTITION_SPECIFICATION = Value
   val PARTITION_SPECS = Value
@@ -154,6 +198,7 @@ object LogKey extends Enumeration {
   val POD_PHASE = Value
   val POLICY = Value
   val PORT = Value
+  val PROCESSING_TIME = Value
   val PRODUCER_ID = Value
   val PROVIDER = Value
   val QUERY_CACHE_VALUE = Value
@@ -163,8 +208,10 @@ object LogKey extends Enumeration {
   val QUERY_PLAN_COMPARISON = Value
   val QUERY_PLAN_LENGTH_ACTUAL = Value
   val QUERY_PLAN_LENGTH_MAX = Value
+  val QUERY_RUN_ID = Value
   val RANGE = Value
   val RDD_ID = Value
+  val READ_LIMIT = Value
   val REASON = Value
   val REATTACHABLE = Value
   val RECEIVED_BLOCK_INFO = Value
@@ -174,6 +221,7 @@ object LogKey extends Enumeration {
   val RESOURCE_NAME = Value
   val RETRY_COUNT = Value
   val RETRY_INTERVAL = Value
+  val RIGHT_EXPR = Value
   val RULE_BATCH_NAME = Value
   val RULE_NAME = Value
   val RULE_NUMBER_OF_RUNS = Value
@@ -190,20 +238,27 @@ object LogKey extends Enumeration {
   val SHUFFLE_MERGE_ID = Value
   val SIZE = Value
   val SLEEP_TIME = Value
+  val SPARK_DATA_STREAM = Value
+  val SPARK_PLAN_ID = Value
   val SQL_TEXT = Value
   val STAGE_ID = Value
+  val START_INDEX = Value
   val STATEMENT_ID = Value
+  val STATE_STORE_PROVIDER = Value
   val STATUS = Value
+  val STORE_ID = Value
   val STREAM_ID = Value
   val STREAM_NAME = Value
   val SUBMISSION_ID = Value
   val SUBSAMPLING_RATE = Value
+  val SUB_QUERY = Value
   val TABLE_NAME = Value
   val TASK_ATTEMPT_ID = Value
   val TASK_ID = Value
   val TASK_NAME = Value
   val TASK_SET_NAME = Value
   val TASK_STATE = Value
+  val TEMP_PATH = Value
   val THREAD = Value
   val THREAD_NAME = Value
   val TID = Value
@@ -221,12 +276,16 @@ object LogKey extends Enumeration {
   val TOTAL_SIZE = Value
   val TOTAL_TIME = Value
   val TOTAL_TIME_READ = Value
-  val UNSUPPORTED_EXPRESSION = Value
+  val TREE_NODE = Value
+  val TRIGGER_INTERVAL = Value
+  val UNSUPPORTED_EXPR = Value
   val UNSUPPORTED_HINT_REASON = Value
   val UNTIL_OFFSET = Value
+  val UPPER_BOUND = Value
   val URI = Value
   val USER_ID = Value
   val USER_NAME = Value
+  val VALUE = Value
   val WAIT_RESULT_TIME = Value
   val WAIT_SEND_TIME = Value
   val WAIT_TIME = Value
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala
index e9c4dd0be7d9..ee285f7ca860 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala
@@ -168,7 +168,7 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
     if (constraintTerms.size > 1) {
       logWarning(
         log"Failed to extract state constraint terms: multiple time terms in 
condition\n\t" +
-          log"${MDC(EXPRESSION_TERMS, terms.mkString("\n\t"))}")
+          log"${MDC(EXPR_TERMS, terms.mkString("\n\t"))}")
       return None
     }
     if (constraintTerms.isEmpty) {
@@ -289,7 +289,7 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
           logWarning(
             log"Failed to extract state value watermark from condition " +
               log"${MDC(JOIN_CONDITION, exprToCollectFrom)} due to " +
-              log"${MDC(JOIN_CONDITION_SUB_EXPRESSION, a)}")
+              log"${MDC(JOIN_CONDITION_SUB_EXPR, a)}")
           invalid = true
           Seq.empty
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
index 772382f5f1e1..ed3f0cc5a80c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.SparkIllegalArgumentException
-import org.apache.spark.internal.LogKey.{SQL_TEXT, UNSUPPORTED_EXPRESSION}
+import org.apache.spark.internal.LogKey.{SQL_TEXT, UNSUPPORTED_EXPR}
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, 
ArrayFilter, CaseWhen, EqualNullSafe, Expression, If, In, InSet, 
LambdaFunction, Literal, MapFilter, Not, Or}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
@@ -141,7 +141,7 @@ object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] {
             "expr" -> e.sql))
       } else {
         val message = log"Expected a Boolean type expression in 
replaceNullWithFalse, " +
-          log"but got the type `${MDC(UNSUPPORTED_EXPRESSION, 
e.dataType.catalogString)}` " +
+          log"but got the type `${MDC(UNSUPPORTED_EXPR, 
e.dataType.catalogString)}` " +
           log"in `${MDC(SQL_TEXT, e.sql)}`."
         logWarning(message)
         e
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 22c09c51c237..d9f32682ab69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.annotation.Stable
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{LEFT_EXPR, RIGHT_EXPR}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
 import org.apache.spark.sql.catalyst.expressions._
@@ -310,8 +311,9 @@ class Column(val expr: Expression) extends Logging {
     val right = lit(other).expr
     if (this.expr == right) {
       logWarning(
-        s"Constructing trivially true equals predicate, '${this.expr} = 
$right'. " +
-            "Perhaps you need to use aliases.")
+        log"Constructing trivially true equals predicate, " +
+          log"'${MDC(LEFT_EXPR, this.expr)} = ${MDC(RIGHT_EXPR, right)}'. " +
+          log"Perhaps you need to use aliases.")
     }
     fn("=", other)
   }
@@ -516,8 +518,9 @@ class Column(val expr: Expression) extends Logging {
     val right = lit(other).expr
     if (this.expr == right) {
       logWarning(
-        s"Constructing trivially true equals predicate, '${this.expr} <=> 
$right'. " +
-          "Perhaps you need to use aliases.")
+        log"Constructing trivially true equals predicate, " +
+          log"'${MDC(LEFT_EXPR, this.expr)} <=> ${MDC(RIGHT_EXPR, right)}'. " +
+          log"Perhaps you need to use aliases.")
     }
     fn("<=>", other)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 15eeca87dcf6..b3a5751fce04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -29,7 +29,8 @@ import scala.util.control.NonFatal
 import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, 
SparkException, TaskContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, 
Unstable}
 import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CALL_SITE_LONG_FORM, CLASS_NAME}
 import org.apache.spark.internal.config.{ConfigEntry, 
EXECUTOR_ALLOW_SPARK_CONTEXT}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
@@ -1358,13 +1359,13 @@ object SparkSession extends Logging {
     val session = getActiveSession.orElse(getDefaultSession)
     if (session.isDefined) {
       logWarning(
-        s"""An existing Spark session exists as the active or default session.
-           |This probably means another suite leaked it. Attempting to stop it 
before continuing.
-           |This existing Spark session was created at:
-           |
-           |${session.get.creationSite.longForm}
-           |
-         """.stripMargin)
+        log"""An existing Spark session exists as the active or default 
session.
+             |This probably means another suite leaked it. Attempting to stop 
it before continuing.
+             |This existing Spark session was created at:
+             |
+             |${MDC(CALL_SITE_LONG_FORM, session.get.creationSite.longForm)}
+             |
+           """.stripMargin)
       session.get.stop()
       SparkSession.clearActiveSession()
       SparkSession.clearDefaultSession()
@@ -1391,7 +1392,8 @@ object SparkSession extends Logging {
         case e@(_: ClassCastException |
                 _: ClassNotFoundException |
                 _: NoClassDefFoundError) =>
-          logWarning(s"Cannot use $extensionConfClassName to configure session 
extensions.", e)
+          logWarning(log"Cannot use ${MDC(CLASS_NAME, extensionConfClassName)} 
to configure " +
+            log"session extensions.", e)
       }
     }
     extensions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 62e6cc07b3e9..0464432d1f7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -26,7 +26,8 @@ import net.razorvine.pickle.{Pickler, Unpickler}
 
 import org.apache.spark.SparkException
 import org.apache.spark.api.python.DechunkedInputStream
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_LOADER
 import org.apache.spark.security.SocketAuthServer
 import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -136,8 +137,8 @@ private[sql] object PythonSQLUtils extends Logging {
   def addJarToCurrentClassLoader(path: String): Unit = {
     Utils.getContextOrSparkClassLoader match {
       case cl: MutableURLClassLoader => cl.addURL(Utils.resolveURI(path).toURL)
-      case cl => logWarning(
-        s"Unsupported class loader $cl will not update jars in the thread 
class loader.")
+      case cl => logWarning(log"Unsupported class loader ${MDC(CLASS_LOADER, 
cl)} will not " +
+        log"update jars in the thread class loader.")
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 97b701b7380d..655d4193f684 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -27,7 +27,8 @@ import org.apache.spark.TaskContext
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.api.r.SerDe
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CONFIG
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, 
GenericRowWithSchema, Literal}
@@ -58,9 +59,9 @@ private[sql] object SQLUtils extends Logging {
         
SparkSession.builder().enableHiveSupport().sparkContext(jsc.sc).getOrCreate()
       } else {
         if (enableHiveSupport) {
-          logWarning("SparkR: enableHiveSupport is requested for SparkSession 
but " +
-            s"Spark is not built with Hive or ${CATALOG_IMPLEMENTATION.key} is 
not set to " +
-            "'hive', falling back to without Hive support.")
+          logWarning(log"SparkR: enableHiveSupport is requested for 
SparkSession but " +
+            log"Spark is not built with Hive or ${MDC(CONFIG, 
CATALOG_IMPLEMENTATION.key)} " +
+            log"is not set to 'hive', falling back to without Hive support.")
         }
         SparkSession.builder().sparkContext(jsc.sc).getOrCreate()
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 36221d728066..1f141766f437 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.SparkException
+import org.apache.spark.internal.LogKey.CONFIG
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec}
@@ -524,9 +526,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       if (!createHiveTableByDefault || (ctas && conf.convertCTAS)) {
         (nonHiveStorageFormat, conf.defaultDataSourceName)
       } else {
-        logWarning("A Hive serde table will be created as there is no table 
provider " +
-          s"specified. You can set 
${SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key} to false " +
-          "so that native data source table will be created instead.")
+        logWarning(log"A Hive serde table will be created as there is no table 
provider " +
+          log"specified. You can set " +
+          log"${MDC(CONFIG, SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key)} 
to false so that " +
+          log"native data source table will be created instead.")
         (defaultHiveStorage, DDLUtils.HIVE_PROVIDER)
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 252a6290cbc7..2c77b7f0a05f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{LOGICAL_PLAN_COLUMNS, 
OPTIMIZED_PLAN_COLUMNS}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -226,10 +227,11 @@ object LogicalRDD extends Logging {
       (Some(rewrittenStatistics), Some(rewrittenConstraints))
     }.getOrElse {
       // can't rewrite stats and constraints, give up
-      logWarning("The output columns are expected to the same (for name and 
type) for output " +
-        "between logical plan and optimized plan, but they aren't. output in 
logical plan: " +
-        s"${logicalPlan.output.map(_.simpleString(10))} / output in optimized 
plan: " +
-        s"${optimizedPlan.output.map(_.simpleString(10))}")
+      logWarning(log"The output columns are expected to the same (for name and 
type) for output " +
+        log"between logical plan and optimized plan, but they aren't. output 
in logical plan: " +
+        log"${MDC(LOGICAL_PLAN_COLUMNS, 
logicalPlan.output.map(_.simpleString(10)))} " +
+        log"/ output in optimized plan: " +
+        log"${MDC(OPTIMIZED_PLAN_COLUMNS, 
optimizedPlan.output.map(_.simpleString(10)))}")
 
       (None, None)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 6280e7dd100c..00666190c0cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -26,7 +26,8 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.EXTENDED_EXPLAIN_GENERATOR
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row, 
SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
@@ -385,7 +386,8 @@ class QueryExecution(
           append(s"\n== Extended Information (${extension.title}) ==\n")
           append(extension.generateExtendedInfo(plan))
         } catch {
-          case NonFatal(e) => logWarning(s"Cannot use $extension to get 
extended information.", e)
+          case NonFatal(e) => logWarning(log"Cannot use " +
+            log"${MDC(EXTENDED_EXPLAIN_GENERATOR, extension)} to get extended 
information.", e)
         })
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d7ebf786168b..c2ba38ac4a85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution
 import java.util.Locale
 
 import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
+import org.apache.spark.internal.LogKey.HASH_JOIN_KEYS
+import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, AnalysisException, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -212,10 +214,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       if (!result) {
         val keysNotSupportingHashJoin = leftKeys.concat(rightKeys).filterNot(
           e => UnsafeRowUtils.isBinaryStable(e.dataType))
-        logWarning("Hash based joins are not supported due to " +
-          "joining on keys that don't support binary equality. " +
-          "Keys not supporting hash joins: " + keysNotSupportingHashJoin
-          .map(e => e.toString + " due to DataType: " + 
e.dataType.typeName).mkString(", "))
+        logWarning(log"Hash based joins are not supported due to joining on 
keys that don't " +
+          log"support binary equality. Keys not supporting hash joins: " +
+          log"${MDC(HASH_JOIN_KEYS, keysNotSupportingHashJoin.map(
+            e => e.toString + " due to DataType: " + 
e.dataType.typeName).mkString(", "))}")
       }
       result
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 5a0bf09a1713..32d6555aa9d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -24,6 +24,8 @@ import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.spark.{broadcast, SparkException, 
SparkUnsupportedOperationException}
+import org.apache.spark.internal.LogKey.{CODEGEN_STAGE_ID, ERROR, TREE_NODE}
+import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -406,7 +408,7 @@ trait CodegenSupport extends SparkPlan {
       if (Utils.isTesting) {
         throw SparkException.internalError(errMsg)
       } else {
-        logWarning(s"[BUG] $errMsg Please open a JIRA ticket to report it.")
+        logWarning(log"[BUG] ${MDC(ERROR, errMsg)} Please open a JIRA ticket 
to report it.")
       }
     }
     if (parent.limitNotReachedChecks.isEmpty) {
@@ -729,7 +731,9 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
     } catch {
       case NonFatal(_) if !Utils.isTesting && conf.codegenFallback =>
         // We should already saw the error message
-        logWarning(s"Whole-stage codegen disabled for plan 
(id=$codegenStageId):\n $treeString")
+        logWarning(log"Whole-stage codegen disabled for plan " +
+          log"(id=${MDC(CODEGEN_STAGE_ID, codegenStageId)}):\n " +
+          log"${MDC(TREE_NODE, treeString)}")
         return child.execute()
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 50f2b7c81453..ae18de16c23e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.adaptive
 
 import scala.collection.mutable
 
+import org.apache.spark.internal.LogKey.{CONFIG, SUB_QUERY}
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{DynamicPruningSubquery, 
ListQuery, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -67,8 +69,8 @@ case class InsertAdaptiveSparkPlan(
           AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, 
preprocessingRules, isSubquery)
         } catch {
           case SubqueryAdaptiveNotSupportedException(subquery) =>
-            logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled 
" +
-              s"but is not supported for sub-query: $subquery.")
+            logWarning(log"${MDC(CONFIG, 
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)} is enabled " +
+              log"but is not supported for sub-query: ${MDC(SUB_QUERY, 
subquery)}.")
             plan
         }
       } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTablesCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTablesCommand.scala
index c9b22a7d1b25..a41706b1f99e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTablesCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTablesCommand.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.command
 
 import scala.util.control.NonFatal
 
+import org.apache.spark.internal.LogKey.{DATABASE_NAME, ERROR, TABLE_NAME}
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.{Row, SparkSession}
 
 
@@ -37,8 +39,8 @@ case class AnalyzeTablesCommand(
         CommandUtils.analyzeTable(sparkSession, tbl, noScan)
       } catch {
         case NonFatal(e) =>
-          logWarning(s"Failed to analyze table ${tbl.table} in the " +
-            s"database $db because of ${e.toString}", e)
+          logWarning(log"Failed to analyze table ${MDC(TABLE_NAME, tbl.table)} 
in the " +
+            log"database ${MDC(DATABASE_NAME, db)} because of ${MDC(ERROR, 
e.toString)}}", e)
       }
     }
     Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index eccf16ecea13..463f0ca23afc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -24,7 +24,8 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{DATABASE_NAME, ERROR, TABLE_NAME}
 import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, 
UnresolvedAttribute}
@@ -154,9 +155,9 @@ object CommandUtils extends Logging {
         getPathSize(fs, fs.getFileStatus(path))
       } catch {
         case NonFatal(e) =>
-          logWarning(
-            s"Failed to get the size of table ${identifier.table} in the " +
-              s"database ${identifier.database} because of ${e.toString}", e)
+          logWarning(log"Failed to get the size of table ${MDC(TABLE_NAME, 
identifier.table)} " +
+            log"in the database ${MDC(DATABASE_NAME, identifier.database)} 
because of " +
+            log"${MDC(ERROR, e.toString)}", e)
           0L
       }
     }.getOrElse(0L)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 672417f1adbf..6c57d9b2fe26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, CONFIG2, KEY, VALUE}
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.parser.ParseException
@@ -51,8 +52,9 @@ case class SetCommand(kv: Option[(String, Option[String])])
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
       val runFunc = (sparkSession: SparkSession) => {
         logWarning(
-          s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, 
" +
-            s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} 
instead.")
+          log"Property ${MDC(CONFIG, SQLConf.Deprecated.MAPRED_REDUCE_TASKS)} 
is deprecated, " +
+            log"automatically converted to ${MDC(CONFIG2, 
SQLConf.SHUFFLE_PARTITIONS.key)} " +
+            log"instead.")
         if (value.toInt < 1) {
           val msg =
             s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for 
automatically " +
@@ -68,8 +70,9 @@ case class SetCommand(kv: Option[(String, Option[String])])
     case Some((SQLConf.Replaced.MAPREDUCE_JOB_REDUCES, Some(value))) =>
       val runFunc = (sparkSession: SparkSession) => {
         logWarning(
-          s"Property ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} is Hadoop's 
property, " +
-            s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} 
instead.")
+          log"Property ${MDC(CONFIG, SQLConf.Replaced.MAPREDUCE_JOB_REDUCES)} 
is Hadoop's " +
+            log"property, automatically converted to " +
+            log"${MDC(CONFIG2, SQLConf.SHUFFLE_PARTITIONS.key)} instead.")
         if (value.toInt < 1) {
           val msg =
             s"Setting negative ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} for 
automatically " +
@@ -111,11 +114,12 @@ case class SetCommand(kv: Option[(String, Option[String])])
         }
         if (sparkSession.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive") &&
             key.startsWith("hive.")) {
-          logWarning(s"'SET $key=$value' might not work, since Spark doesn't 
support changing " +
-            "the Hive config dynamically. Please pass the Hive-specific config 
by adding the " +
-            s"prefix spark.hadoop (e.g. spark.hadoop.$key) when starting a 
Spark application. " +
-            "For details, see the link: 
https://spark.apache.org/docs/latest/configuration.html#"; +
-            "dynamically-loading-spark-properties.")
+          logWarning(log"'SET ${MDC(KEY, key)}=${MDC(VALUE, value)}' might not 
work, since Spark " +
+            log"doesn't support changing the Hive config dynamically. Please 
pass the " +
+            log"Hive-specific config by adding the prefix spark.hadoop " +
+            log"(e.g. spark.hadoop.${MDC(KEY, key)}) when starting a Spark 
application. For " +
+            log"details, see the link: 
https://spark.apache.org/docs/latest/configuration.html#"; +
+            log"dynamically-loading-spark-properties.")
         }
         sparkSession.conf.set(key, value)
         Seq(Row(key, value))
@@ -155,8 +159,8 @@ case class SetCommand(kv: Option[(String, Option[String])])
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) =>
       val runFunc = (sparkSession: SparkSession) => {
         logWarning(
-          s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, 
" +
-            s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
+          log"Property ${MDC(CONFIG, SQLConf.Deprecated.MAPRED_REDUCE_TASKS)} 
is deprecated, " +
+            log"showing ${MDC(CONFIG2, SQLConf.SHUFFLE_PARTITIONS.key)} 
instead.")
         Seq(Row(
           SQLConf.SHUFFLE_PARTITIONS.key,
           sparkSession.sessionState.conf.defaultNumShufflePartitions.toString))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index a5e48784ada1..5a853a269848 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -28,7 +28,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ACTUAL_PARTITION_COLUMN, 
EXPECTED_PARTITION_COLUMN, PATH}
 import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -783,11 +784,12 @@ case class RepairTableCommand(
             partitionNames.drop(1), threshold, resolver, evalTaskSupport)
         } else {
           logWarning(
-            s"expected partition column ${partitionNames.head}, but got 
${ps(0)}, ignoring it")
+            log"expected partition column ${MDC(EXPECTED_PARTITION_COLUMN, 
partitionNames.head)}," +
+              log" but got ${MDC(ACTUAL_PARTITION_COLUMN, ps(0))}, ignoring 
it")
           Seq.empty
         }
       } else {
-        logWarning(s"ignore ${new Path(path, name)}")
+        logWarning(log"ignore ${MDC(PATH, new Path(path, name))}")
         Seq.empty
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index 8a9fbd15e2e8..a34b235d2d12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -26,7 +26,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SparkContext, TaskContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ACTUAL_NUM_FILES, EXPECTED_NUM_FILES}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker._
@@ -166,9 +167,9 @@ class BasicWriteTaskStatsTracker(
     }
 
     if (numSubmittedFiles != numFiles) {
-      logWarning(s"Expected $numSubmittedFiles files, but only saw $numFiles. 
" +
-        "This could be due to the output format not writing empty files, " +
-        "or files being not immediately visible in the filesystem.")
+      logWarning(log"Expected ${MDC(EXPECTED_NUM_FILES, numSubmittedFiles)} 
files, but only saw " +
+        log"${MDC(ACTUAL_NUM_FILES, numFiles)}. This could be due to the 
output format not " +
+        log"writing empty files, or files being not immediately visible in the 
filesystem.")
     }
     taskCommitTimeMetric.foreach(_ += taskCommitTime)
     BasicWriteTaskStats(partitions.toSeq, numFiles, numBytes, numRows)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 4c2d6a4cdf5e..84143a3899de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -27,7 +27,8 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CLASS_NAME, DATA_SOURCE_PROVIDER, 
DATA_SOURCES, PATHS}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, 
CatalogStorageFormat, CatalogTable, CatalogUtils}
@@ -695,8 +696,9 @@ object DataSource extends Logging {
             throw QueryCompilationErrors
               .foundMultipleXMLDataSourceError(provider1, sourceNames, 
externalSource.getName)
           } else if (internalSources.size == 1) {
-            logWarning(s"Multiple sources found for $provider1 
(${sourceNames.mkString(", ")}), " +
-              s"defaulting to the internal datasource 
(${internalSources.head.getClass.getName}).")
+            logWarning(log"Multiple sources found for 
${MDC(DATA_SOURCE_PROVIDER, provider1)} " +
+              log"(${MDC(DATA_SOURCES, sourceNames.mkString(", "))}), 
defaulting to the " +
+              log"internal datasource (${MDC(CLASS_NAME, 
internalSources.head.getClass.getName)}).")
             internalSources.head.getClass
           } else {
             throw 
QueryCompilationErrors.findMultipleDataSourceError(provider1, sourceNames)
@@ -807,7 +809,7 @@ object DataSource extends Logging {
       }
       if (filteredIn.isEmpty) {
         logWarning(
-          s"All paths were ignored:\n  ${filteredOut.mkString("\n  ")}")
+          log"All paths were ignored:\n  ${MDC(PATHS, filteredOut.mkString("\n 
 "))}")
       } else {
         logDebug(
           s"Some paths were ignored:\n  ${filteredOut.mkString("\n  ")}")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
index 2f4555effce3..2912cc965510 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
@@ -21,7 +21,8 @@ import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.api.python.PythonUtils
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.DATA_SOURCE
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import 
org.apache.spark.sql.execution.datasources.v2.python.UserDefinedPythonDataSource
 import org.apache.spark.util.Utils
@@ -53,7 +54,8 @@ class DataSourceManager extends Logging {
     }
     val previousValue = runtimeDataSourceBuilders.put(normalizedName, source)
     if (previousValue != null) {
-      logWarning(f"The data source $name replaced a previously registered data 
source.")
+      logWarning(log"The data source ${MDC(DATA_SOURCE, name)} replaced a 
previously " +
+        log"registered data source.")
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
index 836f0b069879..10c77607c311 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
@@ -21,7 +21,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.math.BigDecimal.RoundingMode
 
 import org.apache.spark.Partition
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, DESIRED_PARTITIONS_SIZE, 
MAX_PARTITIONS_SIZE, PARTITIONS_SIZE}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.ScanFileListing
@@ -98,9 +99,11 @@ object FilePartition extends Logging {
       val desiredSplitBytes =
         (totalSizeInBytes / BigDecimal(maxPartNum.get)).setScale(0, 
RoundingMode.UP).longValue
       val desiredPartitions = getFilePartitions(partitionedFiles, 
desiredSplitBytes, openCostBytes)
-      logWarning(s"The number of partitions is ${partitions.size}, which 
exceeds the maximum " +
-        s"number configured: ${maxPartNum.get}. Spark rescales it to 
${desiredPartitions.size} " +
-        s"by ignoring the configuration of 
${SQLConf.FILES_MAX_PARTITION_BYTES.key}.")
+      logWarning(log"The number of partitions is ${MDC(PARTITIONS_SIZE, 
partitions.size)}, " +
+        log"which exceeds the maximum number configured: " +
+        log"${MDC(MAX_PARTITIONS_SIZE, maxPartNum.get)}. Spark rescales it to 
" +
+        log"${MDC(DESIRED_PARTITIONS_SIZE, desiredPartitions.size)} by 
ignoring the " +
+        log"configuration of ${MDC(CONFIG, 
SQLConf.FILES_MAX_PARTITION_BYTES.key)}.")
       desiredPartitions
     } else {
       partitions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 825b8154f681..28700fa4f37f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -24,6 +24,8 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{Partition => RDDPartition, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.LogKey.PATH
+import org.apache.spark.internal.MDC
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.SparkSession
@@ -259,14 +261,14 @@ class FileScanRDD(
                   }
                 } catch {
                   case e: FileNotFoundException if ignoreMissingFiles =>
-                    logWarning(s"Skipped missing file: $currentFile", e)
+                    logWarning(log"Skipped missing file: ${MDC(PATH, 
currentFile)}", e)
                     finished = true
                     null
                   // Throw FileNotFoundException even if `ignoreCorruptFiles` 
is true
                   case e: FileNotFoundException if !ignoreMissingFiles => 
throw e
                   case e @ (_: RuntimeException | _: IOException) if 
ignoreCorruptFiles =>
-                    logWarning(
-                      s"Skipped the rest of the content in the corrupted file: 
$currentFile", e)
+                    logWarning(log"Skipped the rest of the content in the 
corrupted file: " +
+                      log"${MDC(PATH, currentFile)}", e)
                     finished = true
                     null
                 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index 80002ecdaf8d..29706f9cd7f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
 import com.google.common.cache._
 import org.apache.hadoop.fs.{FileStatus, Path}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CACHED_TABLE_PARTITION_METADATA_SIZE, 
MAX_TABLE_PARTITION_METADATA_SIZE}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.SizeEstimator
 
@@ -111,8 +112,8 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends
       override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int 
= {
         val estimate = (SizeEstimator.estimate(key) + 
SizeEstimator.estimate(value)) / weightScale
         if (estimate > Int.MaxValue) {
-          logWarning(s"Cached table partition metadata size is too big. 
Approximating to " +
-            s"${Int.MaxValue.toLong * weightScale}.")
+          logWarning(log"Cached table partition metadata size is too big. 
Approximating to " +
+            log"${MDC(CACHED_TABLE_PARTITION_METADATA_SIZE, 
Int.MaxValue.toLong * weightScale)}.")
           Int.MaxValue
         } else {
           estimate.toInt
@@ -126,9 +127,10 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends
         if (removed.getCause == RemovalCause.SIZE &&
           warnedAboutEviction.compareAndSet(false, true)) {
           logWarning(
-            "Evicting cached table partition metadata from memory due to size 
constraints " +
-              "(spark.sql.hive.filesourcePartitionFileCacheSize = "
-              + maxSizeInBytes + " bytes). This may impact query planning 
performance.")
+            log"Evicting cached table partition metadata from memory due to 
size constraints " +
+              log"(spark.sql.hive.filesourcePartitionFileCacheSize = " +
+              log"${MDC(MAX_TABLE_PARTITION_METADATA_SIZE, maxSizeInBytes)} 
bytes). " +
+              log"This may impact query planning performance.")
         }
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index cf7c536bdaec..52b5810362ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -28,7 +28,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 
 import org.apache.spark.TaskContext
 import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -202,12 +203,12 @@ object MultiLineCSVDataSource extends CSVDataSource with Logging {
           encoding = parsedOptions.charset)
       } catch {
         case e: FileNotFoundException if ignoreMissingFiles =>
-          logWarning(s"Skipped missing file: ${lines.getPath()}", e)
+          logWarning(log"Skipped missing file: ${MDC(PATH, lines.getPath())}", 
e)
           Array.empty[Array[String]]
         case e: FileNotFoundException if !ignoreMissingFiles => throw e
         case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles 
=>
-          logWarning(
-            s"Skipped the rest of the content in the corrupted file: 
${lines.getPath()}", e)
+          logWarning(log"Skipped the rest of the content in the corrupted 
file: " +
+            log"${MDC(PATH, lines.getPath())}", e)
           Array.empty[Array[String]]
       }
     }.take(1).headOption match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 4f19d3df40b3..ca2d59a7672f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -21,7 +21,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.math.BigDecimal.RoundingMode
 
 import org.apache.spark.Partition
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{LOWER_BOUND, NEW_VALUE, OLD_VALUE, 
UPPER_BOUND}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, 
SQLContext}
 import org.apache.spark.sql.catalyst.analysis._
@@ -114,12 +115,12 @@ private[sql] object JDBCRelation extends Logging {
           (upperBound - lowerBound) < 0) {
         partitioning.numPartitions
       } else {
-        logWarning("The number of partitions is reduced because the specified 
number of " +
-          "partitions is less than the difference between upper bound and 
lower bound. " +
-          s"Updated number of partitions: ${upperBound - lowerBound}; Input 
number of " +
-          s"partitions: ${partitioning.numPartitions}; " +
-          s"Lower bound: ${boundValueToString(lowerBound)}; " +
-          s"Upper bound: ${boundValueToString(upperBound)}.")
+        logWarning(log"The number of partitions is reduced because the 
specified number of " +
+          log"partitions is less than the difference between upper bound and 
lower bound. " +
+          log"Updated number of partitions: ${MDC(NEW_VALUE, upperBound - 
lowerBound)}; " +
+          log"Input number of partitions: ${MDC(OLD_VALUE, 
partitioning.numPartitions)}; " +
+          log"Lower bound: ${MDC(LOWER_BOUND, 
boundValueToString(lowerBound))}; " +
+          log"Upper bound: ${MDC(UPPER_BOUND, 
boundValueToString(upperBound))}.")
         upperBound - lowerBound
       }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 53b0b8b5d29d..fd7be9d0ea41 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -32,7 +32,8 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException, 
TaskContext}
 import org.apache.spark.executor.InputMetrics
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{DEFAULT_ISOLATION_LEVEL, 
ISOLATION_LEVEL}
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.{DecimalPrecision, Resolver}
@@ -771,11 +772,13 @@ object JdbcUtils extends Logging with SQLConfHelper {
             // Finally update to actually requested level if possible
             finalIsolationLevel = isolationLevel
           } else {
-            logWarning(s"Requested isolation level $isolationLevel is not 
supported; " +
-                s"falling back to default isolation level $defaultIsolation")
+            logWarning(log"Requested isolation level ${MDC(ISOLATION_LEVEL, 
isolationLevel)} " +
+              log"is not supported; falling back to default isolation level " +
+              log"${MDC(DEFAULT_ISOLATION_LEVEL, defaultIsolation)}")
           }
         } else {
-          logWarning(s"Requested isolation level $isolationLevel, but 
transactions are unsupported")
+          logWarning(log"Requested isolation level ${MDC(ISOLATION_LEVEL, 
isolationLevel)}, " +
+            log"but transactions are unsupported")
         }
       } catch {
         case NonFatal(e) => logWarning("Exception while detecting transaction 
support", e)
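
The JdbcUtils change above also shows how longer messages are split across several log"..." fragments joined with +, each fragment carrying its own MDC entries. A sketch under the same assumptions (hypothetical object name; LogKeys as imported in this hunk):

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.{DEFAULT_ISOLATION_LEVEL, ISOLATION_LEVEL}

    // Hypothetical sketch of the isolation-level fallback warning above.
    object IsolationLevelWarner extends Logging {
      def warnFallback(requested: Int, default: Int): Unit = {
        // Fragments built with the log interpolator concatenate into one
        // structured message; each MDC keeps its own key/value pair.
        logWarning(log"Requested isolation level ${MDC(ISOLATION_LEVEL, requested)} " +
          log"is not supported; falling back to default isolation level " +
          log"${MDC(DEFAULT_ISOLATION_LEVEL, default)}")
      }
    }
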
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala
index 55602ce2ed9b..34046b075395 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala
@@ -21,7 +21,8 @@ import java.nio.charset.{Charset, StandardCharsets}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ENCODING, PATH}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions, 
JSONOptionsInRead}
 import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
@@ -40,8 +41,9 @@ class JsonOutputWriter(
   }
 
   if (JSONOptionsInRead.denyList.contains(encoding)) {
-    logWarning(s"The JSON file ($path) was written in the encoding 
${encoding.displayName()}" +
-      " which can be read back by Spark only if multiLine is enabled.")
+    logWarning(log"The JSON file (${MDC(PATH, path)}) was written in the 
encoding " +
+      log"${MDC(ENCODING, encoding.displayName())} which can be read back by 
Spark only " +
+      log"if multiLine is enabled.")
   }
 
   private val writer = CodecStreams.createOutputStreamWriter(context, new 
Path(path), encoding)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 24943b37d059..4d1354d15aba 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -31,7 +31,8 @@ import org.apache.orc.{BooleanColumnStatistics, 
ColumnStatistics, DateColumnStat
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
 import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
@@ -87,7 +88,7 @@ object OrcUtils extends Logging {
     } catch {
       case e: org.apache.orc.FileFormatException =>
         if (ignoreCorruptFiles) {
-          logWarning(s"Skipped the footer in the corrupted file: $file", e)
+          logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, 
file)}", e)
           None
         } else {
           throw QueryExecutionErrors.cannotReadFooterForFileError(file, e)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index df367766501d..3fdbc38bba7c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -32,7 +32,8 @@ import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GRO
 import org.apache.parquet.hadoop._
 
 import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{PATH, SCHEMA}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -409,8 +410,8 @@ object ParquetFileFormat extends Logging {
           }
           .recover { case cause: Throwable =>
             logWarning(
-              s"""Failed to parse serialized Spark schema in Parquet key-value 
metadata:
-                 |\t$serializedSchema
+              log"""Failed to parse serialized Spark schema in Parquet 
key-value metadata:
+                 |\t${MDC(SCHEMA, serializedSchema)}
                """.stripMargin,
               cause)
           }
@@ -450,7 +451,7 @@ object ParquetFileFormat extends Logging {
             conf, currentFile, SKIP_ROW_GROUPS)))
       } catch { case e: RuntimeException =>
         if (ignoreCorruptFiles) {
-          logWarning(s"Skipped the footer in the corrupted file: 
$currentFile", e)
+          logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, 
currentFile)}", e)
           None
         } else {
           throw 
QueryExecutionErrors.cannotReadFooterForFileError(currentFile.getPath, e)
@@ -526,8 +527,8 @@ object ParquetFileFormat extends Logging {
     }.recoverWith {
       case cause: Throwable =>
         logWarning(
-          "Failed to parse and ignored serialized Spark schema in " +
-            s"Parquet key-value metadata:\n\t$schemaString", cause)
+          log"Failed to parse and ignored serialized Spark schema in " +
+            log"Parquet key-value metadata:\n\t${MDC(SCHEMA, schemaString)}", 
cause)
         Failure(cause)
     }.toOption
   }
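
The ParquetFileFormat hunk above also shows that the log interpolator works with triple-quoted strings and stripMargin, so multi-line messages keep their layout. A minimal sketch of that variant (hypothetical object name; SCHEMA key as imported above):

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.SCHEMA

    // Hypothetical sketch of the multi-line message pattern used above.
    object SchemaParseWarner extends Logging {
      def warnUnparseable(serializedSchema: String, cause: Throwable): Unit = {
        logWarning(
          log"""Failed to parse serialized Spark schema in Parquet key-value metadata:
               |\t${MDC(SCHEMA, serializedSchema)}
             """.stripMargin,
          cause)
      }
    }
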
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index 5020bf7333de..1c1c46542b61 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -33,7 +33,8 @@ import org.apache.parquet.schema.{PrimitiveType, Types}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
 import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CLASS_NAME, CONFIG}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -485,9 +486,9 @@ object ParquetUtils extends Logging {
     if (ParquetOutputFormat.getJobSummaryLevel(conf) != JobSummaryLevel.NONE
       && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
       // output summary is requested, but the class is not a Parquet Committer
-      logWarning(s"Committer $committerClass is not a ParquetOutputCommitter 
and cannot" +
-        s" create job summaries. " +
-        s"Set Parquet option ${ParquetOutputFormat.JOB_SUMMARY_LEVEL} to 
NONE.")
+      logWarning(log"Committer ${MDC(CLASS_NAME, committerClass)} is not a " +
+        log"ParquetOutputCommitter and cannot create job summaries. Set 
Parquet option " +
+        log"${MDC(CONFIG, ParquetOutputFormat.JOB_SUMMARY_LEVEL)} to NONE.")
     }
 
     new OutputWriterFactory {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index 28241fb0a67a..925246320c66 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util.Locale
 
+import org.apache.spark.internal.LogKey.OPTIONS
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.LocalTempView
@@ -44,7 +46,7 @@ trait BaseCacheTableExec extends LeafV2CommandExec {
     val withoutStorageLevel = options
       .filter { case (k, _) => k.toLowerCase(Locale.ROOT) != storageLevelKey }
     if (withoutStorageLevel.nonEmpty) {
-      logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
+      logWarning(log"Invalid options: ${MDC(OPTIONS, 
withoutStorageLevel.mkString(", "))}")
     }
 
     session.sharedState.cacheManager.cacheQuery(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
index 63c8dc6517b9..df922af9ca3f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala
@@ -21,6 +21,8 @@ import java.util
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.internal.LogKey.{INDEX_NAME, TABLE_NAME}
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -55,7 +57,8 @@ case class CreateIndexExec(
         indexName, columns.map(_._1).toArray, colProperties, 
propertiesWithIndexType.asJava)
     } catch {
       case _: IndexAlreadyExistsException if ignoreIfExists =>
-        logWarning(s"Index $indexName already exists in table ${table.name}. 
Ignoring.")
+        logWarning(log"Index ${MDC(INDEX_NAME, indexName)} already exists in " +
+          log"table ${MDC(TABLE_NAME, table.name)}. Ignoring.")
     }
     Seq.empty
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala
index cb51b7f75f33..65fec4b243d0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.jdk.CollectionConverters.MapHasAsJava
 
+import org.apache.spark.internal.LogKey.NAMESPACE
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -47,7 +49,8 @@ case class CreateNamespaceExec(
         catalog.createNamespace(ns, (properties ++ ownership).asJava)
       } catch {
         case _: NamespaceAlreadyExistsException if ifNotExists =>
-          logWarning(s"Namespace ${namespace.quoted} was created concurrently. 
Ignoring.")
+          logWarning(log"Namespace ${MDC(NAMESPACE, namespace.quoted)} was 
created concurrently. " +
+            log"Ignoring.")
       }
     } else if (!ifNotExists) {
       throw QueryCompilationErrors.namespaceAlreadyExistsError(ns)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
index 5f3ed7a5bc76..b981fa3a2a05 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.internal.LogKey.TABLE_NAME
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -44,7 +46,8 @@ case class CreateTableExec(
         catalog.createTable(identifier, columns, partitioning.toArray, 
tableProperties.asJava)
       } catch {
         case _: TableAlreadyExistsException if ignoreIfExists =>
-          logWarning(s"Table ${identifier.quoted} was created concurrently. 
Ignoring.")
+          logWarning(
+            log"Table ${MDC(TABLE_NAME, identifier.quoted)} was created 
concurrently. Ignoring.")
       }
     } else if (!ignoreIfExists) {
       throw QueryCompilationErrors.tableAlreadyExistsError(identifier)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 828d737f93fa..a8eecb699ce9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -22,7 +22,8 @@ import scala.collection.mutable
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.EXPR
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, 
ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
@@ -650,7 +651,7 @@ private[sql] object DataSourceV2Strategy extends Logging {
       Some(new Predicate("IN", FieldReference(name) +: literals))
 
     case other =>
-      logWarning(s"Can't translate $other to source filter, unsupported 
expression")
+      logWarning(log"Can't translate ${MDC(EXPR, other)} to source filter, 
unsupported expression")
       None
   }
 }
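
Note in the strategy change above that MDC values are not limited to strings: the unsupported expression is passed directly and rendered through its toString. A minimal sketch of that usage (hypothetical object name; EXPR key as imported above):

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.EXPR
    import org.apache.spark.sql.catalyst.expressions.Expression

    // Hypothetical sketch: the expression object itself is attached under
    // the EXPR key and rendered via toString, as in the change above.
    object FilterTranslationWarner extends Logging {
      def warnUntranslatable(other: Expression): Unit = {
        logWarning(log"Can't translate ${MDC(EXPR, other)} to source filter, unsupported expression")
      }
    }

    // e.g. pass any catalyst Expression, such as Literal(1)
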
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIndexExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIndexExec.scala
index 085f96119377..a77d39fb696b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIndexExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIndexExec.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.spark.internal.LogKey.INDEX_NAME
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchIndexException
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -34,7 +36,7 @@ case class DropIndexExec(
       table.dropIndex(indexName)
     } catch {
       case _: NoSuchIndexException if ignoreIfNotExists =>
-        logWarning(s"Index $indexName does not exist. Ignoring.")
+        logWarning(log"Index ${MDC(INDEX_NAME, indexName)} does not exist. 
Ignoring.")
     }
     Seq.empty
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index c7783c4e9b29..1863b4b445d4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -18,7 +18,8 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.io.{FileNotFoundException, IOException}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PARTITIONED_FILE_READER
 import org.apache.spark.rdd.InputFileBlockHolder
 import org.apache.spark.sql.catalyst.FileSourceOptions
 import org.apache.spark.sql.connector.read.PartitionReader
@@ -64,8 +65,8 @@ class FilePartitionReader[T](
       currentReader != null && currentReader.next()
     } catch {
       case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
-        logWarning(
-          s"Skipped the rest of the content in the corrupted file: 
$currentReader", e)
+        logWarning(log"Skipped the rest of the content in the corrupted file: 
" +
+          log"${MDC(PARTITIONED_FILE_READER, currentReader)}", e)
         false
       case e: Throwable =>
         throw 
FileDataSourceV2.attachFilePath(currentReader.file.urlEncodedPath, e)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 61d61ee7af25..4816e3164d4b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -21,7 +21,8 @@ import java.util.{Locale, OptionalLong}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{PATH, REASON}
 import org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, 
ExpressionSet}
@@ -164,8 +165,8 @@ trait FileScan extends Scan
       val path = splitFiles(0).toPath
       if (!isSplitable(path) && splitFiles(0).length >
         sparkSession.sparkContext.getConf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
-        logWarning(s"Loading one large unsplittable file ${path.toString} with 
only one " +
-          s"partition, the reason is: ${getFileUnSplittableReason(path)}")
+        logWarning(log"Loading one large unsplittable file ${MDC(PATH, 
path.toString)} with only " +
+          log"one partition, the reason is: ${MDC(REASON, 
getFileUnSplittableReason(path))}")
       }
     }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala
index cb7c3efdbe48..b6eda80e7ef8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -58,8 +59,9 @@ object V2ScanPartitioningAndOrdering extends 
Rule[LogicalPlan] with SQLConfHelpe
           }
         case _: UnknownPartitioning => None
         case p =>
-          logWarning(s"Spark ignores the partitioning 
${p.getClass.getSimpleName}." +
-            " Please use KeyGroupedPartitioning for better performance")
+          logWarning(
+            log"Spark ignores the partitioning ${MDC(CLASS_NAME, 
p.getClass.getSimpleName)}. " +
+              log"Please use KeyGroupedPartitioning for better performance")
           None
       }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
index 8eeb919d0baf..5b65c5fe7519 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
@@ -27,6 +27,8 @@ import org.json4s._
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.api.python._
+import org.apache.spark.internal.LogKey.CONFIG
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.api.python.PythonSQLUtils
 import org.apache.spark.sql.catalyst.InternalRow
@@ -88,9 +90,9 @@ class ApplyInPandasWithStatePythonRunner(
   override val bufferSize: Int = {
     val configuredSize = sqlConf.pandasUDFBufferSize
     if (configuredSize < 4) {
-      logWarning("Pandas execution requires more than 4 bytes. Please 
configure bigger value " +
-        s"for the configuration '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'. " +
-        "Force using the value '4'.")
+      logWarning(log"Pandas execution requires more than 4 bytes. Please 
configure bigger value " +
+        log"for the configuration '${MDC(CONFIG, 
SQLConf.PANDAS_UDF_BUFFER_SIZE.key)}'. " +
+        log"Force using the value '4'.")
       4
     } else {
       configuredSize
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
index e353bf5a51e9..62a64bbb3289 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.python
 
+import org.apache.spark.internal.LogKey.{RDD_ID, SPARK_PLAN_ID}
+import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -108,7 +110,8 @@ case class AttachDistributedSequenceExec(
   override protected[sql] def cleanupResources(): Unit = {
     try {
       if (cached != null && cached.getStorageLevel != StorageLevel.NONE) {
-        logWarning(s"clean up cached RDD(${cached.id}) in 
AttachDistributedSequenceExec($id)")
+        logWarning(log"clean up cached RDD(${MDC(RDD_ID, cached.id)}) in " +
+          log"AttachDistributedSequenceExec(${MDC(SPARK_PLAN_ID, id)})")
         cached.unpersist(blocking = false)
       }
     } finally {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala
index 18dd2eba083a..37f75742214c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{DELEGATE, READ_LIMIT}
 import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, 
ReadLimit, SparkDataStream, SupportsAdmissionControl, 
SupportsTriggerAvailableNow}
 import org.apache.spark.sql.connector.read.streaming
 
@@ -29,10 +30,10 @@ class AvailableNowDataStreamWrapper(val delegate: 
SparkDataStream)
   extends SparkDataStream with SupportsTriggerAvailableNow with Logging {
 
   // See SPARK-45178 for more details.
-  logWarning("Activating the wrapper implementation of Trigger.AvailableNow 
for source " +
-    s"[$delegate]. Note that this might introduce possibility of 
deduplication, dataloss, " +
-    "correctness issue. Enable the config with extreme care. We strongly 
recommend to contact " +
-    "the data source developer to support Trigger.AvailableNow.")
+  logWarning(log"Activating the wrapper implementation of Trigger.AvailableNow 
for source " +
+    log"[${MDC(DELEGATE, delegate)}]. Note that this might introduce 
possibility of " +
+    log"deduplication, dataloss, correctness issue. Enable the config with 
extreme care. We " +
+    log"strongly recommend to contact the data source developer to support 
Trigger.AvailableNow.")
 
   private var fetchedOffset: streaming.Offset = _
 
@@ -71,8 +72,8 @@ class AvailableNowDataStreamWrapper(val delegate: 
SparkDataStream)
     case s: SupportsAdmissionControl =>
       val limit = s.getDefaultReadLimit
       if (limit != ReadLimit.allAvailable()) {
-        logWarning(s"The read limit $limit is ignored because source $delegate 
does not " +
-          "support running Trigger.AvailableNow queries.")
+        logWarning(log"The read limit ${MDC(READ_LIMIT, limit)} is ignored 
because source " +
+          log"${MDC(DELEGATE, delegate)} does not support running 
Trigger.AvailableNow queries.")
       }
       ReadLimit.allAvailable()
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 34c5dee0997b..1c530fcabe0a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -26,7 +26,8 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
 import org.apache.hadoop.fs.permission.FsPermission
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{PATH, TEMP_PATH}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
 import org.apache.spark.sql.internal.SQLConf
@@ -154,8 +155,8 @@ object CheckpointFileManager extends Logging {
           fm.renameTempFile(tempPath, finalPath, overwriteIfPossible)
         } catch {
           case fe: FileAlreadyExistsException =>
-            logWarning(
-              s"Failed to rename temp file $tempPath to $finalPath because 
file exists", fe)
+            logWarning(log"Failed to rename temp file ${MDC(TEMP_PATH, 
tempPath)} to " +
+              log"${MDC(PATH, finalPath)} because file exists", fe)
             if (!overwriteIfPossible) throw fe
         }
 
@@ -178,13 +179,13 @@ object CheckpointFileManager extends Logging {
           underlyingStream.close()
         } catch {
           case NonFatal(e) =>
-            logWarning(s"Error cancelling write to $finalPath, " +
-              s"continuing to delete temp path $tempPath", e)
+            logWarning(log"Error cancelling write to ${MDC(PATH, finalPath)}, 
continuing to " +
+              log"delete temp path ${MDC(TEMP_PATH, tempPath)}", e)
         }
         fm.delete(tempPath)
       } catch {
         case NonFatal(e) =>
-          logWarning(s"Error deleting temp file $tempPath", e)
+          logWarning(log"Error deleting temp file ${MDC(TEMP_PATH, 
tempPath)}", e)
       } finally {
         terminated = true
       }
@@ -210,10 +211,10 @@ object CheckpointFileManager extends Logging {
     } catch {
       case e: UnsupportedFileSystemException =>
         logWarning(
-          "Could not use FileContext API for managing Structured Streaming 
checkpoint files at " +
-            s"$path. Using FileSystem API instead for managing log files. If 
the implementation " +
-            s"of FileSystem.rename() is not atomic, then the correctness and 
fault-tolerance of" +
-            s"your Structured Streaming is not guaranteed.")
+          log"Could not use FileContext API for managing Structured Streaming 
checkpoint files " +
+            log"at ${MDC(PATH, path)}. Using FileSystem API instead for 
managing log files. If " +
+            log"the implementation of FileSystem.rename() is not atomic, then 
the correctness " +
+            log"and fault-tolerance of your Structured Streaming is not 
guaranteed.")
         new FileSystemBasedCheckpointFileManager(path, hadoopConf)
     }
   }
@@ -274,7 +275,8 @@ class FileSystemBasedCheckpointFileManager(path: Path, 
hadoopConf: Configuration
         throw QueryExecutionErrors.renameSrcPathNotFoundError(srcPath)
       } else {
         val e = QueryExecutionErrors.failedRenameTempFileError(srcPath, 
dstPath)
-        logWarning(e.getMessage)
+        logWarning(log"Failed to rename temp file ${MDC(TEMP_PATH, srcPath)} 
to " +
+          log"${MDC(PATH, dstPath)} as FileSystem.rename returned false.")
         throw e
       }
     }
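
The last CheckpointFileManager change above differs slightly from the rest: instead of logging e.getMessage from the error being raised, it logs an explicit structured message and then throws. A minimal sketch of that shape (hypothetical object; the thrown exception here is a stand-in, since the real code builds its error via QueryExecutionErrors):

    import java.io.IOException

    import org.apache.hadoop.fs.Path

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.{PATH, TEMP_PATH}

    // Hypothetical sketch of "log a structured message, then throw".
    object RenameFailureWarner extends Logging {
      def failRename(srcPath: Path, dstPath: Path): Nothing = {
        logWarning(log"Failed to rename temp file ${MDC(TEMP_PATH, srcPath)} to " +
          log"${MDC(PATH, dstPath)} as FileSystem.rename returned false.")
        // Stand-in for the QueryExecutionErrors error used in the real code.
        throw new IOException(s"Failed to rename $srcPath to $dstPath")
      }
    }
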
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 8d38bba1f2a6..884de070bf32 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.Path
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.internal.LogKey.{BATCH_ID, ELAPSED_TIME}
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.util.Utils
@@ -240,7 +242,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
     }
 
     if (elapsedMs >= COMPACT_LATENCY_WARN_THRESHOLD_MS) {
-      logWarning(s"Compacting took $elapsedMs ms for compact batch $batchId")
+      logWarning(log"Compacting took ${MDC(ELAPSED_TIME, elapsedMs)} ms for 
compact batch " +
+        log"${MDC(BATCH_ID, batchId)}")
     } else {
       logDebug(s"Compacting took $elapsedMs ms for compact batch $batchId")
     }
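
Also note in the hunk above that only the logWarning branch is migrated; the logDebug call keeps a plain s-interpolated string, which matches the scope of this change (warnings only). A sketch of that split, with a hypothetical object and threshold:

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.{BATCH_ID, ELAPSED_TIME}

    // Hypothetical sketch: structured message on the warning path only,
    // plain interpolation retained on the debug path.
    object CompactionTimer extends Logging {
      private val warnThresholdMs = 2000L  // hypothetical threshold

      def report(elapsedMs: Long, batchId: Long): Unit = {
        if (elapsedMs >= warnThresholdMs) {
          logWarning(log"Compacting took ${MDC(ELAPSED_TIME, elapsedMs)} ms for compact batch " +
            log"${MDC(BATCH_ID, batchId)}")
        } else {
          logDebug(s"Compacting took $elapsedMs ms for compact batch $batchId")
        }
      }
    }
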
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index ea8db3c99de9..18895e525156 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -23,7 +23,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ERROR, PATH}
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
@@ -60,8 +61,8 @@ object FileStreamSink extends Logging {
         } catch {
           case e: SparkException => throw e
           case NonFatal(e) =>
-            logWarning(s"Assume no metadata directory. Error while looking for 
" +
-              s"metadata directory in the path: $singlePath.", e)
+            logWarning(log"Assume no metadata directory. Error while looking 
for " +
+              log"metadata directory in the path: ${MDC(PATH, singlePath)}.", 
e)
             false
         }
       case _ => false
@@ -84,7 +85,7 @@ object FileStreamSink extends Logging {
         } catch {
           case NonFatal(e) =>
             // We may not have access to this directory. Don't fail the query 
if that happens.
-            logWarning(e.getMessage, e)
+            logWarning(log"${MDC(ERROR, e.getMessage)}", e)
             false
         }
       if (legacyMetadataPathExists) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index eacbd0447d16..6aa8e94db94f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -27,7 +27,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
 
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CURRENT_PATH, ELAPSED_TIME, NEW_PATH, 
NUM_FILES}
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -380,7 +381,8 @@ class FileStreamSource(
     val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime)
     if (listingTimeMs > 2000) {
       // Output a warning when listing files uses more than 2 seconds.
-      logWarning(s"Listed ${files.size} file(s) in $listingTimeMs ms")
+      logWarning(log"Listed ${MDC(NUM_FILES, files.size)} file(s) in " +
+        log"${MDC(ELAPSED_TIME, listingTimeMs)} ms")
     } else {
       logTrace(s"Listed ${files.size} file(s) in $listingTimeMs ms")
     }
@@ -628,11 +630,13 @@ object FileStreamSource {
 
         logDebug(s"Archiving completed file $curPath to $newPath")
         if (!fileSystem.rename(curPath, newPath)) {
-          logWarning(s"Fail to move $curPath to $newPath / skip moving file.")
+          logWarning(log"Fail to move ${MDC(CURRENT_PATH, curPath)} to " +
+            log"${MDC(NEW_PATH, newPath)} / skip moving file.")
         }
       } catch {
         case NonFatal(e) =>
-          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", 
e)
+          logWarning(log"Fail to move ${MDC(CURRENT_PATH, curPath)} to " +
+            log"${MDC(NEW_PATH, newPath)} / skip moving file.", e)
       }
     }
   }
@@ -646,12 +650,12 @@ object FileStreamSource {
         logDebug(s"Removing completed file $curPath")
 
         if (!fileSystem.delete(curPath, false)) {
-          logWarning(s"Failed to remove $curPath / skip removing file.")
+          logWarning(log"Failed to remove ${MDC(CURRENT_PATH, curPath)} / skip 
removing file.")
         }
       } catch {
         case NonFatal(e) =>
           // Log to error but swallow exception to avoid process being stopped
-          logWarning(s"Fail to remove $curPath / skip removing file.", e)
+          logWarning(log"Fail to remove ${MDC(CURRENT_PATH, curPath)} / skip 
removing file.", e)
       }
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index cfccfff3a138..62999d711ce8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.ERROR
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, 
ExpressionWithRandomSeed}
@@ -419,9 +420,9 @@ class IncrementalExecution(
           } catch {
             case e: Exception =>
               // no need to throw fatal error, returns empty map
-              logWarning("Error reading metadata path for stateful operator. " +
-                s"This may due to no prior committed batch, or previously run 
on lower versions:" +
-                s" ${e.getMessage}")
+              logWarning(log"Error reading metadata path for stateful 
operator. This may due to " +
+                log"no prior committed batch, or previously run on lower 
versions: " +
+                log"${MDC(ERROR, e.getMessage)}")
           }
         }
         ret
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 46ce33687890..f444c46f1419 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -25,7 +25,8 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -95,7 +96,8 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
           }
         } catch {
           case e: IOException =>
-            logWarning(s"Fail to remove temporary file $path, continue 
removing next.", e)
+            logWarning(log"Fail to remove temporary file ${MDC(PATH, path)}, " +
+              log"continue removing next.", e)
         }
       }
       pendingCommitFiles.clear()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index ae5a033538ab..e5d275d7c242 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming
 import scala.collection.mutable.{Map => MutableMap}
 import scala.collection.mutable
 
+import org.apache.spark.internal.LogKey.{LATEST_BATCH_ID, 
LATEST_COMMITTED_BATCH_ID, READ_LIMIT, SPARK_DATA_STREAM}
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, 
FileSourceMetadataAttribute, LocalTimestamp}
@@ -108,10 +110,10 @@ class MicroBatchExecution(
           val supportsTriggerAvailableNow = sources.distinct.forall { src =>
             val supports = src.isInstanceOf[SupportsTriggerAvailableNow]
             if (!supports) {
-              logWarning(s"source [$src] does not support 
Trigger.AvailableNow. Falling back to " +
-                "single batch execution. Note that this may not guarantee 
processing new data if " +
-                "there is an uncommitted batch. Please consult with data 
source developer to " +
-                "support Trigger.AvailableNow.")
+              logWarning(log"source [${MDC(SPARK_DATA_STREAM, src)}] does not 
support " +
+                log"Trigger.AvailableNow. Falling back to single batch 
execution. Note that this " +
+                log"may not guarantee processing new data if there is an 
uncommitted batch. " +
+                log"Please consult with data source developer to support 
Trigger.AvailableNow.")
             }
 
             supports
@@ -211,8 +213,8 @@ class MicroBatchExecution(
           case s: SupportsAdmissionControl =>
             val limit = s.getDefaultReadLimit
             if (limit != ReadLimit.allAvailable()) {
-              logWarning(
-                s"The read limit $limit for $s is ignored when Trigger.Once is 
used.")
+              logWarning(log"The read limit ${MDC(READ_LIMIT, limit)} for " +
+                log"${MDC(SPARK_DATA_STREAM, s)} is ignored when Trigger.Once 
is used.")
             }
             s -> ReadLimit.allAvailable()
           case s =>
@@ -510,9 +512,9 @@ class MicroBatchExecution(
                   // here, so we do nothing here.
               }
             } else if (latestCommittedBatchId < latestBatchId - 1) {
-              logWarning(s"Batch completion log latest batch id is " +
-                s"${latestCommittedBatchId}, which is not trailing " +
-                s"batchid $latestBatchId by one")
+              logWarning(log"Batch completion log latest batch id is " +
+                log"${MDC(LATEST_COMMITTED_BATCH_ID, latestCommittedBatchId)}, 
which is not " +
+                log"trailing batchid ${MDC(LATEST_BATCH_ID, latestBatchId)} by 
one")
             }
           case None => logInfo("no commit log present")
         }
@@ -748,8 +750,8 @@ class MicroBatchExecution(
                 }
               } else if (catalogTable.exists(_ ne 
newRelation.catalogTable.get)) {
                 // Output a warning if `catalogTable` is provided by the 
source rather than engine
-                logWarning(
-                  s"Source $source should not produce the information of 
catalog table by its own.")
+                logWarning(log"Source ${MDC(SPARK_DATA_STREAM, source)} should 
not produce the " +
+                  log"information of catalog table by its own.")
               }
               newRelation
           }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index 006d6221e55a..9193b7bd57f5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.streaming
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, DEFAULT_VALUE, NEW_VALUE, 
OLD_VALUE, TIP}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, 
SparkDataStream}
@@ -143,8 +144,9 @@ object OffsetSeqMetadata extends Logging {
           // Config value exists in the metadata, update the session config 
with this value
           val optionalValueInSession = sessionConf.getOption(confKey)
           if (optionalValueInSession.isDefined && optionalValueInSession.get 
!= valueInMetadata) {
-            logWarning(s"Updating the value of conf '$confKey' in current 
session from " +
-              s"'${optionalValueInSession.get}' to '$valueInMetadata'.")
+            logWarning(log"Updating the value of conf '${MDC(CONFIG, 
confKey)}' in current " +
+              log"session from '${MDC(OLD_VALUE, optionalValueInSession.get)}' 
" +
+              log"to '${MDC(NEW_VALUE, valueInMetadata)}'.")
           }
           sessionConf.set(confKey, valueInMetadata)
 
@@ -156,14 +158,15 @@ object OffsetSeqMetadata extends Logging {
 
             case Some(defaultValue) =>
               sessionConf.set(confKey, defaultValue)
-              logWarning(s"Conf '$confKey' was not found in the offset log, " +
-                s"using default value '$defaultValue'")
+              logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in 
the offset log, " +
+                log"using default value '${MDC(DEFAULT_VALUE, defaultValue)}'")
 
             case None =>
               val valueStr = sessionConf.getOption(confKey).map { v =>
                 s" Using existing session conf value '$v'."
               }.getOrElse { " No value set in session conf." }
-              logWarning(s"Conf '$confKey' was not found in the offset log. 
$valueStr")
+              logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in 
the offset log. " +
+                log"${MDC(TIP, valueStr)}")
 
           }
       }
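
The last branch of the OffsetSeqMetadata hunk above shows that a whole precomputed sentence (valueStr) can be carried under a single key (TIP) when there is no finer-grained value to extract. A minimal sketch (hypothetical object name; keys as imported above):

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.{CONFIG, TIP}

    // Hypothetical sketch: a precomputed sentence attached under one key.
    object OffsetLogConfWarner extends Logging {
      def warnMissingConf(confKey: String, sessionValue: Option[String]): Unit = {
        val valueStr = sessionValue.map { v =>
          s" Using existing session conf value '$v'."
        }.getOrElse(" No value set in session conf.")
        logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in the offset log. " +
          log"${MDC(TIP, valueStr)}")
      }
    }
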
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 0d32eed9b6bd..138e4abefce2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -25,7 +25,8 @@ import java.util.{Optional, UUID}
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{EXECUTION_PLAN_LEAVES, 
FINISH_TRIGGER_DURATION, LOGICAL_PLAN_LEAVES, PROCESSING_TIME}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.optimizer.InlineCTE
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, 
LogicalPlan, WithCTE}
@@ -382,9 +383,10 @@ abstract class ProgressContext(
     val finishTriggerDurationMillis = triggerClock.getTimeMillis() - 
triggerEndTimestamp
     val thresholdForLoggingMillis = 60 * 1000
     if (finishTriggerDurationMillis > math.max(thresholdForLoggingMillis, 
processingTimeMills)) {
-      logWarning("Query progress update takes longer than batch processing 
time. Progress " +
-        s"update takes $finishTriggerDurationMillis milliseconds. Batch 
processing takes " +
-        s"$processingTimeMills milliseconds")
+      logWarning(log"Query progress update takes longer than batch processing 
time. Progress " +
+        log"update takes ${MDC(FINISH_TRIGGER_DURATION, 
finishTriggerDurationMillis)} " +
+        log"milliseconds. Batch processing takes ${MDC(PROCESSING_TIME, 
processingTimeMills)} " +
+        log"milliseconds")
     }
   }
 
@@ -485,11 +487,10 @@ abstract class ProgressContext(
         if (!metricWarningLogged) {
           def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), 
${seq.mkString(", ")}"
 
-          logWarning(
-            "Could not report metrics as number leaves in trigger logical plan 
did not match that" +
-              s" of the execution plan:\n" +
-              s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
-              s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
+          logWarning(log"Could not report metrics as number leaves in trigger 
logical plan did " +
+            log"not match that of the execution plan:\nlogical plan leaves: " +
+            log"${MDC(LOGICAL_PLAN_LEAVES, 
toString(allLogicalPlanLeaves))}\nexecution plan " +
+            log"leaves: ${MDC(EXECUTION_PLAN_LEAVES, 
toString(allExecPlanLeaves))}\n")
           metricWarningLogged = true
         }
         Map.empty
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
index 35bb7db6a6e1..e80860b82c32 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
@@ -23,6 +23,8 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.internal.LogKey.{CONFIG, PATH}
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -42,8 +44,8 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with 
SQLConfHelper {
       val (resolvedCheckpointLocation, deleteCheckpointOnStop) = 
resolveCheckpointLocation(s)
 
       if (conf.adaptiveExecutionEnabled) {
-        logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
-          "is not supported in streaming DataFrames/Datasets and will be 
disabled.")
+        logWarning(log"${MDC(CONFIG, SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)} 
" +
+          log"is not supported in streaming DataFrames/Datasets and will be 
disabled.")
       }
 
       if (conf.isUnsupportedOperationCheckEnabled) {
@@ -77,10 +79,11 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with 
SQLConfHelper {
       if (s.useTempCheckpointLocation) {
         deleteCheckpointOnStop = true
         val tempDir = Utils.createTempDir(namePrefix = 
"temporary").getCanonicalPath
-        logWarning("Temporary checkpoint location created which is deleted 
normally when" +
-          s" the query didn't fail: $tempDir. If it's required to delete it 
under any" +
-          s" circumstances, please set 
${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
-          s" true. Important to know deleting temp checkpoint folder is best 
effort.")
+        logWarning(log"Temporary checkpoint location created which is deleted 
normally when" +
+          log" the query didn't fail: ${MDC(PATH, tempDir)}. If it's required 
to delete " +
+          log"it under any circumstances, please set " +
+          log"${MDC(CONFIG, 
SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key)} to" +
+          log" true. Important to know deleting temp checkpoint folder is best 
effort.")
         // SPARK-42676 - Write temp checkpoints for streaming queries to local 
filesystem
         // even if default FS is set differently.
         // This is a band-aid fix. Ideally we should convert `tempDir` to 
URIs, but there
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 50a73082a8c4..ce2bbba550bc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -32,7 +32,8 @@ import 
com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{JobArtifactSet, SparkContext, SparkException, 
SparkThrowable}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{PATH, SPARK_DATA_STREAM}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
@@ -411,7 +412,7 @@ abstract class StreamExecution(
             case NonFatal(e) =>
               // Deleting temp checkpoint folder is best effort, don't throw 
non fatal exceptions
               // when we cannot delete them.
-              logWarning(s"Cannot delete $checkpointPath", e)
+              logWarning(log"Cannot delete ${MDC(PATH, checkpointPath)}", e)
           }
         }
       } finally {
@@ -446,7 +447,8 @@ abstract class StreamExecution(
         source.stop()
       } catch {
         case NonFatal(e) =>
-          logWarning(s"Failed to stop streaming source: $source. Resources may 
have leaked.", e)
+          logWarning(log"Failed to stop streaming source: 
${MDC(SPARK_DATA_STREAM, source)}. " +
+            log"Resources may have leaked.", e)
       }
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
index e83c83df5322..dc7224d37407 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{EXPIRY_TIMESTAMP, KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
@@ -130,8 +131,8 @@ class TimerStateImpl(
   def registerTimer(expiryTimestampMs: Long): Unit = {
     val groupingKey = getGroupingKey(keyToTsCFName)
     if (exists(groupingKey, expiryTimestampMs)) {
-      logWarning(s"Failed to register timer for key=$groupingKey and " +
-        s"timestamp=$expiryTimestampMs since it already exists")
+      logWarning(log"Failed to register timer for key=${MDC(KEY, groupingKey)} 
and " +
+        log"timestamp=${MDC(EXPIRY_TIMESTAMP, expiryTimestampMs)} ms since it 
already exists")
     } else {
       store.put(encodeKey(groupingKey, expiryTimestampMs), EMPTY_ROW, 
keyToTsCFName)
       store.put(encodeSecIndexKey(groupingKey, expiryTimestampMs), EMPTY_ROW, 
tsToKeyCFName)
@@ -147,8 +148,8 @@ class TimerStateImpl(
     val groupingKey = getGroupingKey(keyToTsCFName)
 
     if (!exists(groupingKey, expiryTimestampMs)) {
-      logWarning(s"Failed to delete timer for key=$groupingKey and " +
-        s"timestamp=$expiryTimestampMs since it does not exist")
+      logWarning(log"Failed to delete timer for key=${MDC(KEY, groupingKey)} 
and " +
+        log"timestamp=${MDC(EXPIRY_TIMESTAMP, expiryTimestampMs)} ms since it 
does not exist")
     } else {
       store.remove(encodeKey(groupingKey, expiryTimestampMs), keyToTsCFName)
       store.remove(encodeSecIndexKey(groupingKey, expiryTimestampMs), 
tsToKeyCFName)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
index 143230759724..27e7fc015385 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ELAPSED_TIME, TRIGGER_INTERVAL}
 import org.apache.spark.util.{Clock, SystemClock}
 
 trait TriggerExecutor {
@@ -98,8 +99,9 @@ case class ProcessingTimeExecutor(
 
   /** Called when a batch falls behind */
   def notifyBatchFallingBehind(realElapsedTimeMs: Long): Unit = {
-    logWarning("Current batch is falling behind. The trigger interval is " +
-      s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} 
milliseconds")
+    logWarning(log"Current batch is falling behind. The trigger interval is " +
+      log"${MDC(TRIGGER_INTERVAL, intervalMs)} milliseconds, but spent " +
+      log"${MDC(ELAPSED_TIME, realElapsedTimeMs)} milliseconds")
   }
 
   /**
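
The trigger-executor warning above attaches both durations as numeric values; apart from the placeholders, the message text is unchanged from the removed line (the stray extra brace after the interval placeholder has been dropped). A sketch of that warning in isolation (hypothetical object name; keys as imported above):

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.{ELAPSED_TIME, TRIGGER_INTERVAL}

    // Hypothetical sketch of the falling-behind warning shown above.
    object BatchLagWarner extends Logging {
      def warnFallingBehind(intervalMs: Long, realElapsedTimeMs: Long): Unit = {
        logWarning(log"Current batch is falling behind. The trigger interval is " +
          log"${MDC(TRIGGER_INTERVAL, intervalMs)} milliseconds, but spent " +
          log"${MDC(ELAPSED_TIME, realElapsedTimeMs)} milliseconds")
      }
    }
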
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
index b41b3c329712..45a2428d2784 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
@@ -29,7 +29,8 @@ import org.json4s.{DefaultFormats, Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{HOST, PORT}
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -179,7 +180,7 @@ class TextSocketContinuousStream(
             val line = reader.readLine()
             if (line == null) {
               // End of file reached
-              logWarning(s"Stream closed by $host:$port")
+              logWarning(log"Stream closed by ${MDC(HOST, host)}:${MDC(PORT, port)}")
               return
             }
             TextSocketContinuousStream.this.synchronized {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
index a01f40bead89..ada39ca4e776 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
@@ -25,7 +25,8 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{HOST, PORT}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
@@ -79,7 +80,7 @@ class TextSocketMicroBatchStream(host: String, port: Int, numPartitions: Int)
             val line = reader.readLine()
             if (line == null) {
               // End of file reached
-              logWarning(s"Stream closed by $host:$port")
+              logWarning(log"Stream closed by ${MDC(HOST, host)}:${MDC(PORT, port)}")
               return
             }
             TextSocketMicroBatchStream.this.synchronized {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 2ecfa0931042..eaa4e92af929 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -32,7 +32,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
 import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+import org.apache.spark.internal.LogKey.{FILE_VERSION, OP_ID, PARTITION_ID, PATH}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -341,7 +342,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       cleanup()
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Error performing snapshot and cleaning up $this")
+        logWarning(log"Error performing snapshot and cleaning up " + toMessageWithContext)
     }
   }
 
@@ -354,9 +355,13 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       Nil
   }
 
+  private def toMessageWithContext: MessageWithContext = {
+    log"HDFSStateStoreProvider[id = (op=${MDC(OP_ID, stateStoreId.operatorId)}," +
+      log"part=${MDC(PARTITION_ID, stateStoreId.partitionId)}),dir = ${MDC(PATH, baseDir)}]"
+  }
+
   override def toString(): String = {
-    s"HDFSStateStoreProvider[" +
-      s"id = (op=${stateStoreId.operatorId},part=${stateStoreId.partitionId}),dir = $baseDir]"
+    toMessageWithContext.message
   }
 
   /* Internal fields and methods */
@@ -463,9 +468,9 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       return loadedCurrentVersionMap.get
     }
 
-    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
-      "Reading snapshot file and delta files if needed..." +
-      "Note that this is normal for the first batch of starting query.")
+    logWarning(log"The state for version ${MDC(FILE_VERSION, version)} doesn't exist in " +
+      log"loadedMaps. Reading snapshot file and delta files if needed..." +
+      log"Note that this is normal for the first batch of starting query.")
 
     loadedMapCacheMissCount.increment()
 
@@ -720,7 +725,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       }
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Error doing snapshots for $this", e)
+        logWarning(log"Error doing snapshots for " + toMessageWithContext, e)
     }
   }
 
@@ -751,7 +756,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       }
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Error cleaning up files for $this", e)
+        logWarning(log"Error cleaning up files for " + toMessageWithContext, e)
     }
   }
 
@@ -805,7 +810,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
           case "snapshot" =>
             versionToFiles.put(version, StoreFile(version, path, isSnapshot = true))
           case _ =>
-            logWarning(s"Could not identify file $path for $this")
+            logWarning(log"Could not identify file ${MDC(PATH, path)} for " + toMessageWithContext)
         }
       }
     }
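
Where the same provider description feeds both `toString` and several warnings, the change builds it once as a `MessageWithContext` and concatenates it onto other `log"..."` messages. A rough sketch of that reuse, under the same assumptions as above and with invented class and field names (not this file's real members):

    import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
    import org.apache.spark.internal.LogKey.{OP_ID, PARTITION_ID}

    class ProviderSketch(opId: Long, partId: Int) extends Logging {
      // Built once, reused for both logging and the plain-string description.
      private def toMessageWithContext: MessageWithContext =
        log"ProviderSketch[op=${MDC(OP_ID, opId)}, part=${MDC(PARTITION_ID, partId)}]"

      override def toString: String = toMessageWithContext.message

      def warnCleanupFailure(e: Throwable): Unit =
        logWarning(log"Error cleaning up " + toMessageWithContext, e)
    }
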
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 61c3d349655f..b1005e589073 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -36,7 +36,8 @@ import org.rocksdb.CompressionType._
 import org.rocksdb.TickerType._
 
 import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ERROR, PATH}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.util.{NextIterator, Utils}
@@ -945,7 +946,8 @@ class RocksDB(
       Utils.deleteRecursively(file)
     } catch {
       case e: Exception =>
-        logWarning(s"Error recursively deleting local dir $file while $msg", e)
+        logWarning(
+          log"Error recursively deleting local dir ${MDC(PATH, file)} while ${MDC(ERROR, msg)}", e)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index bd1daa48f809..e24a156357f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -38,7 +38,8 @@ import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{FILE_VERSION, MAX_FILE_VERSION, PATH}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
@@ -343,7 +344,8 @@ class RocksDBFileManager(
         logInfo(s"Deleted changelog file $version")
       } catch {
         case e: Exception =>
-          logWarning(s"Error deleting changelog file for version $version", e)
+          logWarning(
+            log"Error deleting changelog file for version ${MDC(FILE_VERSION, version)}", e)
       }
     }
   }
@@ -446,9 +448,10 @@ class RocksDBFileManager(
         case e: Exception =>
           failedToDelete += 1
           if (maxUsedVersion == -1) {
-            logWarning(s"Error deleting orphan file $dfsFileName", e)
+            logWarning(log"Error deleting orphan file ${MDC(PATH, dfsFileName)}", e)
           } else {
-            logWarning(s"Error deleting file $dfsFileName, last used in version $maxUsedVersion", e)
+            logWarning(log"Error deleting file ${MDC(PATH, dfsFileName)}, " +
+              log"last used in version ${MDC(MAX_FILE_VERSION, maxUsedVersion)}", e)
           }
       }
     }
@@ -462,7 +465,8 @@ class RocksDBFileManager(
         logDebug(s"Deleted version $version")
       } catch {
         case e: Exception =>
-          logWarning(s"Error deleting version file $versionFile for version $version", e)
+          logWarning(log"Error deleting version file ${MDC(PATH, versionFile)} for " +
+            log"version ${MDC(FILE_VERSION, version)}", e)
       }
     }
     logInfo(s"Deleted ${filesToDelete.size - failedToDelete} files (failed to delete" +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 85f7fed90c6c..699ce75a88de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -30,7 +30,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkContext, SparkEnv, SparkUnsupportedOperationException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{LOAD_TIME, QUERY_RUN_ID, STATE_STORE_PROVIDER, STORE_ID}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -702,9 +703,9 @@ object StateStore extends Logging {
       }
 
       if (loadTimeMs > 2000L) {
-        logWarning(s"Loaded state store provider in loadTimeMs=$loadTimeMs " +
-          s"for storeId=${storeProviderId.storeId.toString} and " +
-          s"queryRunId=${storeProviderId.queryRunId}")
+        logWarning(log"Loaded state store provider in loadTimeMs=${MDC(LOAD_TIME, loadTimeMs)} " +
+          log"for storeId=${MDC(STORE_ID, storeProviderId.storeId.toString)} and " +
+          log"queryRunId=${MDC(QUERY_RUN_ID, storeProviderId.queryRunId)}")
       }
 
       val otherProviderIds = loadedProviders.keys.filter(_ != storeProviderId).toSeq
@@ -824,7 +825,8 @@ object StateStore extends Logging {
             }
           } catch {
             case NonFatal(e) =>
-              logWarning(s"Error managing $provider, stopping management thread", e)
+              logWarning(log"Error managing ${MDC(STATE_STORE_PROVIDER, provider)}, " +
+                log"stopping management thread", e)
               threadPoolException.set(e)
           } finally {
             val duration = System.currentTimeMillis() - startTime
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 9802a4dce4e5..1303668b8f86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -24,7 +24,8 @@ import scala.annotation.tailrec
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{END_INDEX, START_INDEX}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, JoinedRow, Literal, SafeProjection, SpecificInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
@@ -366,9 +367,9 @@ class SymmetricHashJoinStateManager(
 
             // If nulls were found at the end, log a warning for the range of null indices.
             if (nonNullIndex != numValues - 1) {
-              logWarning(s"`keyWithIndexToValue` returns a null value for indices " +
-                s"with range from startIndex=${nonNullIndex + 1} " +
-                s"and endIndex=${numValues - 1}.")
+              logWarning(log"`keyWithIndexToValue` returns a null value for indices " +
+                log"with range from startIndex=${MDC(START_INDEX, nonNullIndex + 1)} " +
+                log"and endIndex=${MDC(END_INDEX, numValues - 1)}.")
+                log"and endIndex=${MDC(END_INDEX, numValues - 1)}.")
             }
 
             // Remove all null values from nonNullIndex + 1 onwards
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 8a2a5282b69d..5e2aae45c680 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
 import org.apache.spark.{JobExecutionStatus, SparkConf}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.connector.metric.CustomMetric
@@ -222,9 +223,9 @@ class SQLAppStatusListener(
             method
           } catch {
             case NonFatal(e) =>
-              logWarning(s"Unable to load custom metric object for class `$className`. " +
-                "Please make sure that the custom metric class is in the classpath and " +
-                "it has 0-arg constructor.", e)
+              logWarning(log"Unable to load custom metric object for class " +
+                log"`${MDC(CLASS_NAME, className)}`. Please make sure that the custom metric " +
+                log"class is in the classpath and it has 0-arg constructor.", e)
               // Cannot initialize custom metric object, we might be in history server that does
               // not have the custom metric class.
               val defaultMethod = (_: Array[Long], _: Array[Long]) => "N/A"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 164710cdd883..af1304dccee0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -29,7 +29,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path}
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, CONFIG2}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.CacheManager
@@ -258,8 +259,9 @@ object SharedState extends Logging {
     val sparkWarehouseOption =
       initialConfigs.get(WAREHOUSE_PATH.key).orElse(sparkConf.getOption(WAREHOUSE_PATH.key))
     if (initialConfigs.contains(HIVE_WAREHOUSE_CONF_NAME)) {
-      logWarning(s"Not allowing to set $HIVE_WAREHOUSE_CONF_NAME in SparkSession's " +
-        s"options, please use ${WAREHOUSE_PATH.key} to set statically for cross-session usages")
+      logWarning(log"Not allowing to set ${MDC(CONFIG, HIVE_WAREHOUSE_CONF_NAME)} in " +
+        log"SparkSession's options, please use ${MDC(CONFIG2, WAREHOUSE_PATH.key)} to " +
+        log"set statically for cross-session usages")
     }
     // hive.metastore.warehouse.dir only stay in hadoopConf
     sparkConf.remove(HIVE_WAREHOUSE_CONF_NAME)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index c2c430a7b39d..e4cd79e3f53c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -24,6 +24,8 @@ import java.util.Locale
 
 import scala.util.Using
 
+import org.apache.spark.internal.LogKey.COLUMN_NAME
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
@@ -368,7 +370,8 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
           }
         } catch {
           case e: SQLException =>
-            logWarning(s"Failed to get array dimension for column $columnName", e)
+            logWarning(
+              log"Failed to get array dimension for column ${MDC(COLUMN_NAME, columnName)}", e)
         }
       case _ =>
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 225f9d1f19a5..424c4f8a7d26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -25,7 +25,8 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.annotation.Evolving
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{QUERY_ID, RUN_ID}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement}
@@ -367,8 +368,8 @@ class StreamingQueryManager private[sql] (
       if (activeOption.isDefined) {
         if (shouldStopActiveRun) {
           val oldQuery = activeOption.get
-          logWarning(s"Stopping existing streaming query [id=${query.id}, " +
-            s"runId=${oldQuery.runId}], as a new run is being started.")
+          logWarning(log"Stopping existing streaming query [id=${MDC(QUERY_ID, query.id)}, " +
+            log"runId=${MDC(RUN_ID, oldQuery.runId)}], as a new run is being started.")
           Some(oldQuery)
         } else {
           throw new IllegalStateException(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
