This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 149ac0f8893b [SPARK-47581][CORE] SQL catalyst: Migrate logWarning with variables to structured logging framework
149ac0f8893b is described below
commit 149ac0f8893b5be8b8b0556ef47a2384aaad1850
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Mon Apr 8 22:56:10 2024 -0700
[SPARK-47581][CORE] SQL catalyst: Migrate logWarning with variables to structured logging framework
### What changes were proposed in this pull request?
Migrate logWarning calls with variables in the Catalyst module to the structured logging framework. This transforms logWarning calls of the following API
```
def logWarning(msg: => String): Unit
```
to
```
def logWarning(entry: LogEntry): Unit
```
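For example, the FunctionRegistry change below migrates a call site from plain string interpolation to the `log` interpolator, wrapping each variable in `MDC` with a `LogKey`:
```
// Before: the variable is embedded in an unstructured string.
logWarning(s"The function $name replaced a previously registered function.")

// After: MDC(FUNCTION_NAME, name) tags the variable with a LogKey, so it can
// be emitted as a structured field alongside the rendered message.
logWarning(log"The function ${MDC(FUNCTION_NAME, name)} replaced a " +
  log"previously registered function.")
```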
### Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.
### Does this PR introduce _any_ user-facing change?
Yes, Spark logs will contain additional MDC (Mapped Diagnostic Context) fields.
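For illustration only, with a JSON log layout configured, a warning like the one above might render roughly as follows; the exact field names and shape depend on the logging configuration, so this is a sketch rather than the framework's guaranteed output:
```
{
  "level": "WARN",
  "msg": "The function my_func replaced a previously registered function.",
  "context": {"function_name": "my_func"}
}
```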
### How was this patch tested?
Compiler and Scala style checks, as well as code review.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #45904 from dtenedor/log-warn-catalyst.
Lead-authored-by: Daniel Tenedorio <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../scala/org/apache/spark/internal/LogKey.scala | 26 ++++++++++++++++++++++
.../sql/catalyst/analysis/FunctionRegistry.scala | 6 +++--
.../sql/catalyst/analysis/HintErrorLogger.scala | 19 ++++++++++------
.../catalyst/analysis/StreamingJoinHelper.scala | 19 ++++++++++------
.../analysis/UnsupportedOperationChecker.scala | 6 +++--
.../spark/sql/catalyst/catalog/interface.scala | 6 +++--
.../spark/sql/catalyst/csv/CSVHeaderChecker.scala | 25 ++++++++++++---------
.../catalyst/expressions/V2ExpressionUtils.scala | 10 +++++----
.../spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++--
.../ReplaceNullWithFalseInPredicate.scala | 7 ++++--
.../spark/sql/catalyst/optimizer/joins.scala | 7 ++++--
.../spark/sql/catalyst/parser/AstBuilder.scala | 8 ++++---
.../spark/sql/catalyst/rules/RuleExecutor.scala | 14 +++++++-----
.../spark/sql/catalyst/util/CharVarcharUtils.scala | 11 ++++-----
.../apache/spark/sql/catalyst/util/ParseMode.scala | 9 +++++---
.../catalyst/util/ResolveDefaultColumnsUtil.scala | 10 ++++++---
.../spark/sql/catalyst/util/StringUtils.scala | 11 +++++----
17 files changed, 133 insertions(+), 65 deletions(-)
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index a0e99f1edc34..7fa0331515cb 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -22,6 +22,7 @@ package org.apache.spark.internal
*/
object LogKey extends Enumeration {
val ACCUMULATOR_ID = Value
+ val ANALYSIS_ERROR = Value
val APP_DESC = Value
val APP_ID = Value
val APP_STATE = Value
@@ -33,6 +34,10 @@ object LogKey extends Enumeration {
val CATEGORICAL_FEATURES = Value
val CLASS_LOADER = Value
val CLASS_NAME = Value
+ val COLUMN_DATA_TYPE_SOURCE = Value
+ val COLUMN_DATA_TYPE_TARGET = Value
+ val COLUMN_DEFAULT_VALUE = Value
+ val COLUMN_NAME = Value
val COMMAND = Value
val COMMAND_OUTPUT = Value
val COMPONENT = Value
@@ -40,6 +45,12 @@ object LogKey extends Enumeration {
val CONFIG2 = Value
val CONTAINER_ID = Value
val COUNT = Value
+ val CSV_HEADER_COLUMN_NAME = Value
+ val CSV_HEADER_COLUMN_NAMES = Value
+ val CSV_HEADER_LENGTH = Value
+ val CSV_SCHEMA_FIELD_NAME = Value
+ val CSV_SCHEMA_FIELD_NAMES = Value
+ val CSV_SOURCE = Value
val DRIVER_ID = Value
val END_POINT = Value
val ERROR = Value
@@ -48,13 +59,17 @@ object LogKey extends Enumeration {
val EXECUTOR_ID = Value
val EXECUTOR_STATE = Value
val EXIT_CODE = Value
+ val EXPRESSION_TERMS = Value
val FAILURES = Value
+ val FUNCTION_NAME = Value
+ val FUNCTION_PARAMETER = Value
val GROUP_ID = Value
val HIVE_OPERATION_STATE = Value
val HIVE_OPERATION_TYPE = Value
val HOST = Value
val JOB_ID = Value
val JOIN_CONDITION = Value
+ val JOIN_CONDITION_SUB_EXPRESSION = Value
val LEARNING_RATE = Value
val LINE = Value
val LINE_NUM = Value
@@ -68,21 +83,28 @@ object LogKey extends Enumeration {
val MERGE_DIR_NAME = Value
val METHOD_NAME = Value
val MIN_SIZE = Value
+ val NUM_COLUMNS = Value
val NUM_ITERATIONS = Value
val OBJECT_ID = Value
val OLD_BLOCK_MANAGER_ID = Value
val OPTIMIZER_CLASS_NAME = Value
val OP_TYPE = Value
+ val PARSE_MODE = Value
val PARTITION_ID = Value
+ val PARTITION_SPECIFICATION = Value
val PATH = Value
val PATHS = Value
val POD_ID = Value
val PORT = Value
+ val QUERY_HINT = Value
val QUERY_PLAN = Value
+ val QUERY_PLAN_LENGTH_ACTUAL = Value
+ val QUERY_PLAN_LENGTH_MAX = Value
val RANGE = Value
val RDD_ID = Value
val REASON = Value
val REDUCE_ID = Value
+ val RELATION_NAME = Value
val REMOTE_ADDRESS = Value
val RETRY_COUNT = Value
val RETRY_INTERVAL = Value
@@ -97,6 +119,7 @@ object LogKey extends Enumeration {
val SHUFFLE_MERGE_ID = Value
val SIZE = Value
val SLEEP_TIME = Value
+ val SQL_TEXT = Value
val STAGE_ID = Value
val STATEMENT_ID = Value
val SUBMISSION_ID = Value
@@ -110,8 +133,11 @@ object LogKey extends Enumeration {
val THREAD_NAME = Value
val TID = Value
val TIMEOUT = Value
+ val TIME_UNITS = Value
val TOTAL_EFFECTIVE_TIME = Value
val TOTAL_TIME = Value
+ val UNSUPPORTED_EXPRESSION = Value
+ val UNSUPPORTED_HINT_REASON = Value
val URI = Value
val USER_ID = Value
val USER_NAME = Value
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index bbc063c32103..99ae3adde44f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.spark.SparkUnsupportedOperationException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.FUNCTION_NAME
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions._
@@ -223,7 +224,8 @@ trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging
val newFunction = (info, builder)
functionBuilders.put(name, newFunction) match {
case Some(previousFunction) if previousFunction != newFunction =>
- logWarning(s"The function $name replaced a previously registered
function.")
+ logWarning(log"The function ${MDC(FUNCTION_NAME, name)} replaced a " +
+ log"previously registered function.")
case _ =>
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala
index 0287bb3d819f..7338ef21a713 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HintErrorLogger.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{QUERY_HINT, RELATION_NAME, UNSUPPORTED_HINT_REASON}
import org.apache.spark.sql.catalyst.plans.logical.{HintErrorHandler, HintInfo}
/**
@@ -27,27 +28,31 @@ object HintErrorLogger extends HintErrorHandler with Logging {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
override def hintNotRecognized(name: String, parameters: Seq[Any]): Unit = {
- logWarning(s"Unrecognized hint: ${hintToPrettyString(name, parameters)}")
+ logWarning(log"Unrecognized hint: " +
+ log"${MDC(QUERY_HINT, hintToPrettyString(name, parameters))}")
}
override def hintRelationsNotFound(
name: String, parameters: Seq[Any], invalidRelations: Set[Seq[String]]): Unit = {
invalidRelations.foreach { ident =>
- logWarning(s"Count not find relation '${ident.quoted}' specified in hint
" +
- s"'${hintToPrettyString(name, parameters)}'.")
+ logWarning(log"Count not find relation '${MDC(RELATION_NAME,
ident.quoted)}' " +
+ log"specified in hint '${MDC(QUERY_HINT, hintToPrettyString(name,
parameters))}'.")
}
}
override def joinNotFoundForJoinHint(hint: HintInfo): Unit = {
- logWarning(s"A join hint $hint is specified but it is not part of a join
relation.")
+ logWarning(log"A join hint ${MDC(QUERY_HINT, hint)} is specified " +
+ log"but it is not part of a join relation.")
}
override def joinHintNotSupported(hint: HintInfo, reason: String): Unit = {
- logWarning(s"Hint $hint is not supported in the query: $reason.")
+ logWarning(log"Hint ${MDC(QUERY_HINT, hint)} is not supported in the
query: " +
+ log"${MDC(UNSUPPORTED_HINT_REASON, reason)}.")
}
override def hintOverridden(hint: HintInfo): Unit = {
- logWarning(s"Hint $hint is overridden by another hint and will not take
effect.")
+ logWarning(log"Hint ${MDC(QUERY_HINT, hint)} is overridden by another hint
" +
+ log"and will not take effect.")
}
private def hintToPrettyString(name: String, parameters: Seq[Any]): String = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala
index cb836550359c..e9c4dd0be7d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala
@@ -87,7 +87,8 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
l, r, attributesToFindStateWatermarkFor,
attributesWithEventWatermark, eventWatermark)
} catch {
case NonFatal(e) =>
- logWarning(s"Error trying to extract state constraint from condition
$joinCondition", e)
+ logWarning(log"Error trying to extract state constraint from
condition " +
+ log"${MDC(JOIN_CONDITION, joinCondition)}", e)
None
}
}
@@ -165,8 +166,9 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
// Verify there is only one correct constraint term and of the correct type
if (constraintTerms.size > 1) {
- logWarning("Failed to extract state constraint terms: multiple time
terms in condition\n\t" +
- terms.mkString("\n\t"))
+ logWarning(
+ log"Failed to extract state constraint terms: multiple time terms in
condition\n\t" +
+ log"${MDC(EXPRESSION_TERMS, terms.mkString("\n\t"))}")
return None
}
if (constraintTerms.isEmpty) {
@@ -263,9 +265,10 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
if (calendarInterval.months != 0) {
invalid = true
logWarning(
- s"Failed to extract state value watermark from condition
$exprToCollectFrom " +
- s"as imprecise intervals like months and years cannot be
used for" +
- s"watermark calculation. Use interval in terms of day
instead.")
+ log"Failed to extract state value watermark from condition "
+
+ log"${MDC(JOIN_CONDITION, exprToCollectFrom)} " +
+ log"as imprecise intervals like months and years cannot be
used for" +
+ log"watermark calculation. Use interval in terms of day
instead.")
Literal(0.0)
} else {
Literal(calendarInterval.days * MICROS_PER_DAY.toDouble +
@@ -284,7 +287,9 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
Seq(negateIfNeeded(castedLit, negate))
case a @ _ =>
logWarning(
- s"Failed to extract state value watermark from condition
$exprToCollectFrom due to $a")
+ log"Failed to extract state value watermark from condition " +
+ log"${MDC(JOIN_CONDITION, exprToCollectFrom)} due to " +
+ log"${MDC(JOIN_CONDITION_SUB_EXPRESSION, a)}")
invalid = true
Seq.empty
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index d57464fcefc0..e39ec267fa61 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ANALYSIS_ERROR, QUERY_PLAN}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, CurrentDate, CurrentTimestampLike, Expression, GroupingSets, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow, WindowExpression}
@@ -133,7 +134,8 @@ object UnsupportedOperationChecker extends Logging {
}
}
} catch {
- case e: AnalysisException if !failWhenDetected => logWarning(s"${e.message};\n$plan")
+ case e: AnalysisException if !failWhenDetected =>
+ logWarning(log"${MDC(ANALYSIS_ERROR, e.message)};\n${MDC(QUERY_PLAN,
plan)}", e)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 10428877ba8d..4807d886c9f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -32,7 +32,8 @@ import org.json4s.JsonAST.{JArray, JString}
import org.json4s.jackson.JsonMethods._
import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CurrentUserContext, FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, Resolver, UnresolvedLeafNode}
@@ -832,7 +833,8 @@ object CatalogColumnStat extends Logging {
))
} catch {
case NonFatal(e) =>
- logWarning(s"Failed to parse column statistics for column ${colName}
in table $table", e)
+ logWarning(log"Failed to parse column statistics for column " +
+ log"${MDC(COLUMN_NAME, colName)} in table ${MDC(RELATION_NAME,
table)}", e)
None
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
index 1fd264f2c3df..4e0985af6b60 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
@@ -21,7 +21,8 @@ import com.univocity.parsers.common.AbstractParser
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
import org.apache.spark.SparkIllegalArgumentException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+import org.apache.spark.internal.LogKey.{CSV_HEADER_COLUMN_NAME, CSV_HEADER_COLUMN_NAMES, CSV_HEADER_LENGTH, CSV_SCHEMA_FIELD_NAME, CSV_SCHEMA_FIELD_NAMES, CSV_SOURCE, NUM_COLUMNS}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -61,7 +62,7 @@ class CSVHeaderChecker(
if (columnNames != null) {
val fieldNames = schema.map(_.name).toIndexedSeq
val (headerLen, schemaSize) = (columnNames.length, fieldNames.length)
- var errorMessage: Option[String] = None
+ var errorMessage: Option[MessageWithContext] = None
if (headerLen == schemaSize) {
var i = 0
@@ -75,19 +76,21 @@ class CSVHeaderChecker(
}
if (nameInHeader != nameInSchema) {
errorMessage = Some(
- s"""|CSV header does not conform to the schema.
- | Header: ${columnNames.mkString(", ")}
- | Schema: ${fieldNames.mkString(", ")}
- |Expected: ${fieldNames(i)} but found: ${columnNames(i)}
- |$source""".stripMargin)
+ log"""|CSV header does not conform to the schema.
+ | Header: ${MDC(CSV_HEADER_COLUMN_NAMES, columnNames.mkString(", "))}
+ | Schema: ${MDC(CSV_SCHEMA_FIELD_NAMES, fieldNames.mkString(", "))}
+ |Expected: ${MDC(CSV_SCHEMA_FIELD_NAME, fieldNames(i))}
+ |but found: ${MDC(CSV_HEADER_COLUMN_NAME, columnNames(i))}
+ |${MDC(CSV_SOURCE, source)}""".stripMargin)
}
i += 1
}
} else {
errorMessage = Some(
- s"""|Number of column in CSV header is not equal to number of fields
in the schema:
- | Header length: $headerLen, schema size: $schemaSize
- |$source""".stripMargin)
+ log"""|Number of column in CSV header is not equal to number of
fields in the schema:
+ | Header length: ${MDC(CSV_HEADER_LENGTH, headerLen)},
+ | schema size: ${MDC(NUM_COLUMNS, schemaSize)}
+ |${MDC(CSV_SOURCE, source)}""".stripMargin)
}
errorMessage.foreach { msg =>
@@ -96,7 +99,7 @@ class CSVHeaderChecker(
} else {
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3241",
- messageParameters = Map("msg" -> msg))
+ messageParameters = Map("msg" -> msg.message))
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
index 621e01eedea7..c6cfccb74c16 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.expressions
import java.lang.reflect.{Method, Modifier}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{FUNCTION_NAME, FUNCTION_PARAMETER}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
@@ -134,9 +135,10 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
} catch {
case _: NoSuchFunctionException =>
val parameterString = args.map(_.dataType.typeName).mkString("(", ", ", ")")
- logWarning(s"V2 function $name with parameter types $parameterString
is used in " +
- "partition transforms, but its definition couldn't be found in the
function catalog " +
- "provided")
+ logWarning(log"V2 function ${MDC(FUNCTION_NAME, name)} " +
+ log"with parameter types ${MDC(FUNCTION_PARAMETER, parameterString)}
is used in " +
+ log"partition transforms, but its definition couldn't be found in
the function catalog " +
+ log"provided")
None
case _: UnsupportedOperationException =>
None
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0e31595f919d..3a4002127df1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -445,8 +445,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
val excludedRules = excludedRulesConf.filter { ruleName =>
val nonExcludable = nonExcludableRules.contains(ruleName)
if (nonExcludable) {
- logWarning(s"Optimization rule '${ruleName}' was not excluded from the
optimizer " +
- s"because this rule is a non-excludable rule.")
+ logWarning(log"Optimization rule '${MDC(RULE_NAME, ruleName)}' " +
+ log"was not excluded from the optimizer because this rule is a
non-excludable rule.")
}
!nonExcludable
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
index 73972cae82de..772382f5f1e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.internal.LogKey.{SQL_TEXT, UNSUPPORTED_EXPRESSION}
+import org.apache.spark.internal.MDC
import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, EqualNullSafe, Expression, If, In, InSet, LambdaFunction, Literal, MapFilter, Not, Or}
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan, MergeAction, MergeIntoTable, ReplaceData, UpdateAction, UpdateStarAction, UpdateTable, WriteDelta}
@@ -138,8 +140,9 @@ object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] {
"dataType" -> e.dataType.catalogString,
"expr" -> e.sql))
} else {
- val message = "Expected a Boolean type expression in
replaceNullWithFalse, " +
- s"but got the type `${e.dataType.catalogString}` in `${e.sql}`."
+ val message = log"Expected a Boolean type expression in
replaceNullWithFalse, " +
+ log"but got the type `${MDC(UNSUPPORTED_EXPRESSION,
e.dataType.catalogString)}` " +
+ log"in `${MDC(SQL_TEXT, e.sql)}`."
logWarning(message)
e
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 8f03b93dce70..655b7c3455b1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
import scala.util.control.NonFatal
+import org.apache.spark.internal.LogKey.JOIN_CONDITION
+import org.apache.spark.internal.MDC
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractFiltersAndInnerJoins}
@@ -262,8 +264,9 @@ object ExtractPythonUDFFromJoinCondition extends Rule[LogicalPlan] with Predicat
// the new join conditions.
val (udf, rest) =
splitConjunctivePredicates(cond).partition(hasUnevaluablePythonUDF(_, j))
val newCondition = if (rest.isEmpty) {
- logWarning(s"The join condition:$cond of the join plan contains
PythonUDF only," +
- s" it will be moved out and the join plan will be turned to cross
join.")
+ logWarning(log"The join condition:${MDC(JOIN_CONDITION, cond)} " +
+ log"of the join plan contains PythonUDF only," +
+ log" it will be moved out and the join plan will be turned to cross
join.")
None
} else {
Some(rest.reduceLeft(And))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index b2ae8c0d757f..b4ba2c1caa22 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -31,7 +31,8 @@ import org.apache.commons.codec.DecoderException
import org.apache.commons.codec.binary.Hex
import org.apache.spark.{SparkArithmeticException, SparkException, SparkIllegalArgumentException, SparkThrowable}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PARTITION_SPECIFICATION
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, ClusterBySpec}
@@ -4599,8 +4600,9 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
def checkPartitionSpec(): Unit = {
if (ctx.partitionSpec != null) {
- logWarning("Partition specification is ignored when collecting column
statistics: " +
- ctx.partitionSpec.getText)
+ logWarning(
+ log"Partition specification is ignored when collecting column
statistics: " +
+ log"${MDC(PARTITION_SPECIFICATION, ctx.partitionSpec.getText)}")
}
}
if (ctx.identifier != null &&
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 566c48ff41b6..476ace2662f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -257,16 +257,18 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
// Only log if this is a rule that is supposed to run more than once.
if (iteration != 2) {
val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
- "."
+ log"."
} else {
- s", please set '${batch.strategy.maxIterationsSetting}' to a
larger value."
+ log", please set '${MDC(NUM_ITERATIONS,
batch.strategy.maxIterationsSetting)}' " +
+ log"to a larger value."
}
- val message = s"Max iterations (${iteration - 1}) reached for
batch ${batch.name}" +
- s"$endingMsg"
+ val log = log"Max iterations (${MDC(NUM_ITERATIONS, iteration -
1)}) " +
+ log"reached for batch ${MDC(RULE_BATCH_NAME, batch.name)}" +
+ endingMsg
if (Utils.isTesting || batch.strategy.errorOnExceed) {
- throw new RuntimeException(message)
+ throw new RuntimeException(log.message)
} else {
- logWarning(message)
+ logWarning(log)
}
}
// Check idempotence for Once batches.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
index 0e7bffcc3f95..06a88b5d7b51 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.util
import scala.collection.mutable
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -74,10 +75,10 @@ object CharVarcharUtils extends Logging with SparkCharVarcharUtils {
if (SQLConf.get.charVarcharAsString) {
replaceCharVarcharWithString(dt)
} else if (hasCharVarchar(dt)) {
- logWarning("The Spark cast operator does not support char/varchar type
and simply treats" +
- " them as string type. Please use string type directly to avoid
confusion. Otherwise," +
- s" you can set ${SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key} to true,
so that Spark treat" +
- s" them as string type as same as Spark 3.0 and earlier")
+ logWarning(log"The Spark cast operator does not support char/varchar
type and simply treats" +
+ log" them as string type. Please use string type directly to avoid
confusion. Otherwise," +
+ log" you can set ${MDC(CONFIG,
SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key)} " +
+ log"to true, so that Spark treat them as string type as same as Spark
3.0 and earlier")
replaceCharVarcharWithString(dt)
} else {
dt
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala
index b35da8e2c80f..dd1e466d1b38 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.util
import java.util.Locale
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PARSE_MODE
sealed trait ParseMode {
/**
@@ -53,11 +54,13 @@ object ParseMode extends Logging {
case DropMalformedMode.name => DropMalformedMode
case FailFastMode.name => FailFastMode
case _ =>
- logWarning(s"$v is not a valid parse mode. Using
${PermissiveMode.name}.")
+ logWarning(log"${MDC(PARSE_MODE, v)} is not a valid parse mode. " +
+ log"Using ${MDC(PARSE_MODE, PermissiveMode.name)}.")
PermissiveMode
}
}.getOrElse {
- logWarning(s"mode is null and not a valid parse mode. Using
${PermissiveMode.name}.")
+ logWarning(log"mode is null and not a valid parse mode. " +
+ log"Using ${MDC(PARSE_MODE, PermissiveMode.name)}.")
PermissiveMode
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 84bbf97886c8..db9adef8ef3b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.util
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis._
@@ -321,8 +322,11 @@ object ResolveDefaultColumns extends QueryErrorsBase
Option(casted.eval(EmptyRow)).map(Literal(_, targetType))
} catch {
case e @ ( _: SparkThrowable | _: RuntimeException) =>
- logWarning(s"Failed to cast default value '$l' for column $colName
from " +
- s"${l.dataType} to $targetType due to ${e.getMessage}")
+ logWarning(log"Failed to cast default value
'${MDC(COLUMN_DEFAULT_VALUE, l)}' " +
+ log"for column ${MDC(COLUMN_NAME, colName)} " +
+ log"from ${MDC(COLUMN_DATA_TYPE_SOURCE, l.dataType)} " +
+ log"to ${MDC(COLUMN_DATA_TYPE_TARGET, targetType)} " +
+ log"due to ${MDC(ERROR, e.getMessage)}", e)
None
}
case _ => None
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index f94a0650cce4..04df3635d475 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -21,7 +21,8 @@ import java.util.regex.{Pattern, PatternSyntaxException}
import org.apache.commons.text.similarity.LevenshteinDistance
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
@@ -136,9 +137,11 @@ object StringUtils extends Logging {
override def toString: String = {
if (atLimit) {
logWarning(
- "Truncated the string representation of a plan since it was too
long. The " +
- s"plan had length ${length} and the maximum is ${maxLength}. This
behavior " +
- s"can be adjusted by setting
'${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
+ log"Truncated the string representation of a plan since it was too
long. The " +
+ log"plan had length ${MDC(QUERY_PLAN_LENGTH_ACTUAL, length)} " +
+ log"and the maximum is ${MDC(QUERY_PLAN_LENGTH_MAX, maxLength)}.
This behavior " +
+ log"can be adjusted by setting " +
+ log"'${MDC(CONFIG, SQLConf.MAX_PLAN_STRING_LENGTH.key)}'.")
val truncateMsg = if (maxLength == 0) {
s"Truncated plan of $length characters"
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]