This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 4957a40d6e6b  [SPARK-47584][SQL] SQL core: Migrate logWarn with variables to structured logging framework
4957a40d6e6b is described below

commit 4957a40d6e6bf68226c8047687e8f30c93adb8ce
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Wed Apr 17 11:59:09 2024 -0700

    [SPARK-47584][SQL] SQL core: Migrate logWarn with variables to structured logging framework

    ### What changes were proposed in this pull request?
    This PR aims to migrate `logWarning` calls with variables in the `SQL core` module to the `structured logging framework`.

    ### Why are the changes needed?
    To enhance Apache Spark's logging system by implementing structured logging.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    - Passes GA.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #46057 from panbingkun/SPARK-47584.

    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala | 65 +++++++++++++++++++++-
 .../catalyst/analysis/StreamingJoinHelper.scala | 4 +-
 .../ReplaceNullWithFalseInPredicate.scala | 4 +-
 .../main/scala/org/apache/spark/sql/Column.scala | 13 +++--
 .../scala/org/apache/spark/sql/SparkSession.scala | 20 ++++---
 .../spark/sql/api/python/PythonSQLUtils.scala | 7 ++-
 .../org/apache/spark/sql/api/r/SQLUtils.scala | 9 +--
 .../catalyst/analysis/ResolveSessionCatalog.scala | 9 ++-
 .../apache/spark/sql/execution/ExistingRDD.scala | 12 ++--
 .../spark/sql/execution/QueryExecution.scala | 6 +-
 .../spark/sql/execution/SparkStrategies.scala | 10 ++--
 .../sql/execution/WholeStageCodegenExec.scala | 8 ++-
 .../adaptive/InsertAdaptiveSparkPlan.scala | 6 +-
 .../execution/command/AnalyzeTablesCommand.scala | 6 +-
 .../spark/sql/execution/command/CommandUtils.scala | 9 +--
 .../spark/sql/execution/command/SetCommand.scala | 28 ++++++----
 .../apache/spark/sql/execution/command/ddl.scala | 8 ++-
 .../datasources/BasicWriteStatsTracker.scala | 9 +--
 .../sql/execution/datasources/DataSource.scala | 10 ++--
 .../execution/datasources/DataSourceManager.scala | 6 +-
 .../sql/execution/datasources/FilePartition.scala | 11 ++--
 .../sql/execution/datasources/FileScanRDD.scala | 8 ++-
 .../execution/datasources/FileStatusCache.scala | 14 +++--
 .../execution/datasources/csv/CSVDataSource.scala | 9 +--
 .../execution/datasources/jdbc/JDBCRelation.scala | 15 ++---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 11 ++--
 .../datasources/json/JsonOutputWriter.scala | 8 ++-
 .../sql/execution/datasources/orc/OrcUtils.scala | 5 +-
 .../datasources/parquet/ParquetFileFormat.scala | 13 +++--
 .../datasources/parquet/ParquetUtils.scala | 9 +--
 .../execution/datasources/v2/CacheTableExec.scala | 4 +-
 .../execution/datasources/v2/CreateIndexExec.scala | 5 +-
 .../datasources/v2/CreateNamespaceExec.scala | 5 +-
 .../execution/datasources/v2/CreateTableExec.scala | 5 +-
 .../datasources/v2/DataSourceV2Strategy.scala | 5 +-
 .../execution/datasources/v2/DropIndexExec.scala | 4 +-
 .../datasources/v2/FilePartitionReader.scala | 7 ++-
 .../sql/execution/datasources/v2/FileScan.scala | 7 ++-
 .../v2/V2ScanPartitioningAndOrdering.scala | 8 ++-
 .../ApplyInPandasWithStatePythonRunner.scala | 8 ++-
 .../python/AttachDistributedSequenceExec.scala | 5 +-
 .../streaming/AvailableNowDataStreamWrapper.scala | 15 ++---
 .../streaming/CheckpointFileManager.scala | 24 ++++----
 .../streaming/CompactibleFileStreamLog.scala | 5 +-
.../sql/execution/streaming/FileStreamSink.scala | 9 +-- .../sql/execution/streaming/FileStreamSource.scala | 16 ++++-- .../execution/streaming/IncrementalExecution.scala | 9 +-- .../streaming/ManifestFileCommitProtocol.scala | 6 +- .../execution/streaming/MicroBatchExecution.scala | 24 ++++---- .../spark/sql/execution/streaming/OffsetSeq.scala | 15 +++-- .../sql/execution/streaming/ProgressReporter.scala | 19 ++++--- .../execution/streaming/ResolveWriteToStream.scala | 15 +++-- .../sql/execution/streaming/StreamExecution.scala | 8 ++- .../sql/execution/streaming/TimerStateImpl.scala | 11 ++-- .../sql/execution/streaming/TriggerExecutor.scala | 8 ++- .../continuous/ContinuousTextSocketSource.scala | 5 +- .../sources/TextSocketMicroBatchStream.scala | 5 +- .../state/HDFSBackedStateStoreProvider.scala | 25 +++++---- .../sql/execution/streaming/state/RocksDB.scala | 6 +- .../streaming/state/RocksDBFileManager.scala | 14 +++-- .../sql/execution/streaming/state/StateStore.scala | 12 ++-- .../state/SymmetricHashJoinStateManager.scala | 9 +-- .../sql/execution/ui/SQLAppStatusListener.scala | 9 +-- .../apache/spark/sql/internal/SharedState.scala | 8 ++- .../apache/spark/sql/jdbc/PostgresDialect.scala | 5 +- .../sql/streaming/StreamingQueryManager.scala | 7 ++- 66 files changed, 445 insertions(+), 259 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 838ef0355e3a..46e993410ffc 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -22,6 +22,8 @@ package org.apache.spark.internal */ object LogKey extends Enumeration { val ACCUMULATOR_ID = Value + val ACTUAL_NUM_FILES = Value + val ACTUAL_PARTITION_COLUMN = Value val ANALYSIS_ERROR = Value val APP_DESC = Value val APP_ID = Value @@ -32,15 +34,18 @@ object LogKey extends Enumeration { val BROADCAST_ID = Value val BUCKET = Value val BYTECODE_SIZE = Value + val CACHED_TABLE_PARTITION_METADATA_SIZE = Value val CACHE_AUTO_REMOVED_SIZE = Value val CACHE_UNTIL_HIGHEST_CONSUMED_SIZE = Value val CACHE_UNTIL_LAST_PRODUCED_SIZE = Value + val CALL_SITE_LONG_FORM = Value val CATEGORICAL_FEATURES = Value val CLASS_LOADER = Value val CLASS_NAME = Value val CLUSTER_ID = Value val CODEC_LEVEL = Value val CODEC_NAME = Value + val CODEGEN_STAGE_ID = Value val COLUMN_DATA_TYPE_SOURCE = Value val COLUMN_DATA_TYPE_TARGET = Value val COLUMN_DEFAULT_VALUE = Value @@ -63,14 +68,25 @@ object LogKey extends Enumeration { val CSV_SCHEMA_FIELD_NAME = Value val CSV_SCHEMA_FIELD_NAMES = Value val CSV_SOURCE = Value + val CURRENT_PATH = Value val DATA = Value val DATABASE_NAME = Value val DATAFRAME_CACHE_ENTRY = Value val DATAFRAME_ID = Value + val DATA_SOURCE = Value + val DATA_SOURCES = Value + val DATA_SOURCE_PROVIDER = Value + val DEFAULT_ISOLATION_LEVEL = Value + val DEFAULT_VALUE = Value + val DELEGATE = Value val DESCRIPTION = Value + val DESIRED_PARTITIONS_SIZE = Value val DRIVER_ID = Value val DROPPED_PARTITIONS = Value val DURATION = Value + val ELAPSED_TIME = Value + val ENCODING = Value + val END_INDEX = Value val END_POINT = Value val ENGINE = Value val ERROR = Value @@ -78,22 +94,31 @@ object LogKey extends Enumeration { val EVENT_QUEUE = Value val EXECUTE_INFO = Value val EXECUTE_KEY = Value + val EXECUTION_PLAN_LEAVES = Value val EXECUTOR_ENV_REGEX = Value val EXECUTOR_ID = Value val EXECUTOR_IDS = Value val EXECUTOR_STATE = Value val EXIT_CODE = 
Value - val EXPRESSION_TERMS = Value + val EXPECTED_NUM_FILES = Value + val EXPECTED_PARTITION_COLUMN = Value + val EXPIRY_TIMESTAMP = Value + val EXPR = Value + val EXPR_TERMS = Value + val EXTENDED_EXPLAIN_GENERATOR = Value val FAILURES = Value val FALLBACK_VERSION = Value val FIELD_NAME = Value val FILE_FORMAT = Value val FILE_FORMAT2 = Value + val FILE_VERSION = Value + val FINISH_TRIGGER_DURATION = Value val FROM_OFFSET = Value val FUNCTION_NAME = Value val FUNCTION_PARAMETER = Value val GROUP_ID = Value val HADOOP_VERSION = Value + val HASH_JOIN_KEYS = Value val HISTORY_DIR = Value val HIVE_CLIENT_VERSION = Value val HIVE_METASTORE_VERSION = Value @@ -103,22 +128,31 @@ object LogKey extends Enumeration { val HOST_PORT = Value val INCOMPATIBLE_TYPES = Value val INDEX = Value + val INDEX_NAME = Value val INFERENCE_MODE = Value val INITIAL_CAPACITY = Value val INTERVAL = Value + val ISOLATION_LEVEL = Value val JOB_ID = Value val JOIN_CONDITION = Value - val JOIN_CONDITION_SUB_EXPRESSION = Value + val JOIN_CONDITION_SUB_EXPR = Value val KAFKA_PULLS_COUNT = Value val KAFKA_RECORDS_PULLED_COUNT = Value val KEY = Value val LAST_ACCESS_TIME = Value + val LATEST_BATCH_ID = Value + val LATEST_COMMITTED_BATCH_ID = Value val LEARNING_RATE = Value + val LEFT_EXPR = Value val LINE = Value val LINE_NUM = Value val LISTENER = Value val LOAD_FACTOR = Value + val LOAD_TIME = Value + val LOGICAL_PLAN_COLUMNS = Value + val LOGICAL_PLAN_LEAVES = Value val LOG_TYPE = Value + val LOWER_BOUND = Value val MASTER_URL = Value val MAX_ATTEMPTS = Value val MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE = Value @@ -126,23 +160,33 @@ object LogKey extends Enumeration { val MAX_CAPACITY = Value val MAX_CATEGORIES = Value val MAX_EXECUTOR_FAILURES = Value + val MAX_FILE_VERSION = Value + val MAX_PARTITIONS_SIZE = Value val MAX_SIZE = Value + val MAX_TABLE_PARTITION_METADATA_SIZE = Value val MERGE_DIR_NAME = Value val MESSAGE = Value val METHOD_NAME = Value val MIN_SIZE = Value + val NAMESPACE = Value + val NEW_PATH = Value val NEW_VALUE = Value val NUM_COLUMNS = Value + val NUM_FILES = Value val NUM_ITERATIONS = Value val OBJECT_ID = Value val OFFSET = Value val OFFSETS = Value val OLD_BLOCK_MANAGER_ID = Value val OLD_VALUE = Value + val OPTIMIZED_PLAN_COLUMNS = Value val OPTIMIZER_CLASS_NAME = Value + val OPTIONS = Value val OP_ID = Value val OP_TYPE = Value val PARSE_MODE = Value + val PARTITIONED_FILE_READER = Value + val PARTITIONS_SIZE = Value val PARTITION_ID = Value val PARTITION_SPECIFICATION = Value val PARTITION_SPECS = Value @@ -154,6 +198,7 @@ object LogKey extends Enumeration { val POD_PHASE = Value val POLICY = Value val PORT = Value + val PROCESSING_TIME = Value val PRODUCER_ID = Value val PROVIDER = Value val QUERY_CACHE_VALUE = Value @@ -163,8 +208,10 @@ object LogKey extends Enumeration { val QUERY_PLAN_COMPARISON = Value val QUERY_PLAN_LENGTH_ACTUAL = Value val QUERY_PLAN_LENGTH_MAX = Value + val QUERY_RUN_ID = Value val RANGE = Value val RDD_ID = Value + val READ_LIMIT = Value val REASON = Value val REATTACHABLE = Value val RECEIVED_BLOCK_INFO = Value @@ -174,6 +221,7 @@ object LogKey extends Enumeration { val RESOURCE_NAME = Value val RETRY_COUNT = Value val RETRY_INTERVAL = Value + val RIGHT_EXPR = Value val RULE_BATCH_NAME = Value val RULE_NAME = Value val RULE_NUMBER_OF_RUNS = Value @@ -190,20 +238,27 @@ object LogKey extends Enumeration { val SHUFFLE_MERGE_ID = Value val SIZE = Value val SLEEP_TIME = Value + val SPARK_DATA_STREAM = Value + val SPARK_PLAN_ID = Value val SQL_TEXT = Value val STAGE_ID = 
Value + val START_INDEX = Value val STATEMENT_ID = Value + val STATE_STORE_PROVIDER = Value val STATUS = Value + val STORE_ID = Value val STREAM_ID = Value val STREAM_NAME = Value val SUBMISSION_ID = Value val SUBSAMPLING_RATE = Value + val SUB_QUERY = Value val TABLE_NAME = Value val TASK_ATTEMPT_ID = Value val TASK_ID = Value val TASK_NAME = Value val TASK_SET_NAME = Value val TASK_STATE = Value + val TEMP_PATH = Value val THREAD = Value val THREAD_NAME = Value val TID = Value @@ -221,12 +276,16 @@ object LogKey extends Enumeration { val TOTAL_SIZE = Value val TOTAL_TIME = Value val TOTAL_TIME_READ = Value - val UNSUPPORTED_EXPRESSION = Value + val TREE_NODE = Value + val TRIGGER_INTERVAL = Value + val UNSUPPORTED_EXPR = Value val UNSUPPORTED_HINT_REASON = Value val UNTIL_OFFSET = Value + val UPPER_BOUND = Value val URI = Value val USER_ID = Value val USER_NAME = Value + val VALUE = Value val WAIT_RESULT_TIME = Value val WAIT_SEND_TIME = Value val WAIT_TIME = Value diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala index e9c4dd0be7d9..ee285f7ca860 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/StreamingJoinHelper.scala @@ -168,7 +168,7 @@ object StreamingJoinHelper extends PredicateHelper with Logging { if (constraintTerms.size > 1) { logWarning( log"Failed to extract state constraint terms: multiple time terms in condition\n\t" + - log"${MDC(EXPRESSION_TERMS, terms.mkString("\n\t"))}") + log"${MDC(EXPR_TERMS, terms.mkString("\n\t"))}") return None } if (constraintTerms.isEmpty) { @@ -289,7 +289,7 @@ object StreamingJoinHelper extends PredicateHelper with Logging { logWarning( log"Failed to extract state value watermark from condition " + log"${MDC(JOIN_CONDITION, exprToCollectFrom)} due to " + - log"${MDC(JOIN_CONDITION_SUB_EXPRESSION, a)}") + log"${MDC(JOIN_CONDITION_SUB_EXPR, a)}") invalid = true Seq.empty } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala index 772382f5f1e1..ed3f0cc5a80c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.SparkIllegalArgumentException -import org.apache.spark.internal.LogKey.{SQL_TEXT, UNSUPPORTED_EXPRESSION} +import org.apache.spark.internal.LogKey.{SQL_TEXT, UNSUPPORTED_EXPR} import org.apache.spark.internal.MDC import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, EqualNullSafe, Expression, If, In, InSet, LambdaFunction, Literal, MapFilter, Not, Or} import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} @@ -141,7 +141,7 @@ object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] { "expr" -> e.sql)) } else { val message = log"Expected a Boolean type expression in replaceNullWithFalse, " + - log"but got the type `${MDC(UNSUPPORTED_EXPRESSION, e.dataType.catalogString)}` " + + log"but got the type `${MDC(UNSUPPORTED_EXPR, e.dataType.catalogString)}` " + log"in 
`${MDC(SQL_TEXT, e.sql)}`." logWarning(message) e diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 22c09c51c237..d9f32682ab69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql import scala.jdk.CollectionConverters._ import org.apache.spark.annotation.Stable -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LEFT_EXPR, RIGHT_EXPR} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} import org.apache.spark.sql.catalyst.expressions._ @@ -310,8 +311,9 @@ class Column(val expr: Expression) extends Logging { val right = lit(other).expr if (this.expr == right) { logWarning( - s"Constructing trivially true equals predicate, '${this.expr} = $right'. " + - "Perhaps you need to use aliases.") + log"Constructing trivially true equals predicate, " + + log"'${MDC(LEFT_EXPR, this.expr)} = ${MDC(RIGHT_EXPR, right)}'. " + + log"Perhaps you need to use aliases.") } fn("=", other) } @@ -516,8 +518,9 @@ class Column(val expr: Expression) extends Logging { val right = lit(other).expr if (this.expr == right) { logWarning( - s"Constructing trivially true equals predicate, '${this.expr} <=> $right'. " + - "Perhaps you need to use aliases.") + log"Constructing trivially true equals predicate, " + + log"'${MDC(LEFT_EXPR, this.expr)} <=> ${MDC(RIGHT_EXPR, right)}'. " + + log"Perhaps you need to use aliases.") } fn("<=>", other) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 15eeca87dcf6..b3a5751fce04 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -29,7 +29,8 @@ import scala.util.control.NonFatal import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, SparkException, TaskContext} import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable} import org.apache.spark.api.java.JavaRDD -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{CALL_SITE_LONG_FORM, CLASS_NAME} import org.apache.spark.internal.config.{ConfigEntry, EXECUTOR_ALLOW_SPARK_CONTEXT} import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} @@ -1358,13 +1359,13 @@ object SparkSession extends Logging { val session = getActiveSession.orElse(getDefaultSession) if (session.isDefined) { logWarning( - s"""An existing Spark session exists as the active or default session. - |This probably means another suite leaked it. Attempting to stop it before continuing. - |This existing Spark session was created at: - | - |${session.get.creationSite.longForm} - | - """.stripMargin) + log"""An existing Spark session exists as the active or default session. + |This probably means another suite leaked it. Attempting to stop it before continuing. 
+ |This existing Spark session was created at: + | + |${MDC(CALL_SITE_LONG_FORM, session.get.creationSite.longForm)} + | + """.stripMargin) session.get.stop() SparkSession.clearActiveSession() SparkSession.clearDefaultSession() @@ -1391,7 +1392,8 @@ object SparkSession extends Logging { case e@(_: ClassCastException | _: ClassNotFoundException | _: NoClassDefFoundError) => - logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e) + logWarning(log"Cannot use ${MDC(CLASS_NAME, extensionConfClassName)} to configure " + + log"session extensions.", e) } } extensions diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala index 62e6cc07b3e9..0464432d1f7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala @@ -26,7 +26,8 @@ import net.razorvine.pickle.{Pickler, Unpickler} import org.apache.spark.SparkException import org.apache.spark.api.python.DechunkedInputStream -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.CLASS_LOADER import org.apache.spark.security.SocketAuthServer import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} @@ -136,8 +137,8 @@ private[sql] object PythonSQLUtils extends Logging { def addJarToCurrentClassLoader(path: String): Unit = { Utils.getContextOrSparkClassLoader match { case cl: MutableURLClassLoader => cl.addURL(Utils.resolveURI(path).toURL) - case cl => logWarning( - s"Unsupported class loader $cl will not update jars in the thread class loader.") + case cl => logWarning(log"Unsupported class loader ${MDC(CLASS_LOADER, cl)} will not " + + log"update jars in the thread class loader.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 97b701b7380d..655d4193f684 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -27,7 +27,8 @@ import org.apache.spark.TaskContext import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.api.r.SerDe import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.CONFIG import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericRowWithSchema, Literal} @@ -58,9 +59,9 @@ private[sql] object SQLUtils extends Logging { SparkSession.builder().enableHiveSupport().sparkContext(jsc.sc).getOrCreate() } else { if (enableHiveSupport) { - logWarning("SparkR: enableHiveSupport is requested for SparkSession but " + - s"Spark is not built with Hive or ${CATALOG_IMPLEMENTATION.key} is not set to " + - "'hive', falling back to without Hive support.") + logWarning(log"SparkR: enableHiveSupport is requested for SparkSession but " + + log"Spark is not built with Hive or ${MDC(CONFIG, CATALOG_IMPLEMENTATION.key)} " + + log"is not set to 'hive', falling back to without Hive support.") } SparkSession.builder().sparkContext(jsc.sc).getOrCreate() } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 36221d728066..1f141766f437 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.commons.lang3.StringUtils import org.apache.spark.SparkException +import org.apache.spark.internal.LogKey.CONFIG +import org.apache.spark.internal.MDC import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec} @@ -524,9 +526,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) if (!createHiveTableByDefault || (ctas && conf.convertCTAS)) { (nonHiveStorageFormat, conf.defaultDataSourceName) } else { - logWarning("A Hive serde table will be created as there is no table provider " + - s"specified. You can set ${SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key} to false " + - "so that native data source table will be created instead.") + logWarning(log"A Hive serde table will be created as there is no table provider " + + log"specified. You can set " + + log"${MDC(CONFIG, SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key)} to false so that " + + log"native data source table will be created instead.") (defaultHiveStorage, DDLUtils.HIVE_PROVIDER) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 252a6290cbc7..2c77b7f0a05f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.execution -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LOGICAL_PLAN_COLUMNS, OPTIMIZED_PLAN_COLUMNS} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Encoder, SparkSession} import org.apache.spark.sql.catalyst.InternalRow @@ -226,10 +227,11 @@ object LogicalRDD extends Logging { (Some(rewrittenStatistics), Some(rewrittenConstraints)) }.getOrElse { // can't rewrite stats and constraints, give up - logWarning("The output columns are expected to the same (for name and type) for output " + - "between logical plan and optimized plan, but they aren't. output in logical plan: " + - s"${logicalPlan.output.map(_.simpleString(10))} / output in optimized plan: " + - s"${optimizedPlan.output.map(_.simpleString(10))}") + logWarning(log"The output columns are expected to the same (for name and type) for output " + + log"between logical plan and optimized plan, but they aren't. 
output in logical plan: " + + log"${MDC(LOGICAL_PLAN_COLUMNS, logicalPlan.output.map(_.simpleString(10)))} " + + log"/ output in optimized plan: " + + log"${MDC(OPTIMIZED_PLAN_COLUMNS, optimizedPlan.output.map(_.simpleString(10)))}") (None, None) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 6280e7dd100c..00666190c0cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -26,7 +26,8 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.Path import org.apache.spark.SparkException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.EXTENDED_EXPLAIN_GENERATOR import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} @@ -385,7 +386,8 @@ class QueryExecution( append(s"\n== Extended Information (${extension.title}) ==\n") append(extension.generateExtendedInfo(plan)) } catch { - case NonFatal(e) => logWarning(s"Cannot use $extension to get extended information.", e) + case NonFatal(e) => logWarning(log"Cannot use " + + log"${MDC(EXTENDED_EXPLAIN_GENERATOR, extension)} to get extended information.", e) }) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index d7ebf786168b..c2ba38ac4a85 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution import java.util.Locale import org.apache.spark.{SparkException, SparkUnsupportedOperationException} +import org.apache.spark.internal.LogKey.HASH_JOIN_KEYS +import org.apache.spark.internal.MDC import org.apache.spark.rdd.RDD import org.apache.spark.sql.{execution, AnalysisException, Strategy} import org.apache.spark.sql.catalyst.InternalRow @@ -212,10 +214,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { if (!result) { val keysNotSupportingHashJoin = leftKeys.concat(rightKeys).filterNot( e => UnsafeRowUtils.isBinaryStable(e.dataType)) - logWarning("Hash based joins are not supported due to " + - "joining on keys that don't support binary equality. " + - "Keys not supporting hash joins: " + keysNotSupportingHashJoin - .map(e => e.toString + " due to DataType: " + e.dataType.typeName).mkString(", ")) + logWarning(log"Hash based joins are not supported due to joining on keys that don't " + + log"support binary equality. 
Keys not supporting hash joins: " + + log"${MDC(HASH_JOIN_KEYS, keysNotSupportingHashJoin.map( + e => e.toString + " due to DataType: " + e.dataType.typeName).mkString(", "))}") } result } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 5a0bf09a1713..32d6555aa9d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -24,6 +24,8 @@ import scala.collection.mutable import scala.util.control.NonFatal import org.apache.spark.{broadcast, SparkException, SparkUnsupportedOperationException} +import org.apache.spark.internal.LogKey.{CODEGEN_STAGE_ID, ERROR, TREE_NODE} +import org.apache.spark.internal.MDC import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -406,7 +408,7 @@ trait CodegenSupport extends SparkPlan { if (Utils.isTesting) { throw SparkException.internalError(errMsg) } else { - logWarning(s"[BUG] $errMsg Please open a JIRA ticket to report it.") + logWarning(log"[BUG] ${MDC(ERROR, errMsg)} Please open a JIRA ticket to report it.") } } if (parent.limitNotReachedChecks.isEmpty) { @@ -729,7 +731,9 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) } catch { case NonFatal(_) if !Utils.isTesting && conf.codegenFallback => // We should already saw the error message - logWarning(s"Whole-stage codegen disabled for plan (id=$codegenStageId):\n $treeString") + logWarning(log"Whole-stage codegen disabled for plan " + + log"(id=${MDC(CODEGEN_STAGE_ID, codegenStageId)}):\n " + + log"${MDC(TREE_NODE, treeString)}") return child.execute() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala index 50f2b7c81453..ae18de16c23e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.adaptive import scala.collection.mutable +import org.apache.spark.internal.LogKey.{CONFIG, SUB_QUERY} +import org.apache.spark.internal.MDC import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{DynamicPruningSubquery, ListQuery, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -67,8 +69,8 @@ case class InsertAdaptiveSparkPlan( AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery) } catch { case SubqueryAdaptiveNotSupportedException(subquery) => - logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " + - s"but is not supported for sub-query: $subquery.") + logWarning(log"${MDC(CONFIG, SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)} is enabled " + + log"but is not supported for sub-query: ${MDC(SUB_QUERY, subquery)}.") plan } } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTablesCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTablesCommand.scala index c9b22a7d1b25..a41706b1f99e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTablesCommand.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTablesCommand.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.command import scala.util.control.NonFatal +import org.apache.spark.internal.LogKey.{DATABASE_NAME, ERROR, TABLE_NAME} +import org.apache.spark.internal.MDC import org.apache.spark.sql.{Row, SparkSession} @@ -37,8 +39,8 @@ case class AnalyzeTablesCommand( CommandUtils.analyzeTable(sparkSession, tbl, noScan) } catch { case NonFatal(e) => - logWarning(s"Failed to analyze table ${tbl.table} in the " + - s"database $db because of ${e.toString}", e) + logWarning(log"Failed to analyze table ${MDC(TABLE_NAME, tbl.table)} in the " + + log"database ${MDC(DATABASE_NAME, db)} because of ${MDC(ERROR, e.toString)}}", e) } } Seq.empty[Row] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index eccf16ecea13..463f0ca23afc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -24,7 +24,8 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{DATABASE_NAME, ERROR, TABLE_NAME} import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, UnresolvedAttribute} @@ -154,9 +155,9 @@ object CommandUtils extends Logging { getPathSize(fs, fs.getFileStatus(path)) } catch { case NonFatal(e) => - logWarning( - s"Failed to get the size of table ${identifier.table} in the " + - s"database ${identifier.database} because of ${e.toString}", e) + logWarning(log"Failed to get the size of table ${MDC(TABLE_NAME, identifier.table)} " + + log"in the database ${MDC(DATABASE_NAME, identifier.database)} because of " + + log"${MDC(ERROR, e.toString)}", e) 0L } }.getOrElse(0L) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala index 672417f1adbf..6c57d9b2fe26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.execution.command -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{CONFIG, CONFIG2, KEY, VALUE} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.parser.ParseException @@ -51,8 +52,9 @@ case class SetCommand(kv: Option[(String, Option[String])]) case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) => val runFunc = (sparkSession: SparkSession) => { logWarning( - s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + - s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") + log"Property ${MDC(CONFIG, SQLConf.Deprecated.MAPRED_REDUCE_TASKS)} is deprecated, " + + log"automatically converted to ${MDC(CONFIG2, SQLConf.SHUFFLE_PARTITIONS.key)} " + + log"instead.") if (value.toInt < 1) { val 
msg = s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " + @@ -68,8 +70,9 @@ case class SetCommand(kv: Option[(String, Option[String])]) case Some((SQLConf.Replaced.MAPREDUCE_JOB_REDUCES, Some(value))) => val runFunc = (sparkSession: SparkSession) => { logWarning( - s"Property ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} is Hadoop's property, " + - s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") + log"Property ${MDC(CONFIG, SQLConf.Replaced.MAPREDUCE_JOB_REDUCES)} is Hadoop's " + + log"property, automatically converted to " + + log"${MDC(CONFIG2, SQLConf.SHUFFLE_PARTITIONS.key)} instead.") if (value.toInt < 1) { val msg = s"Setting negative ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} for automatically " + @@ -111,11 +114,12 @@ case class SetCommand(kv: Option[(String, Option[String])]) } if (sparkSession.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive") && key.startsWith("hive.")) { - logWarning(s"'SET $key=$value' might not work, since Spark doesn't support changing " + - "the Hive config dynamically. Please pass the Hive-specific config by adding the " + - s"prefix spark.hadoop (e.g. spark.hadoop.$key) when starting a Spark application. " + - "For details, see the link: https://spark.apache.org/docs/latest/configuration.html#" + - "dynamically-loading-spark-properties.") + logWarning(log"'SET ${MDC(KEY, key)}=${MDC(VALUE, value)}' might not work, since Spark " + + log"doesn't support changing the Hive config dynamically. Please pass the " + + log"Hive-specific config by adding the prefix spark.hadoop " + + log"(e.g. spark.hadoop.${MDC(KEY, key)}) when starting a Spark application. For " + + log"details, see the link: https://spark.apache.org/docs/latest/configuration.html#" + + log"dynamically-loading-spark-properties.") } sparkSession.conf.set(key, value) Seq(Row(key, value)) @@ -155,8 +159,8 @@ case class SetCommand(kv: Option[(String, Option[String])]) case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) => val runFunc = (sparkSession: SparkSession) => { logWarning( - s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + - s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") + log"Property ${MDC(CONFIG, SQLConf.Deprecated.MAPRED_REDUCE_TASKS)} is deprecated, " + + log"showing ${MDC(CONFIG2, SQLConf.SHUFFLE_PARTITIONS.key)} instead.") Seq(Row( SQLConf.SHUFFLE_PARTITIONS.key, sparkSession.sessionState.conf.defaultNumShufflePartitions.toString)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index a5e48784ada1..5a853a269848 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -28,7 +28,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.mapred.{FileInputFormat, JobConf} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ACTUAL_PARTITION_COLUMN, EXPECTED_PARTITION_COLUMN, PATH} import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier @@ -783,11 +784,12 @@ case class RepairTableCommand( partitionNames.drop(1), threshold, resolver, evalTaskSupport) } else { logWarning( - s"expected partition column ${partitionNames.head}, but got ${ps(0)}, 
ignoring it") + log"expected partition column ${MDC(EXPECTED_PARTITION_COLUMN, partitionNames.head)}," + + log" but got ${MDC(ACTUAL_PARTITION_COLUMN, ps(0))}, ignoring it") Seq.empty } } else { - logWarning(s"ignore ${new Path(path, name)}") + logWarning(log"ignore ${MDC(PATH, new Path(path, name))}") Seq.empty } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala index 8a9fbd15e2e8..a34b235d2d12 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala @@ -26,7 +26,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{SparkContext, TaskContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ACTUAL_NUM_FILES, EXPECTED_NUM_FILES} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker._ @@ -166,9 +167,9 @@ class BasicWriteTaskStatsTracker( } if (numSubmittedFiles != numFiles) { - logWarning(s"Expected $numSubmittedFiles files, but only saw $numFiles. " + - "This could be due to the output format not writing empty files, " + - "or files being not immediately visible in the filesystem.") + logWarning(log"Expected ${MDC(EXPECTED_NUM_FILES, numSubmittedFiles)} files, but only saw " + + log"${MDC(ACTUAL_NUM_FILES, numFiles)}. This could be due to the output format not " + + log"writing empty files, or files being not immediately visible in the filesystem.") } taskCommitTimeMetric.foreach(_ += taskCommitTime) BasicWriteTaskStats(partitions.toSeq, numFiles, numBytes, numRows) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 4c2d6a4cdf5e..84143a3899de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -27,7 +27,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{CLASS_NAME, DATA_SOURCE_PROVIDER, DATA_SOURCES, PATHS} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils} @@ -695,8 +696,9 @@ object DataSource extends Logging { throw QueryCompilationErrors .foundMultipleXMLDataSourceError(provider1, sourceNames, externalSource.getName) } else if (internalSources.size == 1) { - logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " + - s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).") + logWarning(log"Multiple sources found for ${MDC(DATA_SOURCE_PROVIDER, provider1)} " + + log"(${MDC(DATA_SOURCES, sourceNames.mkString(", "))}), defaulting to the " + + log"internal datasource (${MDC(CLASS_NAME, internalSources.head.getClass.getName)}).") 
internalSources.head.getClass } else { throw QueryCompilationErrors.findMultipleDataSourceError(provider1, sourceNames) @@ -807,7 +809,7 @@ object DataSource extends Logging { } if (filteredIn.isEmpty) { logWarning( - s"All paths were ignored:\n ${filteredOut.mkString("\n ")}") + log"All paths were ignored:\n ${MDC(PATHS, filteredOut.mkString("\n "))}") } else { logDebug( s"Some paths were ignored:\n ${filteredOut.mkString("\n ")}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala index 2f4555effce3..2912cc965510 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala @@ -21,7 +21,8 @@ import java.util.Locale import java.util.concurrent.ConcurrentHashMap import org.apache.spark.api.python.PythonUtils -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.DATA_SOURCE import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.v2.python.UserDefinedPythonDataSource import org.apache.spark.util.Utils @@ -53,7 +54,8 @@ class DataSourceManager extends Logging { } val previousValue = runtimeDataSourceBuilders.put(normalizedName, source) if (previousValue != null) { - logWarning(f"The data source $name replaced a previously registered data source.") + logWarning(log"The data source ${MDC(DATA_SOURCE, name)} replaced a previously " + + log"registered data source.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala index 836f0b069879..10c77607c311 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala @@ -21,7 +21,8 @@ import scala.collection.mutable.ArrayBuffer import scala.math.BigDecimal.RoundingMode import org.apache.spark.Partition -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{CONFIG, DESIRED_PARTITIONS_SIZE, MAX_PARTITIONS_SIZE, PARTITIONS_SIZE} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.ScanFileListing @@ -98,9 +99,11 @@ object FilePartition extends Logging { val desiredSplitBytes = (totalSizeInBytes / BigDecimal(maxPartNum.get)).setScale(0, RoundingMode.UP).longValue val desiredPartitions = getFilePartitions(partitionedFiles, desiredSplitBytes, openCostBytes) - logWarning(s"The number of partitions is ${partitions.size}, which exceeds the maximum " + - s"number configured: ${maxPartNum.get}. Spark rescales it to ${desiredPartitions.size} " + - s"by ignoring the configuration of ${SQLConf.FILES_MAX_PARTITION_BYTES.key}.") + logWarning(log"The number of partitions is ${MDC(PARTITIONS_SIZE, partitions.size)}, " + + log"which exceeds the maximum number configured: " + + log"${MDC(MAX_PARTITIONS_SIZE, maxPartNum.get)}. 
Spark rescales it to " + + log"${MDC(DESIRED_PARTITIONS_SIZE, desiredPartitions.size)} by ignoring the " + + log"configuration of ${MDC(CONFIG, SQLConf.FILES_MAX_PARTITION_BYTES.key)}.") desiredPartitions } else { partitions diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 825b8154f681..28700fa4f37f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -24,6 +24,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{Partition => RDDPartition, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.LogKey.PATH +import org.apache.spark.internal.MDC import org.apache.spark.paths.SparkPath import org.apache.spark.rdd.{InputFileBlockHolder, RDD} import org.apache.spark.sql.SparkSession @@ -259,14 +261,14 @@ class FileScanRDD( } } catch { case e: FileNotFoundException if ignoreMissingFiles => - logWarning(s"Skipped missing file: $currentFile", e) + logWarning(log"Skipped missing file: ${MDC(PATH, currentFile)}", e) finished = true null // Throw FileNotFoundException even if `ignoreCorruptFiles` is true case e: FileNotFoundException if !ignoreMissingFiles => throw e case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles => - logWarning( - s"Skipped the rest of the content in the corrupted file: $currentFile", e) + logWarning(log"Skipped the rest of the content in the corrupted file: " + + log"${MDC(PATH, currentFile)}", e) finished = true null } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala index 80002ecdaf8d..29706f9cd7f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala @@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._ import com.google.common.cache._ import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{CACHED_TABLE_PARTITION_METADATA_SIZE, MAX_TABLE_PARTITION_METADATA_SIZE} import org.apache.spark.sql.SparkSession import org.apache.spark.util.SizeEstimator @@ -111,8 +112,8 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = { val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale if (estimate > Int.MaxValue) { - logWarning(s"Cached table partition metadata size is too big. Approximating to " + - s"${Int.MaxValue.toLong * weightScale}.") + logWarning(log"Cached table partition metadata size is too big. 
Approximating to " + + log"${MDC(CACHED_TABLE_PARTITION_METADATA_SIZE, Int.MaxValue.toLong * weightScale)}.") Int.MaxValue } else { estimate.toInt @@ -126,9 +127,10 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends if (removed.getCause == RemovalCause.SIZE && warnedAboutEviction.compareAndSet(false, true)) { logWarning( - "Evicting cached table partition metadata from memory due to size constraints " + - "(spark.sql.hive.filesourcePartitionFileCacheSize = " - + maxSizeInBytes + " bytes). This may impact query planning performance.") + log"Evicting cached table partition metadata from memory due to size constraints " + + log"(spark.sql.hive.filesourcePartitionFileCacheSize = " + + log"${MDC(MAX_TABLE_PARTITION_METADATA_SIZE, maxSizeInBytes)} bytes). " + + log"This may impact query planning performance.") } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index cf7c536bdaec..52b5810362ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -28,7 +28,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.TaskContext import org.apache.spark.input.{PortableDataStream, StreamInputFormat} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow @@ -202,12 +203,12 @@ object MultiLineCSVDataSource extends CSVDataSource with Logging { encoding = parsedOptions.charset) } catch { case e: FileNotFoundException if ignoreMissingFiles => - logWarning(s"Skipped missing file: ${lines.getPath()}", e) + logWarning(log"Skipped missing file: ${MDC(PATH, lines.getPath())}", e) Array.empty[Array[String]] case e: FileNotFoundException if !ignoreMissingFiles => throw e case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles => - logWarning( - s"Skipped the rest of the content in the corrupted file: ${lines.getPath()}", e) + logWarning(log"Skipped the rest of the content in the corrupted file: " + + log"${MDC(PATH, lines.getPath())}", e) Array.empty[Array[String]] } }.take(1).headOption match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index 4f19d3df40b3..ca2d59a7672f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -21,7 +21,8 @@ import scala.collection.mutable.ArrayBuffer import scala.math.BigDecimal.RoundingMode import org.apache.spark.Partition -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LOWER_BOUND, NEW_VALUE, OLD_VALUE, UPPER_BOUND} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis._ @@ -114,12 +115,12 @@ private[sql] object JDBCRelation extends Logging { (upperBound - lowerBound) < 0) { 
partitioning.numPartitions } else { - logWarning("The number of partitions is reduced because the specified number of " + - "partitions is less than the difference between upper bound and lower bound. " + - s"Updated number of partitions: ${upperBound - lowerBound}; Input number of " + - s"partitions: ${partitioning.numPartitions}; " + - s"Lower bound: ${boundValueToString(lowerBound)}; " + - s"Upper bound: ${boundValueToString(upperBound)}.") + logWarning(log"The number of partitions is reduced because the specified number of " + + log"partitions is less than the difference between upper bound and lower bound. " + + log"Updated number of partitions: ${MDC(NEW_VALUE, upperBound - lowerBound)}; " + + log"Input number of partitions: ${MDC(OLD_VALUE, partitioning.numPartitions)}; " + + log"Lower bound: ${MDC(LOWER_BOUND, boundValueToString(lowerBound))}; " + + log"Upper bound: ${MDC(UPPER_BOUND, boundValueToString(upperBound))}.") upperBound - lowerBound } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 53b0b8b5d29d..fd7be9d0ea41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -32,7 +32,8 @@ import scala.util.control.NonFatal import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException, TaskContext} import org.apache.spark.executor.InputMetrics -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{DEFAULT_ISOLATION_LEVEL, ISOLATION_LEVEL} import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} import org.apache.spark.sql.catalyst.analysis.{DecimalPrecision, Resolver} @@ -771,11 +772,13 @@ object JdbcUtils extends Logging with SQLConfHelper { // Finally update to actually requested level if possible finalIsolationLevel = isolationLevel } else { - logWarning(s"Requested isolation level $isolationLevel is not supported; " + - s"falling back to default isolation level $defaultIsolation") + logWarning(log"Requested isolation level ${MDC(ISOLATION_LEVEL, isolationLevel)} " + + log"is not supported; falling back to default isolation level " + + log"${MDC(DEFAULT_ISOLATION_LEVEL, defaultIsolation)}") } } else { - logWarning(s"Requested isolation level $isolationLevel, but transactions are unsupported") + logWarning(log"Requested isolation level ${MDC(ISOLATION_LEVEL, isolationLevel)}, " + + log"but transactions are unsupported") } } catch { case NonFatal(e) => logWarning("Exception while detecting transaction support", e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala index 55602ce2ed9b..34046b075395 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala @@ -21,7 +21,8 @@ import java.nio.charset.{Charset, StandardCharsets} import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ENCODING, PATH} 
import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions, JSONOptionsInRead} import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter} @@ -40,8 +41,9 @@ class JsonOutputWriter( } if (JSONOptionsInRead.denyList.contains(encoding)) { - logWarning(s"The JSON file ($path) was written in the encoding ${encoding.displayName()}" + - " which can be read back by Spark only if multiLine is enabled.") + logWarning(log"The JSON file (${MDC(PATH, path)}) was written in the encoding " + + log"${MDC(ENCODING, encoding.displayName())} which can be read back by Spark only " + + log"if multiLine is enabled.") } private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala index 24943b37d059..4d1354d15aba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala @@ -31,7 +31,8 @@ import org.apache.orc.{BooleanColumnStatistics, ColumnStatistics, DateColumnStat import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession} import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow} import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution @@ -87,7 +88,7 @@ object OrcUtils extends Logging { } catch { case e: org.apache.orc.FileFormatException => if (ignoreCorruptFiles) { - logWarning(s"Skipped the footer in the corrupted file: $file", e) + logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, file)}", e) None } else { throw QueryExecutionErrors.cannotReadFooterForFileError(file, e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index df367766501d..3fdbc38bba7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -32,7 +32,8 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GRO import org.apache.parquet.hadoop._ import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{PATH, SCHEMA} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -409,8 +410,8 @@ object ParquetFileFormat extends Logging { } .recover { case cause: Throwable => logWarning( - s"""Failed to parse serialized Spark schema in Parquet key-value metadata: - |\t$serializedSchema + log"""Failed to parse serialized Spark schema in Parquet key-value metadata: + |\t${MDC(SCHEMA, serializedSchema)} """.stripMargin, cause) } @@ -450,7 +451,7 @@ object ParquetFileFormat extends Logging { conf, currentFile, SKIP_ROW_GROUPS))) } catch { case e: RuntimeException => if (ignoreCorruptFiles) { - 
logWarning(s"Skipped the footer in the corrupted file: $currentFile", e) + logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, currentFile)}", e) None } else { throw QueryExecutionErrors.cannotReadFooterForFileError(currentFile.getPath, e) @@ -526,8 +527,8 @@ object ParquetFileFormat extends Logging { }.recoverWith { case cause: Throwable => logWarning( - "Failed to parse and ignored serialized Spark schema in " + - s"Parquet key-value metadata:\n\t$schemaString", cause) + log"Failed to parse and ignored serialized Spark schema in " + + log"Parquet key-value metadata:\n\t${MDC(SCHEMA, schemaString)}", cause) Failure(cause) }.toOption } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala index 5020bf7333de..1c1c46542b61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala @@ -33,7 +33,8 @@ import org.apache.parquet.schema.{PrimitiveType, Types} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.spark.{SparkException, SparkUnsupportedOperationException} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{CLASS_NAME, CONFIG} import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow @@ -485,9 +486,9 @@ object ParquetUtils extends Logging { if (ParquetOutputFormat.getJobSummaryLevel(conf) != JobSummaryLevel.NONE && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) { // output summary is requested, but the class is not a Parquet Committer - logWarning(s"Committer $committerClass is not a ParquetOutputCommitter and cannot" + - s" create job summaries. " + - s"Set Parquet option ${ParquetOutputFormat.JOB_SUMMARY_LEVEL} to NONE.") + logWarning(log"Committer ${MDC(CLASS_NAME, committerClass)} is not a " + + log"ParquetOutputCommitter and cannot create job summaries. 
Set Parquet option " + + log"${MDC(CONFIG, ParquetOutputFormat.JOB_SUMMARY_LEVEL)} to NONE.") } new OutputWriterFactory { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala index 28241fb0a67a..925246320c66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2 import java.util.Locale +import org.apache.spark.internal.LogKey.OPTIONS +import org.apache.spark.internal.MDC import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.LocalTempView @@ -44,7 +46,7 @@ trait BaseCacheTableExec extends LeafV2CommandExec { val withoutStorageLevel = options .filter { case (k, _) => k.toLowerCase(Locale.ROOT) != storageLevelKey } if (withoutStorageLevel.nonEmpty) { - logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}") + logWarning(log"Invalid options: ${MDC(OPTIONS, withoutStorageLevel.mkString(", "))}") } session.sharedState.cacheManager.cacheQuery( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala index 63c8dc6517b9..df922af9ca3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala @@ -21,6 +21,8 @@ import java.util import scala.jdk.CollectionConverters._ +import org.apache.spark.internal.LogKey.{INDEX_NAME, TABLE_NAME} +import org.apache.spark.internal.MDC import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute @@ -55,7 +57,8 @@ case class CreateIndexExec( indexName, columns.map(_._1).toArray, colProperties, propertiesWithIndexType.asJava) } catch { case _: IndexAlreadyExistsException if ignoreIfExists => - logWarning(s"Index $indexName already exists in table ${table.name}. Ignoring.") + logWarning(log"Index ${MDC(INDEX_NAME, indexName)} already exists in " + + log"table ${MDC(TABLE_NAME, table.name)}. 
Ignoring.") } Seq.empty } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala index cb51b7f75f33..65fec4b243d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.jdk.CollectionConverters.MapHasAsJava +import org.apache.spark.internal.LogKey.NAMESPACE +import org.apache.spark.internal.MDC import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute @@ -47,7 +49,8 @@ case class CreateNamespaceExec( catalog.createNamespace(ns, (properties ++ ownership).asJava) } catch { case _: NamespaceAlreadyExistsException if ifNotExists => - logWarning(s"Namespace ${namespace.quoted} was created concurrently. Ignoring.") + logWarning(log"Namespace ${MDC(NAMESPACE, namespace.quoted)} was created concurrently. " + + log"Ignoring.") } } else if (!ifNotExists) { throw QueryCompilationErrors.namespaceAlreadyExistsError(ns) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala index 5f3ed7a5bc76..b981fa3a2a05 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.jdk.CollectionConverters._ +import org.apache.spark.internal.LogKey.TABLE_NAME +import org.apache.spark.internal.MDC import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute @@ -44,7 +46,8 @@ case class CreateTableExec( catalog.createTable(identifier, columns, partitioning.toArray, tableProperties.asJava) } catch { case _: TableAlreadyExistsException if ignoreIfExists => - logWarning(s"Table ${identifier.quoted} was created concurrently. Ignoring.") + logWarning( + log"Table ${MDC(TABLE_NAME, identifier.quoted)} was created concurrently. 
Ignoring.") } } else if (!ignoreIfExists) { throw QueryCompilationErrors.tableAlreadyExistsError(identifier) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 828d737f93fa..a8eecb699ce9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -22,7 +22,8 @@ import scala.collection.mutable import org.apache.commons.lang3.StringUtils import org.apache.spark.SparkException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.EXPR import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable} import org.apache.spark.sql.catalyst.catalog.CatalogUtils @@ -650,7 +651,7 @@ private[sql] object DataSourceV2Strategy extends Logging { Some(new Predicate("IN", FieldReference(name) +: literals)) case other => - logWarning(s"Can't translate $other to source filter, unsupported expression") + logWarning(log"Can't translate ${MDC(EXPR, other)} to source filter, unsupported expression") None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIndexExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIndexExec.scala index 085f96119377..a77d39fb696b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIndexExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropIndexExec.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.v2 +import org.apache.spark.internal.LogKey.INDEX_NAME +import org.apache.spark.internal.MDC import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NoSuchIndexException import org.apache.spark.sql.catalyst.expressions.Attribute @@ -34,7 +36,7 @@ case class DropIndexExec( table.dropIndex(indexName) } catch { case _: NoSuchIndexException if ignoreIfNotExists => - logWarning(s"Index $indexName does not exist. Ignoring.") + logWarning(log"Index ${MDC(INDEX_NAME, indexName)} does not exist. 
Ignoring.") } Seq.empty } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala index c7783c4e9b29..1863b4b445d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution.datasources.v2 import java.io.{FileNotFoundException, IOException} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PARTITIONED_FILE_READER import org.apache.spark.rdd.InputFileBlockHolder import org.apache.spark.sql.catalyst.FileSourceOptions import org.apache.spark.sql.connector.read.PartitionReader @@ -64,8 +65,8 @@ class FilePartitionReader[T]( currentReader != null && currentReader.next() } catch { case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles => - logWarning( - s"Skipped the rest of the content in the corrupted file: $currentReader", e) + logWarning(log"Skipped the rest of the content in the corrupted file: " + + log"${MDC(PARTITIONED_FILE_READER, currentReader)}", e) false case e: Throwable => throw FileDataSourceV2.attachFilePath(currentReader.file.urlEncodedPath, e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 61d61ee7af25..4816e3164d4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -21,7 +21,8 @@ import java.util.{Locale, OptionalLong} import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.Path -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{PATH, REASON} import org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ExpressionSet} @@ -164,8 +165,8 @@ trait FileScan extends Scan val path = splitFiles(0).toPath if (!isSplitable(path) && splitFiles(0).length > sparkSession.sparkContext.getConf.get(IO_WARNING_LARGEFILETHRESHOLD)) { - logWarning(s"Loading one large unsplittable file ${path.toString} with only one " + - s"partition, the reason is: ${getFileUnSplittableReason(path)}") + logWarning(log"Loading one large unsplittable file ${MDC(PATH, path.toString)} with only " + + log"one partition, the reason is: ${MDC(REASON, getFileUnSplittableReason(path))}") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala index cb7c3efdbe48..b6eda80e7ef8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioningAndOrdering.scala @@ -16,7 +16,8 @@ */ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import 
org.apache.spark.internal.LogKey.CLASS_NAME import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -58,8 +59,9 @@ object V2ScanPartitioningAndOrdering extends Rule[LogicalPlan] with SQLConfHelpe } case _: UnknownPartitioning => None case p => - logWarning(s"Spark ignores the partitioning ${p.getClass.getSimpleName}." + - " Please use KeyGroupedPartitioning for better performance") + logWarning( + log"Spark ignores the partitioning ${MDC(CLASS_NAME, p.getClass.getSimpleName)}. " + + log"Please use KeyGroupedPartitioning for better performance") None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala index 8eeb919d0baf..5b65c5fe7519 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala @@ -27,6 +27,8 @@ import org.json4s._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.api.python._ +import org.apache.spark.internal.LogKey.CONFIG +import org.apache.spark.internal.MDC import org.apache.spark.sql.Row import org.apache.spark.sql.api.python.PythonSQLUtils import org.apache.spark.sql.catalyst.InternalRow @@ -88,9 +90,9 @@ class ApplyInPandasWithStatePythonRunner( override val bufferSize: Int = { val configuredSize = sqlConf.pandasUDFBufferSize if (configuredSize < 4) { - logWarning("Pandas execution requires more than 4 bytes. Please configure bigger value " + - s"for the configuration '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'. " + - "Force using the value '4'.") + logWarning(log"Pandas execution requires more than 4 bytes. Please configure bigger value " + + log"for the configuration '${MDC(CONFIG, SQLConf.PANDAS_UDF_BUFFER_SIZE.key)}'. 
" + + log"Force using the value '4'.") 4 } else { configuredSize diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala index e353bf5a51e9..62a64bbb3289 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.python +import org.apache.spark.internal.LogKey.{RDD_ID, SPARK_PLAN_ID} +import org.apache.spark.internal.MDC import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -108,7 +110,8 @@ case class AttachDistributedSequenceExec( override protected[sql] def cleanupResources(): Unit = { try { if (cached != null && cached.getStorageLevel != StorageLevel.NONE) { - logWarning(s"clean up cached RDD(${cached.id}) in AttachDistributedSequenceExec($id)") + logWarning(log"clean up cached RDD(${MDC(RDD_ID, cached.id)}) in " + + log"AttachDistributedSequenceExec(${MDC(SPARK_PLAN_ID, id)})") cached.unpersist(blocking = false) } } finally { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala index 18dd2eba083a..37f75742214c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{DELEGATE, READ_LIMIT} import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, ReadLimit, SparkDataStream, SupportsAdmissionControl, SupportsTriggerAvailableNow} import org.apache.spark.sql.connector.read.streaming @@ -29,10 +30,10 @@ class AvailableNowDataStreamWrapper(val delegate: SparkDataStream) extends SparkDataStream with SupportsTriggerAvailableNow with Logging { // See SPARK-45178 for more details. - logWarning("Activating the wrapper implementation of Trigger.AvailableNow for source " + - s"[$delegate]. Note that this might introduce possibility of deduplication, dataloss, " + - "correctness issue. Enable the config with extreme care. We strongly recommend to contact " + - "the data source developer to support Trigger.AvailableNow.") + logWarning(log"Activating the wrapper implementation of Trigger.AvailableNow for source " + + log"[${MDC(DELEGATE, delegate)}]. Note that this might introduce possibility of " + + log"deduplication, dataloss, correctness issue. Enable the config with extreme care. 
We " + + log"strongly recommend to contact the data source developer to support Trigger.AvailableNow.") private var fetchedOffset: streaming.Offset = _ @@ -71,8 +72,8 @@ class AvailableNowDataStreamWrapper(val delegate: SparkDataStream) case s: SupportsAdmissionControl => val limit = s.getDefaultReadLimit if (limit != ReadLimit.allAvailable()) { - logWarning(s"The read limit $limit is ignored because source $delegate does not " + - "support running Trigger.AvailableNow queries.") + logWarning(log"The read limit ${MDC(READ_LIMIT, limit)} is ignored because source " + + log"${MDC(DELEGATE, delegate)} does not support running Trigger.AvailableNow queries.") } ReadLimit.allAvailable() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index 34c5dee0997b..1c530fcabe0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -26,7 +26,8 @@ import org.apache.hadoop.fs._ import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs} import org.apache.hadoop.fs.permission.FsPermission -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{PATH, TEMP_PATH} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods import org.apache.spark.sql.internal.SQLConf @@ -154,8 +155,8 @@ object CheckpointFileManager extends Logging { fm.renameTempFile(tempPath, finalPath, overwriteIfPossible) } catch { case fe: FileAlreadyExistsException => - logWarning( - s"Failed to rename temp file $tempPath to $finalPath because file exists", fe) + logWarning(log"Failed to rename temp file ${MDC(TEMP_PATH, tempPath)} to " + + log"${MDC(PATH, finalPath)} because file exists", fe) if (!overwriteIfPossible) throw fe } @@ -178,13 +179,13 @@ object CheckpointFileManager extends Logging { underlyingStream.close() } catch { case NonFatal(e) => - logWarning(s"Error cancelling write to $finalPath, " + - s"continuing to delete temp path $tempPath", e) + logWarning(log"Error cancelling write to ${MDC(PATH, finalPath)}, continuing to " + + log"delete temp path ${MDC(TEMP_PATH, tempPath)}", e) } fm.delete(tempPath) } catch { case NonFatal(e) => - logWarning(s"Error deleting temp file $tempPath", e) + logWarning(log"Error deleting temp file ${MDC(TEMP_PATH, tempPath)}", e) } finally { terminated = true } @@ -210,10 +211,10 @@ object CheckpointFileManager extends Logging { } catch { case e: UnsupportedFileSystemException => logWarning( - "Could not use FileContext API for managing Structured Streaming checkpoint files at " + - s"$path. Using FileSystem API instead for managing log files. If the implementation " + - s"of FileSystem.rename() is not atomic, then the correctness and fault-tolerance of" + - s"your Structured Streaming is not guaranteed.") + log"Could not use FileContext API for managing Structured Streaming checkpoint files " + + log"at ${MDC(PATH, path)}. Using FileSystem API instead for managing log files. 
If " + + log"the implementation of FileSystem.rename() is not atomic, then the correctness " + + log"and fault-tolerance of your Structured Streaming is not guaranteed.") new FileSystemBasedCheckpointFileManager(path, hadoopConf) } } @@ -274,7 +275,8 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration throw QueryExecutionErrors.renameSrcPathNotFoundError(srcPath) } else { val e = QueryExecutionErrors.failedRenameTempFileError(srcPath, dstPath) - logWarning(e.getMessage) + logWarning(log"Failed to rename temp file ${MDC(TEMP_PATH, srcPath)} to " + + log"${MDC(PATH, dstPath)} as FileSystem.rename returned false.") throw e } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 8d38bba1f2a6..884de070bf32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -27,6 +27,8 @@ import org.apache.hadoop.fs.Path import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization +import org.apache.spark.internal.LogKey.{BATCH_ID, ELAPSED_TIME} +import org.apache.spark.internal.MDC import org.apache.spark.sql.SparkSession import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.util.Utils @@ -240,7 +242,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( } if (elapsedMs >= COMPACT_LATENCY_WARN_THRESHOLD_MS) { - logWarning(s"Compacting took $elapsedMs ms for compact batch $batchId") + logWarning(log"Compacting took ${MDC(ELAPSED_TIME, elapsedMs)} ms for compact batch " + + log"${MDC(BATCH_ID, batchId)}") } else { logDebug(s"Compacting took $elapsedMs ms for compact batch $batchId") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index ea8db3c99de9..18895e525156 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -23,7 +23,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ERROR, PATH} import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions._ @@ -60,8 +61,8 @@ object FileStreamSink extends Logging { } catch { case e: SparkException => throw e case NonFatal(e) => - logWarning(s"Assume no metadata directory. Error while looking for " + - s"metadata directory in the path: $singlePath.", e) + logWarning(log"Assume no metadata directory. Error while looking for " + + log"metadata directory in the path: ${MDC(PATH, singlePath)}.", e) false } case _ => false @@ -84,7 +85,7 @@ object FileStreamSink extends Logging { } catch { case NonFatal(e) => // We may not have access to this directory. Don't fail the query if that happens. 
- logWarning(e.getMessage, e) + logWarning(log"${MDC(ERROR, e.getMessage)}", e) false } if (legacyMetadataPathExists) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index eacbd0447d16..6aa8e94db94f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -27,7 +27,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{CURRENT_PATH, ELAPSED_TIME, NEW_PATH, NUM_FILES} import org.apache.spark.paths.SparkPath import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap @@ -380,7 +381,8 @@ class FileStreamSource( val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime) if (listingTimeMs > 2000) { // Output a warning when listing files uses more than 2 seconds. - logWarning(s"Listed ${files.size} file(s) in $listingTimeMs ms") + logWarning(log"Listed ${MDC(NUM_FILES, files.size)} file(s) in " + + log"${MDC(ELAPSED_TIME, listingTimeMs)} ms") } else { logTrace(s"Listed ${files.size} file(s) in $listingTimeMs ms") } @@ -628,11 +630,13 @@ object FileStreamSource { logDebug(s"Archiving completed file $curPath to $newPath") if (!fileSystem.rename(curPath, newPath)) { - logWarning(s"Fail to move $curPath to $newPath / skip moving file.") + logWarning(log"Fail to move ${MDC(CURRENT_PATH, curPath)} to " + + log"${MDC(NEW_PATH, newPath)} / skip moving file.") } } catch { case NonFatal(e) => - logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e) + logWarning(log"Fail to move ${MDC(CURRENT_PATH, curPath)} to " + + log"${MDC(NEW_PATH, newPath)} / skip moving file.", e) } } } @@ -646,12 +650,12 @@ object FileStreamSource { logDebug(s"Removing completed file $curPath") if (!fileSystem.delete(curPath, false)) { - logWarning(s"Failed to remove $curPath / skip removing file.") + logWarning(log"Failed to remove ${MDC(CURRENT_PATH, curPath)} / skip removing file.") } } catch { case NonFatal(e) => // Log to error but swallow exception to avoid process being stopped - logWarning(s"Fail to remove $curPath / skip removing file.", e) + logWarning(log"Fail to remove ${MDC(CURRENT_PATH, curPath)} / skip removing file.", e) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index cfccfff3a138..62999d711ce8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger import org.apache.hadoop.fs.Path -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.ERROR import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.QueryPlanningTracker import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, ExpressionWithRandomSeed} @@ -419,9 +420,9 @@ 
class IncrementalExecution( } catch { case e: Exception => // no need to throw fatal error, returns empty map - logWarning("Error reading metadata path for stateful operator. " + - s"This may due to no prior committed batch, or previously run on lower versions:" + - s" ${e.getMessage}") + logWarning(log"Error reading metadata path for stateful operator. This may due to " + + log"no prior committed batch, or previously run on lower versions: " + + log"${MDC(ERROR, e.getMessage)}") } } ret diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala index 46ce33687890..f444c46f1419 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala @@ -25,7 +25,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.sql.errors.QueryExecutionErrors @@ -95,7 +96,8 @@ class ManifestFileCommitProtocol(jobId: String, path: String) } } catch { case e: IOException => - logWarning(s"Fail to remove temporary file $path, continue removing next.", e) + logWarning(log"Fail to remove temporary file ${MDC(PATH, path)}, " + + log"continue removing next.", e) } } pendingCommitFiles.clear() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index ae5a033538ab..e5d275d7c242 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming import scala.collection.mutable.{Map => MutableMap} import scala.collection.mutable +import org.apache.spark.internal.LogKey.{LATEST_BATCH_ID, LATEST_COMMITTED_BATCH_ID, READ_LIMIT, SPARK_DATA_STREAM} +import org.apache.spark.internal.MDC import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, FileSourceMetadataAttribute, LocalTimestamp} @@ -108,10 +110,10 @@ class MicroBatchExecution( val supportsTriggerAvailableNow = sources.distinct.forall { src => val supports = src.isInstanceOf[SupportsTriggerAvailableNow] if (!supports) { - logWarning(s"source [$src] does not support Trigger.AvailableNow. Falling back to " + - "single batch execution. Note that this may not guarantee processing new data if " + - "there is an uncommitted batch. Please consult with data source developer to " + - "support Trigger.AvailableNow.") + logWarning(log"source [${MDC(SPARK_DATA_STREAM, src)}] does not support " + + log"Trigger.AvailableNow. Falling back to single batch execution. Note that this " + + log"may not guarantee processing new data if there is an uncommitted batch. 
" + + log"Please consult with data source developer to support Trigger.AvailableNow.") } supports @@ -211,8 +213,8 @@ class MicroBatchExecution( case s: SupportsAdmissionControl => val limit = s.getDefaultReadLimit if (limit != ReadLimit.allAvailable()) { - logWarning( - s"The read limit $limit for $s is ignored when Trigger.Once is used.") + logWarning(log"The read limit ${MDC(READ_LIMIT, limit)} for " + + log"${MDC(SPARK_DATA_STREAM, s)} is ignored when Trigger.Once is used.") } s -> ReadLimit.allAvailable() case s => @@ -510,9 +512,9 @@ class MicroBatchExecution( // here, so we do nothing here. } } else if (latestCommittedBatchId < latestBatchId - 1) { - logWarning(s"Batch completion log latest batch id is " + - s"${latestCommittedBatchId}, which is not trailing " + - s"batchid $latestBatchId by one") + logWarning(log"Batch completion log latest batch id is " + + log"${MDC(LATEST_COMMITTED_BATCH_ID, latestCommittedBatchId)}, which is not " + + log"trailing batchid ${MDC(LATEST_BATCH_ID, latestBatchId)} by one") } case None => logInfo("no commit log present") } @@ -748,8 +750,8 @@ class MicroBatchExecution( } } else if (catalogTable.exists(_ ne newRelation.catalogTable.get)) { // Output a warning if `catalogTable` is provided by the source rather than engine - logWarning( - s"Source $source should not produce the information of catalog table by its own.") + logWarning(log"Source ${MDC(SPARK_DATA_STREAM, source)} should not produce the " + + log"information of catalog table by its own.") } newRelation } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index 006d6221e55a..9193b7bd57f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.streaming import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{CONFIG, DEFAULT_VALUE, NEW_VALUE, OLD_VALUE, TIP} import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.RuntimeConfig import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkDataStream} @@ -143,8 +144,9 @@ object OffsetSeqMetadata extends Logging { // Config value exists in the metadata, update the session config with this value val optionalValueInSession = sessionConf.getOption(confKey) if (optionalValueInSession.isDefined && optionalValueInSession.get != valueInMetadata) { - logWarning(s"Updating the value of conf '$confKey' in current session from " + - s"'${optionalValueInSession.get}' to '$valueInMetadata'.") + logWarning(log"Updating the value of conf '${MDC(CONFIG, confKey)}' in current " + + log"session from '${MDC(OLD_VALUE, optionalValueInSession.get)}' " + + log"to '${MDC(NEW_VALUE, valueInMetadata)}'.") } sessionConf.set(confKey, valueInMetadata) @@ -156,14 +158,15 @@ object OffsetSeqMetadata extends Logging { case Some(defaultValue) => sessionConf.set(confKey, defaultValue) - logWarning(s"Conf '$confKey' was not found in the offset log, " + - s"using default value '$defaultValue'") + logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in the offset log, " + + log"using default value '${MDC(DEFAULT_VALUE, defaultValue)}'") case None => val valueStr = 
sessionConf.getOption(confKey).map { v => s" Using existing session conf value '$v'." }.getOrElse { " No value set in session conf." } - logWarning(s"Conf '$confKey' was not found in the offset log. $valueStr") + logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in the offset log. " + + log"${MDC(TIP, valueStr)}") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 0d32eed9b6bd..138e4abefce2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -25,7 +25,8 @@ import java.util.{Optional, UUID} import scala.collection.mutable import scala.jdk.CollectionConverters._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{EXECUTION_PLAN_LEAVES, FINISH_TRIGGER_DURATION, LOGICAL_PLAN_LEAVES, PROCESSING_TIME} import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.optimizer.InlineCTE import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan, WithCTE} @@ -382,9 +383,10 @@ abstract class ProgressContext( val finishTriggerDurationMillis = triggerClock.getTimeMillis() - triggerEndTimestamp val thresholdForLoggingMillis = 60 * 1000 if (finishTriggerDurationMillis > math.max(thresholdForLoggingMillis, processingTimeMills)) { - logWarning("Query progress update takes longer than batch processing time. Progress " + - s"update takes $finishTriggerDurationMillis milliseconds. Batch processing takes " + - s"$processingTimeMills milliseconds") + logWarning(log"Query progress update takes longer than batch processing time. Progress " + + log"update takes ${MDC(FINISH_TRIGGER_DURATION, finishTriggerDurationMillis)} " + + log"milliseconds. 
Batch processing takes ${MDC(PROCESSING_TIME, processingTimeMills)} " + + log"milliseconds") } } @@ -485,11 +487,10 @@ abstract class ProgressContext( if (!metricWarningLogged) { def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}" - logWarning( - "Could not report metrics as number leaves in trigger logical plan did not match that" + - s" of the execution plan:\n" + - s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" + - s"execution plan leaves: ${toString(allExecPlanLeaves)}\n") + logWarning(log"Could not report metrics as number leaves in trigger logical plan did " + + log"not match that of the execution plan:\nlogical plan leaves: " + + log"${MDC(LOGICAL_PLAN_LEAVES, toString(allLogicalPlanLeaves))}\nexecution plan " + + log"leaves: ${MDC(EXECUTION_PLAN_LEAVES, toString(allExecPlanLeaves))}\n") metricWarningLogged = true } Map.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala index 35bb7db6a6e1..e80860b82c32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala @@ -23,6 +23,8 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.Path +import org.apache.spark.internal.LogKey.{CONFIG, PATH} +import org.apache.spark.internal.MDC import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -42,8 +44,8 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper { val (resolvedCheckpointLocation, deleteCheckpointOnStop) = resolveCheckpointLocation(s) if (conf.adaptiveExecutionEnabled) { - logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " + - "is not supported in streaming DataFrames/Datasets and will be disabled.") + logWarning(log"${MDC(CONFIG, SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)} " + + log"is not supported in streaming DataFrames/Datasets and will be disabled.") } if (conf.isUnsupportedOperationCheckEnabled) { @@ -77,10 +79,11 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper { if (s.useTempCheckpointLocation) { deleteCheckpointOnStop = true val tempDir = Utils.createTempDir(namePrefix = "temporary").getCanonicalPath - logWarning("Temporary checkpoint location created which is deleted normally when" + - s" the query didn't fail: $tempDir. If it's required to delete it under any" + - s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" + - s" true. Important to know deleting temp checkpoint folder is best effort.") + logWarning(log"Temporary checkpoint location created which is deleted normally when" + + log" the query didn't fail: ${MDC(PATH, tempDir)}. If it's required to delete " + + log"it under any circumstances, please set " + + log"${MDC(CONFIG, SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key)} to" + + log" true. Important to know deleting temp checkpoint folder is best effort.") // SPARK-42676 - Write temp checkpoints for streaming queries to local filesystem // even if default FS is set differently. // This is a band-aid fix. 
Ideally we should convert `tempDir` to URIs, but there diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 50a73082a8c4..ce2bbba550bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -32,7 +32,8 @@ import com.google.common.util.concurrent.UncheckedExecutionException import org.apache.hadoop.fs.Path import org.apache.spark.{JobArtifactSet, SparkContext, SparkException, SparkThrowable} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{PATH, SPARK_DATA_STREAM} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ @@ -411,7 +412,7 @@ abstract class StreamExecution( case NonFatal(e) => // Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions // when we cannot delete them. - logWarning(s"Cannot delete $checkpointPath", e) + logWarning(log"Cannot delete ${MDC(PATH, checkpointPath)}", e) } } } finally { @@ -446,7 +447,8 @@ abstract class StreamExecution( source.stop() } catch { case NonFatal(e) => - logWarning(s"Failed to stop streaming source: $source. Resources may have leaked.", e) + logWarning(log"Failed to stop streaming source: ${MDC(SPARK_DATA_STREAM, source)}. " + + log"Resources may have leaked.", e) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala index e83c83df5322..dc7224d37407 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala @@ -16,7 +16,8 @@ */ package org.apache.spark.sql.execution.streaming -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{EXPIRY_TIMESTAMP, KEY} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ @@ -130,8 +131,8 @@ class TimerStateImpl( def registerTimer(expiryTimestampMs: Long): Unit = { val groupingKey = getGroupingKey(keyToTsCFName) if (exists(groupingKey, expiryTimestampMs)) { - logWarning(s"Failed to register timer for key=$groupingKey and " + - s"timestamp=$expiryTimestampMs since it already exists") + logWarning(log"Failed to register timer for key=${MDC(KEY, groupingKey)} and " + + log"timestamp=${MDC(EXPIRY_TIMESTAMP, expiryTimestampMs)} ms since it already exists") } else { store.put(encodeKey(groupingKey, expiryTimestampMs), EMPTY_ROW, keyToTsCFName) store.put(encodeSecIndexKey(groupingKey, expiryTimestampMs), EMPTY_ROW, tsToKeyCFName) @@ -147,8 +148,8 @@ class TimerStateImpl( val groupingKey = getGroupingKey(keyToTsCFName) if (!exists(groupingKey, expiryTimestampMs)) { - logWarning(s"Failed to delete timer for key=$groupingKey and " + - s"timestamp=$expiryTimestampMs since it does not exist") + logWarning(log"Failed to delete timer for key=${MDC(KEY, groupingKey)} and " + + log"timestamp=${MDC(EXPIRY_TIMESTAMP, expiryTimestampMs)} ms since it does not exist") } else { 
store.remove(encodeKey(groupingKey, expiryTimestampMs), keyToTsCFName) store.remove(encodeSecIndexKey(groupingKey, expiryTimestampMs), tsToKeyCFName) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala index 143230759724..27e7fc015385 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ELAPSED_TIME, TRIGGER_INTERVAL} import org.apache.spark.util.{Clock, SystemClock} trait TriggerExecutor { @@ -98,8 +99,9 @@ case class ProcessingTimeExecutor( /** Called when a batch falls behind */ def notifyBatchFallingBehind(realElapsedTimeMs: Long): Unit = { - logWarning("Current batch is falling behind. The trigger interval is " + - s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} milliseconds") + logWarning(log"Current batch is falling behind. The trigger interval is " + + log"${MDC(TRIGGER_INTERVAL, intervalMs)} milliseconds, but spent " + + log"${MDC(ELAPSED_TIME, realElapsedTimeMs)} milliseconds") } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala index b41b3c329712..45a2428d2784 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala @@ -29,7 +29,8 @@ import org.json4s.{DefaultFormats, Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.SparkEnv -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{HOST, PORT} import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -179,7 +180,7 @@ class TextSocketContinuousStream( val line = reader.readLine() if (line == null) { // End of file reached - logWarning(s"Stream closed by $host:$port") + logWarning(log"Stream closed by ${MDC(HOST, host)}:${MDC(PORT, port)}") return } TextSocketContinuousStream.this.synchronized { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala index a01f40bead89..ada39ca4e776 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala @@ -25,7 +25,8 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.ListBuffer -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{HOST, PORT} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import 
org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} @@ -79,7 +80,7 @@ class TextSocketMicroBatchStream(host: String, port: Int, numPartitions: Int) val line = reader.readLine() if (line == null) { // End of file reached - logWarning(s"Stream closed by $host:$port") + logWarning(log"Stream closed by ${MDC(HOST, host)}:${MDC(PORT, port)}") return } TextSocketMicroBatchStream.this.synchronized { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 2ecfa0931042..eaa4e92af929 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -32,7 +32,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.spark.{SparkConf, SparkEnv} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC, MessageWithContext} +import org.apache.spark.internal.LogKey.{FILE_VERSION, OP_ID, PARTITION_ID, PATH} import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.errors.QueryExecutionErrors @@ -341,7 +342,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with cleanup() } catch { case NonFatal(e) => - logWarning(s"Error performing snapshot and cleaning up $this") + logWarning(log"Error performing snapshot and cleaning up " + toMessageWithContext) } } @@ -354,9 +355,13 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with Nil } + private def toMessageWithContext: MessageWithContext = { + log"HDFSStateStoreProvider[id = (op=${MDC(OP_ID, stateStoreId.operatorId)}," + + log"part=${MDC(PARTITION_ID, stateStoreId.partitionId)}),dir = ${MDC(PATH, baseDir)}]" + } + override def toString(): String = { - s"HDFSStateStoreProvider[" + - s"id = (op=${stateStoreId.operatorId},part=${stateStoreId.partitionId}),dir = $baseDir]" + toMessageWithContext.message } /* Internal fields and methods */ @@ -463,9 +468,9 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with return loadedCurrentVersionMap.get } - logWarning(s"The state for version $version doesn't exist in loadedMaps. " + - "Reading snapshot file and delta files if needed..." + - "Note that this is normal for the first batch of starting query.") + logWarning(log"The state for version ${MDC(FILE_VERSION, version)} doesn't exist in " + + log"loadedMaps. Reading snapshot file and delta files if needed..." 
+ + log"Note that this is normal for the first batch of starting query.") loadedMapCacheMissCount.increment() @@ -720,7 +725,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with } } catch { case NonFatal(e) => - logWarning(s"Error doing snapshots for $this", e) + logWarning(log"Error doing snapshots for " + toMessageWithContext, e) } } @@ -751,7 +756,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with } } catch { case NonFatal(e) => - logWarning(s"Error cleaning up files for $this", e) + logWarning(log"Error cleaning up files for " + toMessageWithContext, e) } } @@ -805,7 +810,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with case "snapshot" => versionToFiles.put(version, StoreFile(version, path, isSnapshot = true)) case _ => - logWarning(s"Could not identify file $path for $this") + logWarning(log"Could not identify file ${MDC(PATH, path)} for " + toMessageWithContext) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 61c3d349655f..b1005e589073 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -36,7 +36,8 @@ import org.rocksdb.CompressionType._ import org.rocksdb.TickerType._ import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ERROR, PATH} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.util.{NextIterator, Utils} @@ -945,7 +946,8 @@ class RocksDB( Utils.deleteRecursively(file) } catch { case e: Exception => - logWarning(s"Error recursively deleting local dir $file while $msg", e) + logWarning( + log"Error recursively deleting local dir ${MDC(PATH, file)} while ${MDC(ERROR, msg)}", e) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index bd1daa48f809..e24a156357f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -38,7 +38,8 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.{SparkConf, SparkEnv} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{FILE_VERSION, MAX_FILE_VERSION, PATH} import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.streaming.CheckpointFileManager @@ -343,7 +344,8 @@ class RocksDBFileManager( logInfo(s"Deleted changelog file $version") } catch { case e: Exception => - logWarning(s"Error deleting changelog file for version $version", e) + logWarning( + log"Error deleting changelog file for version ${MDC(FILE_VERSION, version)}", e) } } } @@ -446,9 +448,10 @@ class RocksDBFileManager( case e: Exception => failedToDelete += 1 if (maxUsedVersion == -1) { - logWarning(s"Error deleting orphan file $dfsFileName", e) + logWarning(log"Error deleting 
orphan file ${MDC(PATH, dfsFileName)}", e) } else { - logWarning(s"Error deleting file $dfsFileName, last used in version $maxUsedVersion", e) + logWarning(log"Error deleting file ${MDC(PATH, dfsFileName)}, " + + log"last used in version ${MDC(MAX_FILE_VERSION, maxUsedVersion)}", e) } } } @@ -462,7 +465,8 @@ class RocksDBFileManager( logDebug(s"Deleted version $version") } catch { case e: Exception => - logWarning(s"Error deleting version file $versionFile for version $version", e) + logWarning(log"Error deleting version file ${MDC(PATH, versionFile)} for " + + log"version ${MDC(FILE_VERSION, version)}", e) } } logInfo(s"Deleted ${filesToDelete.size - failedToDelete} files (failed to delete" + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 85f7fed90c6c..699ce75a88de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -30,7 +30,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.{SparkContext, SparkEnv, SparkUnsupportedOperationException} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LOAD_TIME, QUERY_RUN_ID, STATE_STORE_PROVIDER, STORE_ID} import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.util.UnsafeRowUtils import org.apache.spark.sql.errors.QueryExecutionErrors @@ -702,9 +703,9 @@ object StateStore extends Logging { } if (loadTimeMs > 2000L) { - logWarning(s"Loaded state store provider in loadTimeMs=$loadTimeMs " + - s"for storeId=${storeProviderId.storeId.toString} and " + - s"queryRunId=${storeProviderId.queryRunId}") + logWarning(log"Loaded state store provider in loadTimeMs=${MDC(LOAD_TIME, loadTimeMs)} " + + log"for storeId=${MDC(STORE_ID, storeProviderId.storeId.toString)} and " + + log"queryRunId=${MDC(QUERY_RUN_ID, storeProviderId.queryRunId)}") } val otherProviderIds = loadedProviders.keys.filter(_ != storeProviderId).toSeq @@ -824,7 +825,8 @@ object StateStore extends Logging { } } catch { case NonFatal(e) => - logWarning(s"Error managing $provider, stopping management thread", e) + logWarning(log"Error managing ${MDC(STATE_STORE_PROVIDER, provider)}, " + + log"stopping management thread", e) threadPoolException.set(e) } finally { val duration = System.currentTimeMillis() - startTime diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala index 9802a4dce4e5..1303668b8f86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala @@ -24,7 +24,8 @@ import scala.annotation.tailrec import org.apache.hadoop.conf.Configuration import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{END_INDEX, START_INDEX} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, JoinedRow, 
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
@@ -366,9 +367,9 @@ class SymmetricHashJoinStateManager(

       // If nulls were found at the end, log a warning for the range of null indices.
       if (nonNullIndex != numValues - 1) {
-        logWarning(s"`keyWithIndexToValue` returns a null value for indices " +
-          s"with range from startIndex=${nonNullIndex + 1} " +
-          s"and endIndex=${numValues - 1}.")
+        logWarning(log"`keyWithIndexToValue` returns a null value for indices " +
+          log"with range from startIndex=${MDC(START_INDEX, nonNullIndex + 1)} " +
+          log"and endIndex=${MDC(END_INDEX, numValues - 1)}.")
       }

       // Remove all null values from nonNullIndex + 1 onwards
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 8a2a5282b69d..5e2aae45c680 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal

 import org.apache.spark.{JobExecutionStatus, SparkConf}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.connector.metric.CustomMetric
@@ -222,9 +223,9 @@ class SQLAppStatusListener(
       method
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Unable to load custom metric object for class `$className`. " +
-          "Please make sure that the custom metric class is in the classpath and " +
-          "it has 0-arg constructor.", e)
+        logWarning(log"Unable to load custom metric object for class " +
+          log"`${MDC(CLASS_NAME, className)}`. Please make sure that the custom metric " +
+          log"class is in the classpath and it has 0-arg constructor.", e)
         // Cannot initialize custom metric object, we might be in history server that does
         // not have the custom metric class.
         val defaultMethod = (_: Array[Long], _: Array[Long]) => "N/A"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 164710cdd883..af1304dccee0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -29,7 +29,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path}

 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, CONFIG2}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.CacheManager
@@ -258,8 +259,9 @@ object SharedState extends Logging {
     val sparkWarehouseOption =
       initialConfigs.get(WAREHOUSE_PATH.key).orElse(sparkConf.getOption(WAREHOUSE_PATH.key))
     if (initialConfigs.contains(HIVE_WAREHOUSE_CONF_NAME)) {
-      logWarning(s"Not allowing to set $HIVE_WAREHOUSE_CONF_NAME in SparkSession's " +
-        s"options, please use ${WAREHOUSE_PATH.key} to set statically for cross-session usages")
+      logWarning(log"Not allowing to set ${MDC(CONFIG, HIVE_WAREHOUSE_CONF_NAME)} in " +
+        log"SparkSession's options, please use ${MDC(CONFIG2, WAREHOUSE_PATH.key)} to " +
+        log"set statically for cross-session usages")
     }
     // hive.metastore.warehouse.dir only stay in hadoopConf
     sparkConf.remove(HIVE_WAREHOUSE_CONF_NAME)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index c2c430a7b39d..e4cd79e3f53c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -24,6 +24,8 @@ import java.util.Locale

 import scala.util.Using

+import org.apache.spark.internal.LogKey.COLUMN_NAME
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
@@ -368,7 +370,8 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
           }
         } catch {
           case e: SQLException =>
-            logWarning(s"Failed to get array dimension for column $columnName", e)
+            logWarning(
+              log"Failed to get array dimension for column ${MDC(COLUMN_NAME, columnName)}", e)
         }
       case _ =>
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 225f9d1f19a5..424c4f8a7d26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -25,7 +25,8 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._

 import org.apache.spark.annotation.Evolving
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{QUERY_ID, RUN_ID}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement}
@@ -367,8 +368,8 @@ class StreamingQueryManager private[sql] (
     if (activeOption.isDefined) {
       if (shouldStopActiveRun) {
         val oldQuery = activeOption.get
-        logWarning(s"Stopping existing streaming query [id=${query.id}, " +
-          s"runId=${oldQuery.runId}], as a new run is being started.")
+        logWarning(log"Stopping existing streaming query [id=${MDC(QUERY_ID, query.id)}, " +
+          log"runId=${MDC(RUN_ID, oldQuery.runId)}], as a new run is being started.")
         Some(oldQuery)
       } else {
         throw new IllegalStateException(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
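
For readers less familiar with the structured logging framework this PR targets, the sketch below illustrates the before/after pattern applied throughout the diff. It is a minimal, hypothetical example: the `ExampleCleanup` class and `deleteDir` method do not exist in Spark, while `Logging`, the `log"..."` interpolator, `MDC`, and `LogKey.PATH` are the same APIs the patch itself uses.

import java.io.File

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.PATH

// Hypothetical helper, shown only to illustrate the migration pattern of this PR.
class ExampleCleanup extends Logging {
  def deleteDir(dir: File): Unit = {
    try {
      if (!dir.delete()) {
        throw new java.io.IOException(s"Could not delete $dir")
      }
    } catch {
      case e: Exception =>
        // Before the migration, a plain string interpolation was used:
        //   logWarning(s"Error deleting dir $dir", e)
        // After the migration, the log"..." interpolator builds the message and
        // MDC(PATH, dir) tags the value with the PATH log key, so downstream log
        // pipelines can filter or aggregate on that field instead of parsing text.
        logWarning(log"Error deleting dir ${MDC(PATH, dir)}", e)
    }
  }
}

Longer messages, as in the StateStore.scala and SharedState.scala hunks above, are built by concatenating several log"..." fragments with `+`, each fragment carrying its own MDC-tagged values.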