This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 149ac0f8893b [SPARK-47581][CORE] SQL catalyst: Migrate logWarning with 
variables to structured logging framework
149ac0f8893b is described below

commit 149ac0f8893b5be8b8b0556ef47a2384aaad1850
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Mon Apr 8 22:56:10 2024 -0700

    [SPARK-47581][CORE] SQL catalyst: Migrate logWarning with variables to 
structured logging framework
    
    ### What changes were proposed in this pull request?
    
    Migrate logWarning with variables of the Catalyst module to structured 
logging framework. This transforms the logWarning entries of the following API
    ```
    def logWarning(msg: => String): Unit
    ```
    to
    ```
    def logWarning(entry: LogEntry): Unit
    ```
    
    ### Why are the changes needed?
    
    To enhance Apache Spark's logging system by implementing structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, Spark core logs will contain additional MDC
    
    ### How was this patch tested?
    
    Compiler and scala style checks, as well as code review.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45904 from dtenedor/log-warn-catalyst.
    
    Lead-authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Co-authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   | 26 ++++++++++++++++++++++
 .../sql/catalyst/analysis/FunctionRegistry.scala   |  6 +++--
 .../sql/catalyst/analysis/HintErrorLogger.scala    | 19 ++++++++++------
 .../catalyst/analysis/StreamingJoinHelper.scala    | 19 ++++++++++------
 .../analysis/UnsupportedOperationChecker.scala     |  6 +++--
 .../spark/sql/catalyst/catalog/interface.scala     |  6 +++--
 .../spark/sql/catalyst/csv/CSVHeaderChecker.scala  | 25 ++++++++++++---------
 .../catalyst/expressions/V2ExpressionUtils.scala   | 10 +++++----
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  4 ++--
 .../ReplaceNullWithFalseInPredicate.scala          |  7 ++++--
 .../spark/sql/catalyst/optimizer/joins.scala       |  7 ++++--
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  8 ++++---
 .../spark/sql/catalyst/rules/RuleExecutor.scala    | 14 +++++++-----
 .../spark/sql/catalyst/util/CharVarcharUtils.scala | 11 ++++-----
 .../apache/spark/sql/catalyst/util/ParseMode.scala |  9 +++++---
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  | 10 ++++++---
 .../spark/sql/catalyst/util/StringUtils.scala      | 11 +++++----
 17 files changed, 133 insertions(+), 65 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala 
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index a0e99f1edc34..7fa0331515cb 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -22,6 +22,7 @@ package org.apache.spark.internal
  */
 object LogKey extends Enumeration {
   val ACCUMULATOR_ID = Value
+  val ANALYSIS_ERROR = Value
   val APP_DESC = Value
   val APP_ID = Value
   val APP_STATE = Value
@@ -33,6 +34,10 @@ object LogKey extends Enumeration {
   val CATEGORICAL_FEATURES = Value
   val CLASS_LOADER = Value
   val CLASS_NAME = Value
+  val COLUMN_DATA_TYPE_SOURCE = Value
+  val COLUMN_DATA_TYPE_TARGET = Value
+  val COLUMN_DEFAULT_VALUE = Value
+  val COLUMN_NAME = Value
   val COMMAND = Value
   val COMMAND_OUTPUT = Value
   val COMPONENT = Value
@@ -40,6 +45,12 @@ object LogKey extends Enumeration {
   val CONFIG2 = Value
   val CONTAINER_ID = Value
   val COUNT = Value
+  val CSV_HEADER_COLUMN_NAME = Value
+  val CSV_HEADER_COLUMN_NAMES = Value
+  val CSV_HEADER_LENGTH = Value
+  val CSV_SCHEMA_FIELD_NAME = Value
+  val CSV_SCHEMA_FIELD_NAMES = Value
+  val CSV_SOURCE = Value
   val DRIVER_ID = Value
   val END_POINT = Value
   val ERROR = Value
@@ -48,13 +59,17 @@ object LogKey extends Enumeration {
   val EXECUTOR_ID = Value
   val EXECUTOR_STATE = Value
   val EXIT_CODE = Value
+  val EXPRESSION_TERMS = Value
   val FAILURES = Value
+  val FUNCTION_NAME = Value
+  val FUNCTION_PARAMETER = Value
   val GROUP_ID = Value
   val HIVE_OPERATION_STATE = Value
   val HIVE_OPERATION_TYPE = Value
   val HOST = Value
   val JOB_ID = Value
   val JOIN_CONDITION = Value
+  val JOIN_CONDITION_SUB_EXPRESSION = Value
   val LEARNING_RATE = Value
   val LINE = Value
   val LINE_NUM = Value
@@ -68,21 +83,28 @@ object LogKey extends Enumeration {
   val MERGE_DIR_NAME = Value
   val METHOD_NAME = Value
   val MIN_SIZE = Value
+  val NUM_COLUMNS = Value
   val NUM_ITERATIONS = Value
   val OBJECT_ID = Value
   val OLD_BLOCK_MANAGER_ID = Value
   val OPTIMIZER_CLASS_NAME = Value
   val OP_TYPE = Value
+  val PARSE_MODE = Value
   val PARTITION_ID = Value
+  val PARTITION_SPECIFICATION = Value
   val PATH = Value
   val PATHS = Value
   val POD_ID = Value
   val PORT = Value
+  val QUERY_HINT = Value
   val QUERY_PLAN = Value
+  val QUERY_PLAN_LENGTH_ACTUAL = Value
+  val QUERY_PLAN_LENGTH_MAX = Value
   val RANGE = Value
   val RDD_ID = Value
   val REASON = Value
   val REDUCE_ID = Value
+  val RELATION_NAME = Value
   val REMOTE_ADDRESS = Value
   val RETRY_COUNT = Value
   val RETRY_INTERVAL = Value
@@ -97,6 +119,7 @@ object LogKey extends Enumeration {
   val SHUFFLE_MERGE_ID = Value
   val SIZE = Value
   val SLEEP_TIME = Value
+  val SQL_TEXT = Value
   val STAGE_ID = Value
   val STATEMENT_ID = Value
   val SUBMISSION_ID = Value
@@ -110,8 +133,11 @@ object LogKey extends Enumeration {
   val THREAD_NAME = Value
   val TID = Value
   val TIMEOUT = Value
+  val TIME_UNITS = Value
   val TOTAL_EFFECTIVE_TIME = Value
   val TOTAL_TIME = Value
+  val UNSUPPORTED_EXPRESSION = Value
+  val UNSUPPORTED_HINT_REASON = Value
   val URI = Value
   val USER_ID = Value
   val USER_NAME = Value
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index bbc063c32103..99ae3adde44f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable
 import scala.reflect.ClassTag
 
 import org.apache.spark.SparkUnsupportedOperationException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.FUNCTION_NAME
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.expressions._
@@ -223,7 +224,8 @@ trait SimpleFunctionRegistryBase[T] extends 
FunctionRegistryBase[T] with Logging
     val newFunction = (info, builder)
     functionBuilders.put(name, newFunction) match {
       case Some(previousFunction) if previousFunction != newFunction =>
-        logWarning(s"The function $name replaced a previously registered 
function.")
+        logWarning(log"The function ${MDC(FUNCTION_NAME, name)} replaced a " +
+          log"previously registered function.")
       case _ =>
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala
index 0287bb3d819f..7338ef21a713 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{QUERY_HINT, RELATION_NAME, 
UNSUPPORTED_HINT_REASON}
 import org.apache.spark.sql.catalyst.plans.logical.{HintErrorHandler, HintInfo}
 
 /**
@@ -27,27 +28,31 @@ object HintErrorLogger extends HintErrorHandler with 
Logging {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
   override def hintNotRecognized(name: String, parameters: Seq[Any]): Unit = {
-    logWarning(s"Unrecognized hint: ${hintToPrettyString(name, parameters)}")
+    logWarning(log"Unrecognized hint: " +
+      log"${MDC(QUERY_HINT, hintToPrettyString(name, parameters))}")
   }
 
   override def hintRelationsNotFound(
       name: String, parameters: Seq[Any], invalidRelations: Set[Seq[String]]): 
Unit = {
     invalidRelations.foreach { ident =>
-      logWarning(s"Count not find relation '${ident.quoted}' specified in hint 
" +
-        s"'${hintToPrettyString(name, parameters)}'.")
+      logWarning(log"Count not find relation '${MDC(RELATION_NAME, 
ident.quoted)}' " +
+        log"specified in hint '${MDC(QUERY_HINT, hintToPrettyString(name, 
parameters))}'.")
     }
   }
 
   override def joinNotFoundForJoinHint(hint: HintInfo): Unit = {
-    logWarning(s"A join hint $hint is specified but it is not part of a join 
relation.")
+    logWarning(log"A join hint ${MDC(QUERY_HINT, hint)} is specified " +
+      log"but it is not part of a join relation.")
   }
 
   override def joinHintNotSupported(hint: HintInfo, reason: String): Unit = {
-    logWarning(s"Hint $hint is not supported in the query: $reason.")
+    logWarning(log"Hint ${MDC(QUERY_HINT, hint)} is not supported in the 
query: " +
+      log"${MDC(UNSUPPORTED_HINT_REASON, reason)}.")
   }
 
   override def hintOverridden(hint: HintInfo): Unit = {
-    logWarning(s"Hint $hint is overridden by another hint and will not take 
effect.")
+    logWarning(log"Hint ${MDC(QUERY_HINT, hint)} is overridden by another hint 
" +
+      log"and will not take effect.")
   }
 
   private def hintToPrettyString(name: String, parameters: Seq[Any]): String = 
{
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala
index cb836550359c..e9c4dd0be7d9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala
@@ -87,7 +87,8 @@ object StreamingJoinHelper extends PredicateHelper with 
Logging {
           l, r, attributesToFindStateWatermarkFor, 
attributesWithEventWatermark, eventWatermark)
       } catch {
         case NonFatal(e) =>
-          logWarning(s"Error trying to extract state constraint from condition 
$joinCondition", e)
+          logWarning(log"Error trying to extract state constraint from 
condition " +
+            log"${MDC(JOIN_CONDITION, joinCondition)}", e)
           None
       }
     }
@@ -165,8 +166,9 @@ object StreamingJoinHelper extends PredicateHelper with 
Logging {
 
     // Verify there is only one correct constraint term and of the correct type
     if (constraintTerms.size > 1) {
-      logWarning("Failed to extract state constraint terms: multiple time 
terms in condition\n\t" +
-        terms.mkString("\n\t"))
+      logWarning(
+        log"Failed to extract state constraint terms: multiple time terms in 
condition\n\t" +
+          log"${MDC(EXPRESSION_TERMS, terms.mkString("\n\t"))}")
       return None
     }
     if (constraintTerms.isEmpty) {
@@ -263,9 +265,10 @@ object StreamingJoinHelper extends PredicateHelper with 
Logging {
               if (calendarInterval.months != 0) {
                 invalid = true
                 logWarning(
-                  s"Failed to extract state value watermark from condition 
$exprToCollectFrom " +
-                    s"as imprecise intervals like months and years cannot be 
used for" +
-                    s"watermark calculation. Use interval in terms of day 
instead.")
+                  log"Failed to extract state value watermark from condition " 
+
+                    log"${MDC(JOIN_CONDITION, exprToCollectFrom)} " +
+                    log"as imprecise intervals like months and years cannot be 
used for" +
+                    log"watermark calculation. Use interval in terms of day 
instead.")
                 Literal(0.0)
               } else {
                 Literal(calendarInterval.days * MICROS_PER_DAY.toDouble +
@@ -284,7 +287,9 @@ object StreamingJoinHelper extends PredicateHelper with 
Logging {
           Seq(negateIfNeeded(castedLit, negate))
         case a @ _ =>
           logWarning(
-            s"Failed to extract state value watermark from condition 
$exprToCollectFrom due to $a")
+            log"Failed to extract state value watermark from condition " +
+              log"${MDC(JOIN_CONDITION, exprToCollectFrom)} due to " +
+              log"${MDC(JOIN_CONDITION_SUB_EXPRESSION, a)}")
           invalid = true
           Seq.empty
       }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index d57464fcefc0..e39ec267fa61 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ANALYSIS_ERROR, QUERY_PLAN}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.ExtendedAnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, CurrentDate, CurrentTimestampLike, Expression, 
GroupingSets, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow, 
WindowExpression}
@@ -133,7 +134,8 @@ object UnsupportedOperationChecker extends Logging {
         }
       }
     } catch {
-      case e: AnalysisException if !failWhenDetected => 
logWarning(s"${e.message};\n$plan")
+      case e: AnalysisException if !failWhenDetected =>
+        logWarning(log"${MDC(ANALYSIS_ERROR, e.message)};\n${MDC(QUERY_PLAN, 
plan)}", e)
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 10428877ba8d..4807d886c9f9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -32,7 +32,8 @@ import org.json4s.JsonAST.{JArray, JString}
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CurrentUserContext, FunctionIdentifier, 
InternalRow, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
Resolver, UnresolvedLeafNode}
@@ -832,7 +833,8 @@ object CatalogColumnStat extends Logging {
       ))
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Failed to parse column statistics for column ${colName} 
in table $table", e)
+        logWarning(log"Failed to parse column statistics for column " +
+          log"${MDC(COLUMN_NAME, colName)} in table ${MDC(RELATION_NAME, 
table)}", e)
         None
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
index 1fd264f2c3df..4e0985af6b60 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
@@ -21,7 +21,8 @@ import com.univocity.parsers.common.AbstractParser
 import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
 
 import org.apache.spark.SparkIllegalArgumentException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+import org.apache.spark.internal.LogKey.{CSV_HEADER_COLUMN_NAME, 
CSV_HEADER_COLUMN_NAMES, CSV_HEADER_LENGTH, CSV_SCHEMA_FIELD_NAME, 
CSV_SCHEMA_FIELD_NAMES, CSV_SOURCE, NUM_COLUMNS}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
@@ -61,7 +62,7 @@ class CSVHeaderChecker(
     if (columnNames != null) {
       val fieldNames = schema.map(_.name).toIndexedSeq
       val (headerLen, schemaSize) = (columnNames.length, fieldNames.length)
-      var errorMessage: Option[String] = None
+      var errorMessage: Option[MessageWithContext] = None
 
       if (headerLen == schemaSize) {
         var i = 0
@@ -75,19 +76,21 @@ class CSVHeaderChecker(
           }
           if (nameInHeader != nameInSchema) {
             errorMessage = Some(
-              s"""|CSV header does not conform to the schema.
-                  | Header: ${columnNames.mkString(", ")}
-                  | Schema: ${fieldNames.mkString(", ")}
-                  |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
-                  |$source""".stripMargin)
+              log"""|CSV header does not conform to the schema.
+                    | Header: ${MDC(CSV_HEADER_COLUMN_NAMES, 
columnNames.mkString(", "))}
+                    | Schema: ${MDC(CSV_SCHEMA_FIELD_NAMES, 
fieldNames.mkString(", "))}
+                    |Expected: ${MDC(CSV_SCHEMA_FIELD_NAME, fieldNames(i))}
+                    |but found: ${MDC(CSV_HEADER_COLUMN_NAME, columnNames(i))}
+                    |${MDC(CSV_SOURCE, source)}""".stripMargin)
           }
           i += 1
         }
       } else {
         errorMessage = Some(
-          s"""|Number of column in CSV header is not equal to number of fields 
in the schema:
-              | Header length: $headerLen, schema size: $schemaSize
-              |$source""".stripMargin)
+          log"""|Number of column in CSV header is not equal to number of 
fields in the schema:
+                | Header length: ${MDC(CSV_HEADER_LENGTH, headerLen)},
+                | schema size: ${MDC(NUM_COLUMNS, schemaSize)}
+                |${MDC(CSV_SOURCE, source)}""".stripMargin)
       }
 
       errorMessage.foreach { msg =>
@@ -96,7 +99,7 @@ class CSVHeaderChecker(
         } else {
           throw new SparkIllegalArgumentException(
             errorClass = "_LEGACY_ERROR_TEMP_3241",
-            messageParameters = Map("msg" -> msg))
+            messageParameters = Map("msg" -> msg.message))
         }
       }
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
index 621e01eedea7..c6cfccb74c16 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.lang.reflect.{Method, Modifier}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{FUNCTION_NAME, FUNCTION_PARAMETER}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
@@ -134,9 +135,10 @@ object V2ExpressionUtils extends SQLConfHelper with 
Logging {
     } catch {
       case _: NoSuchFunctionException =>
         val parameterString = args.map(_.dataType.typeName).mkString("(", ", 
", ")")
-        logWarning(s"V2 function $name with parameter types $parameterString 
is used in " +
-            "partition transforms, but its definition couldn't be found in the 
function catalog " +
-            "provided")
+        logWarning(log"V2 function ${MDC(FUNCTION_NAME, name)} " +
+          log"with parameter types ${MDC(FUNCTION_PARAMETER, parameterString)} 
is used in " +
+          log"partition transforms, but its definition couldn't be found in 
the function catalog " +
+          log"provided")
         None
       case _: UnsupportedOperationException =>
         None
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0e31595f919d..3a4002127df1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -445,8 +445,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
     val excludedRules = excludedRulesConf.filter { ruleName =>
       val nonExcludable = nonExcludableRules.contains(ruleName)
       if (nonExcludable) {
-        logWarning(s"Optimization rule '${ruleName}' was not excluded from the 
optimizer " +
-          s"because this rule is a non-excludable rule.")
+        logWarning(log"Optimization rule '${MDC(RULE_NAME, ruleName)}' " +
+          log"was not excluded from the optimizer because this rule is a 
non-excludable rule.")
       }
       !nonExcludable
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
index 73972cae82de..772382f5f1e1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.internal.LogKey.{SQL_TEXT, UNSUPPORTED_EXPRESSION}
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, 
ArrayFilter, CaseWhen, EqualNullSafe, Expression, If, In, InSet, 
LambdaFunction, Literal, MapFilter, Not, Or}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, 
DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan, 
MergeAction, MergeIntoTable, ReplaceData, UpdateAction, UpdateStarAction, 
UpdateTable, WriteDelta}
@@ -138,8 +140,9 @@ object ReplaceNullWithFalseInPredicate extends 
Rule[LogicalPlan] {
             "dataType" -> e.dataType.catalogString,
             "expr" -> e.sql))
       } else {
-        val message = "Expected a Boolean type expression in 
replaceNullWithFalse, " +
-          s"but got the type `${e.dataType.catalogString}` in `${e.sql}`."
+        val message = log"Expected a Boolean type expression in 
replaceNullWithFalse, " +
+          log"but got the type `${MDC(UNSUPPORTED_EXPRESSION, 
e.dataType.catalogString)}` " +
+          log"in `${MDC(SQL_TEXT, e.sql)}`."
         logWarning(message)
         e
       }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 8f03b93dce70..655b7c3455b1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.annotation.tailrec
 import scala.util.control.NonFatal
 
+import org.apache.spark.internal.LogKey.JOIN_CONDITION
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, 
ExtractFiltersAndInnerJoins}
@@ -262,8 +264,9 @@ object ExtractPythonUDFFromJoinCondition extends 
Rule[LogicalPlan] with Predicat
       // the new join conditions.
       val (udf, rest) = 
splitConjunctivePredicates(cond).partition(hasUnevaluablePythonUDF(_, j))
       val newCondition = if (rest.isEmpty) {
-        logWarning(s"The join condition:$cond of the join plan contains 
PythonUDF only," +
-          s" it will be moved out and the join plan will be turned to cross 
join.")
+        logWarning(log"The join condition:${MDC(JOIN_CONDITION, cond)} " +
+          log"of the join plan contains PythonUDF only," +
+          log" it will be moved out and the join plan will be turned to cross 
join.")
         None
       } else {
         Some(rest.reduceLeft(And))
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index b2ae8c0d757f..b4ba2c1caa22 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -31,7 +31,8 @@ import org.apache.commons.codec.DecoderException
 import org.apache.commons.codec.binary.Hex
 
 import org.apache.spark.{SparkArithmeticException, SparkException, 
SparkIllegalArgumentException, SparkThrowable}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PARTITION_SPECIFICATION
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, 
CatalogStorageFormat, ClusterBySpec}
@@ -4599,8 +4600,9 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
     def checkPartitionSpec(): Unit = {
       if (ctx.partitionSpec != null) {
-        logWarning("Partition specification is ignored when collecting column 
statistics: " +
-          ctx.partitionSpec.getText)
+        logWarning(
+          log"Partition specification is ignored when collecting column 
statistics: " +
+            log"${MDC(PARTITION_SPECIFICATION, ctx.partitionSpec.getText)}")
       }
     }
     if (ctx.identifier != null &&
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 566c48ff41b6..476ace2662f8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -257,16 +257,18 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] 
extends Logging {
           // Only log if this is a rule that is supposed to run more than once.
           if (iteration != 2) {
             val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
-              "."
+              log"."
             } else {
-              s", please set '${batch.strategy.maxIterationsSetting}' to a 
larger value."
+              log", please set '${MDC(NUM_ITERATIONS, 
batch.strategy.maxIterationsSetting)}' " +
+                log"to a larger value."
             }
-            val message = s"Max iterations (${iteration - 1}) reached for 
batch ${batch.name}" +
-              s"$endingMsg"
+            val log = log"Max iterations (${MDC(NUM_ITERATIONS, iteration - 
1)}) " +
+              log"reached for batch ${MDC(RULE_BATCH_NAME, batch.name)}" +
+              endingMsg
             if (Utils.isTesting || batch.strategy.errorOnExceed) {
-              throw new RuntimeException(message)
+              throw new RuntimeException(log.message)
             } else {
-              logWarning(message)
+              logWarning(log)
             }
           }
           // Check idempotence for Once batches.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
index 0e7bffcc3f95..06a88b5d7b51 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.util
 
 import scala.collection.mutable
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -74,10 +75,10 @@ object CharVarcharUtils extends Logging with 
SparkCharVarcharUtils {
     if (SQLConf.get.charVarcharAsString) {
       replaceCharVarcharWithString(dt)
     } else if (hasCharVarchar(dt)) {
-      logWarning("The Spark cast operator does not support char/varchar type 
and simply treats" +
-        " them as string type. Please use string type directly to avoid 
confusion. Otherwise," +
-        s" you can set ${SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key} to true, 
so that Spark treat" +
-        s" them as string type as same as Spark 3.0 and earlier")
+      logWarning(log"The Spark cast operator does not support char/varchar 
type and simply treats" +
+        log" them as string type. Please use string type directly to avoid 
confusion. Otherwise," +
+        log" you can set ${MDC(CONFIG, 
SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key)} " +
+        log"to true, so that Spark treat them as string type as same as Spark 
3.0 and earlier")
       replaceCharVarcharWithString(dt)
     } else {
       dt
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala
index b35da8e2c80f..dd1e466d1b38 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.util
 
 import java.util.Locale
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PARSE_MODE
 
 sealed trait ParseMode {
   /**
@@ -53,11 +54,13 @@ object ParseMode extends Logging {
       case DropMalformedMode.name => DropMalformedMode
       case FailFastMode.name => FailFastMode
       case _ =>
-        logWarning(s"$v is not a valid parse mode. Using 
${PermissiveMode.name}.")
+        logWarning(log"${MDC(PARSE_MODE, v)} is not a valid parse mode. " +
+          log"Using ${MDC(PARSE_MODE, PermissiveMode.name)}.")
         PermissiveMode
     }
   }.getOrElse {
-    logWarning(s"mode is null and not a valid parse mode. Using 
${PermissiveMode.name}.")
+    logWarning(log"mode is null and not a valid parse mode. " +
+      log"Using ${MDC(PARSE_MODE, PermissiveMode.name)}.")
     PermissiveMode
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 84bbf97886c8..db9adef8ef3b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.util
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis._
@@ -321,8 +322,11 @@ object ResolveDefaultColumns extends QueryErrorsBase
           Option(casted.eval(EmptyRow)).map(Literal(_, targetType))
         } catch {
           case e @ ( _: SparkThrowable | _: RuntimeException) =>
-            logWarning(s"Failed to cast default value '$l' for column $colName 
from " +
-              s"${l.dataType} to $targetType due to ${e.getMessage}")
+            logWarning(log"Failed to cast default value 
'${MDC(COLUMN_DEFAULT_VALUE, l)}' " +
+              log"for column ${MDC(COLUMN_NAME, colName)} " +
+              log"from ${MDC(COLUMN_DATA_TYPE_SOURCE, l.dataType)} " +
+              log"to ${MDC(COLUMN_DATA_TYPE_TARGET, targetType)} " +
+              log"due to ${MDC(ERROR, e.getMessage)}", e)
             None
         }
       case _ => None
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index f94a0650cce4..04df3635d475 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -21,7 +21,8 @@ import java.util.regex.{Pattern, PatternSyntaxException}
 
 import org.apache.commons.text.similarity.LevenshteinDistance
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -136,9 +137,11 @@ object StringUtils extends Logging {
     override def toString: String = {
       if (atLimit) {
         logWarning(
-          "Truncated the string representation of a plan since it was too 
long. The " +
-            s"plan had length ${length} and the maximum is ${maxLength}. This 
behavior " +
-            s"can be adjusted by setting 
'${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
+          log"Truncated the string representation of a plan since it was too 
long. The " +
+            log"plan had length ${MDC(QUERY_PLAN_LENGTH_ACTUAL, length)} " +
+            log"and the maximum is ${MDC(QUERY_PLAN_LENGTH_MAX, maxLength)}. 
This behavior " +
+            log"can be adjusted by setting " +
+            log"'${MDC(CONFIG, SQLConf.MAX_PLAN_STRING_LENGTH.key)}'.")
         val truncateMsg = if (maxLength == 0) {
           s"Truncated plan of $length characters"
         } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


Reply via email to