This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d47f34f3182c [SPARK-48629] Migrate the residual code to structured
logging framework
d47f34f3182c is described below
commit d47f34f3182c45f80ae3f364bcc30f06d4838d43
Author: panbingkun <[email protected]>
AuthorDate: Mon Jun 24 22:46:25 2024 -0700
[SPARK-48629] Migrate the residual code to structured logging framework
### What changes were proposed in this pull request?
This PR aims to migrate the `residual code` to the structured logging framework.
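For illustration, a minimal sketch of the pattern applied throughout this change (the class name `ArtifactResolver` is hypothetical; the `log"..."` interpolator, `MDC`, and `LogKeys` usage mirror the `MavenUtils.scala` hunk below):

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical example class; the actual changes touch MavenUtils, StreamExecution, etc.
class ArtifactResolver extends Logging {
  def skip(artifactId: String): Unit = {
    // Before: plain string interpolation, the id is just free text in the message
    // logInfo(s"Skipping non-jar dependency $artifactId")

    // After: the id is tagged with a LogKey via MDC, so the structured logging
    // framework can emit it as a queryable field
    logInfo(log"Skipping non-jar dependency ${MDC(LogKeys.ARTIFACT_ID, artifactId)}")
  }
}
```

Keys that do not yet exist (e.g. `ARTIFACTS`, `WRITE_AHEAD_LOG_RECORD_HANDLE`) are added to `LogKeys` as plain `case object ... extends LogKey` entries, as shown in the `LogKey.scala` hunk.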
### Why are the changes needed?
While reviewing the Spark code, I found that some logs in a few modules had not been
fully migrated to the structured logging framework; this PR completes that
unfinished work.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #46986 from panbingkun/sl_other_followup.
Authored-by: panbingkun <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../scala/org/apache/spark/internal/LogKey.scala | 19 +++++++++---
.../scala/org/apache/spark/util/MavenUtils.scala | 9 +++---
.../org/apache/spark/util/SparkErrorUtils.scala | 5 +--
.../org/apache/spark/util/SparkFileUtils.scala | 6 ++--
.../sql/streaming/StreamingQueryListenerBus.scala | 6 ++--
...SparkConnectStreamingQueryListenerHandler.scala | 36 +++++++++++-----------
.../streaming/kinesis/KinesisCheckpointer.scala | 2 +-
.../spark/deploy/history/FsHistoryProvider.scala | 4 +--
.../spark/deploy/master/ui/MasterWebUI.scala | 4 +--
.../scala/org/apache/spark/executor/Executor.scala | 2 +-
...AbortableStreamBasedCheckpointFileManager.scala | 6 ++--
.../apache/spark/deploy/k8s/KubernetesConf.scala | 15 ++++-----
.../spark/deploy/yarn/ApplicationMaster.scala | 9 +++---
.../org/apache/spark/deploy/yarn/Client.scala | 4 +--
.../scheduler/cluster/YarnSchedulerBackend.scala | 5 +--
.../sql/execution/streaming/AsyncCommitLog.scala | 4 ++-
.../execution/streaming/AsyncOffsetSeqLog.scala | 4 ++-
.../AsyncProgressTrackingMicroBatchExecution.scala | 10 +++---
.../execution/streaming/MicroBatchExecution.scala | 12 ++++----
.../sql/execution/streaming/StreamExecution.scala | 10 +++---
.../sql/execution/streaming/StreamMetadata.scala | 7 +++--
.../streaming/state/OperatorStateMetadata.scala | 5 +--
.../streaming/state/RocksDBFileManager.scala | 2 +-
.../state/StateSchemaCompatibilityChecker.scala | 6 ++--
.../streaming/state/StateStoreChangelog.scala | 8 ++---
.../hive/service/cli/operation/Operation.java | 2 +-
.../thriftserver/SparkGetFunctionsOperation.scala | 12 +++++---
.../rdd/WriteAheadLogBackedBlockRDD.scala | 5 +--
28 files changed, 118 insertions(+), 101 deletions(-)
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index f90eb4a77071..c0edebf30c9a 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -80,6 +80,7 @@ private[spark] object LogKeys {
case object APP_STATE extends LogKey
case object ARCHIVE_NAME extends LogKey
case object ARGS extends LogKey
+ case object ARTIFACTS extends LogKey
case object ARTIFACT_ID extends LogKey
case object ATTRIBUTE_MAP extends LogKey
case object AUTH_ENABLED extends LogKey
@@ -282,6 +283,7 @@ private[spark] object LogKeys {
case object FREE_MEMORY_SIZE extends LogKey
case object FROM_OFFSET extends LogKey
case object FROM_TIME extends LogKey
+ case object FS_DATA_OUTPUT_STREAM extends LogKey
case object FUNCTION_NAME extends LogKey
case object FUNCTION_PARAM extends LogKey
case object GLOBAL_INIT_FILE extends LogKey
@@ -299,9 +301,8 @@ private[spark] object LogKeys {
case object HIVE_OPERATION_STATE extends LogKey
case object HIVE_OPERATION_TYPE extends LogKey
case object HOST extends LogKey
+ case object HOSTS extends LogKey
case object HOST_LOCAL_BLOCKS_SIZE extends LogKey
- case object HOST_NAME extends LogKey
- case object HOST_NAMES extends LogKey
case object HOST_PORT extends LogKey
case object HOST_PORT2 extends LogKey
case object HUGE_METHOD_LIMIT extends LogKey
@@ -337,6 +338,7 @@ private[spark] object LogKeys {
case object KEY2 extends LogKey
case object KEYTAB extends LogKey
case object KEYTAB_FILE extends LogKey
+ case object KILL_EXECUTORS extends LogKey
case object LABEL_COLUMN extends LogKey
case object LARGEST_CLUSTER_INDEX extends LogKey
case object LAST_ACCESS_TIME extends LogKey
@@ -357,10 +359,10 @@ private[spark] object LogKeys {
case object LOCAL_BLOCKS_SIZE extends LogKey
case object LOCAL_SCRATCH_DIR extends LogKey
case object LOCATION extends LogKey
+ case object LOGICAL_PLAN extends LogKey
case object LOGICAL_PLAN_COLUMNS extends LogKey
case object LOGICAL_PLAN_LEAVES extends LogKey
case object LOG_ID extends LogKey
- case object LOG_KEY_FILE extends LogKey
case object LOG_LEVEL extends LogKey
case object LOG_OFFSET extends LogKey
case object LOG_TYPE extends LogKey
@@ -385,6 +387,7 @@ private[spark] object LogKeys {
case object MAX_NUM_PARTITIONS extends LogKey
case object MAX_NUM_POSSIBLE_BINS extends LogKey
case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
+ case object MAX_SERVICE_NAME_LENGTH extends LogKey
case object MAX_SIZE extends LogKey
case object MAX_SLOTS extends LogKey
case object MAX_SPLIT_BYTES extends LogKey
@@ -395,6 +398,7 @@ private[spark] object LogKeys {
case object MEMORY_THRESHOLD_SIZE extends LogKey
case object MERGE_DIR_NAME extends LogKey
case object MESSAGE extends LogKey
+ case object METADATA extends LogKey
case object METADATA_DIRECTORY extends LogKey
case object METADATA_JSON extends LogKey
case object META_FILE extends LogKey
@@ -541,7 +545,8 @@ private[spark] object LogKeys {
case object OLD_VALUE extends LogKey
case object OPEN_COST_IN_BYTES extends LogKey
case object OPERATION_HANDLE extends LogKey
- case object OPERATION_HANDLE_IDENTIFIER extends LogKey
+ case object OPERATION_HANDLE_ID extends LogKey
+ case object OPERATION_ID extends LogKey
case object OPTIMIZED_PLAN_COLUMNS extends LogKey
case object OPTIMIZER_CLASS_NAME extends LogKey
case object OPTIONS extends LogKey
@@ -583,6 +588,7 @@ private[spark] object LogKeys {
case object POST_SCAN_FILTERS extends LogKey
case object PREDICATE extends LogKey
case object PREDICATES extends LogKey
+ case object PREFERRED_SERVICE_NAME extends LogKey
case object PREFIX extends LogKey
case object PRETTY_ID_STRING extends LogKey
case object PRINCIPAL extends LogKey
@@ -613,6 +619,7 @@ private[spark] object LogKeys {
case object RANGE extends LogKey
case object RATE_LIMIT extends LogKey
case object RATIO extends LogKey
+ case object RDD extends LogKey
case object RDD_CHECKPOINT_DIR extends LogKey
case object RDD_DEBUG_STRING extends LogKey
case object RDD_DESCRIPTION extends LogKey
@@ -641,6 +648,7 @@ private[spark] object LogKeys {
case object REMOVE_FROM_MASTER extends LogKey
case object REPORT_DETAILS extends LogKey
case object REQUESTER_SIZE extends LogKey
+ case object REQUEST_EXECUTORS extends LogKey
case object REQUEST_ID extends LogKey
case object RESOURCE extends LogKey
case object RESOURCE_NAME extends LogKey
@@ -679,6 +687,7 @@ private[spark] object LogKeys {
case object SESSION_KEY extends LogKey
case object SET_CLIENT_INFO_REQUEST extends LogKey
case object SHARD_ID extends LogKey
+ case object SHORTER_SERVICE_NAME extends LogKey
case object SHORT_USER_NAME extends LogKey
case object SHUFFLE_BLOCK_INFO extends LogKey
case object SHUFFLE_DB_BACKEND_KEY extends LogKey
@@ -756,7 +765,6 @@ private[spark] object LogKeys {
case object TASK_ATTEMPT_ID extends LogKey
case object TASK_ID extends LogKey
case object TASK_INDEX extends LogKey
- case object TASK_INFO_ID extends LogKey
case object TASK_LOCALITY extends LogKey
case object TASK_NAME extends LogKey
case object TASK_REQUIREMENTS extends LogKey
@@ -835,6 +843,7 @@ private[spark] object LogKeys {
case object WORKER_PORT extends LogKey
case object WORKER_URL extends LogKey
case object WRITE_AHEAD_LOG_INFO extends LogKey
+ case object WRITE_AHEAD_LOG_RECORD_HANDLE extends LogKey
case object WRITE_JOB_UUID extends LogKey
case object XML_SCHEDULING_MODE extends LogKey
case object XSD_PATH extends LogKey
diff --git a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
index ae00987cd69f..546981c8b543 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
@@ -36,7 +36,7 @@ import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver,
IBiblioResolver}
import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.util.ArrayImplicits._
/** Provides utility functions to be used inside SparkSubmit. */
@@ -215,7 +215,7 @@ private[spark] object MavenUtils extends Logging {
if (artifactInfo.getExt == "jar") {
true
} else {
- logInfo(s"Skipping non-jar dependency ${artifactInfo.getId}")
+ logInfo(log"Skipping non-jar dependency ${MDC(LogKeys.ARTIFACT_ID,
artifactInfo.getId)}")
false
}
}
@@ -515,8 +515,9 @@ private[spark] object MavenUtils extends Logging {
val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED,
true)
if (failedReports.nonEmpty && noCacheIvySettings.isDefined) {
val failedArtifacts = failedReports.map(r => r.getArtifact)
- logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ",
"]")}, " +
- s"attempt to retry while skipping local-m2-cache.")
+ logInfo(log"Download failed: " +
+ log"${MDC(LogKeys.ARTIFACTS, failedArtifacts.mkString("[", ", ",
"]"))}, " +
+ log"attempt to retry while skipping local-m2-cache.")
failedArtifacts.foreach(artifact => {
clearInvalidIvyCacheFiles(artifact.getModuleRevisionId,
ivySettings.getDefaultCache)
})
diff --git
a/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala
b/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala
index 8194d1e42417..9f604e4bf47f 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets.UTF_8
import scala.util.control.NonFatal
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
private[spark] trait SparkErrorUtils extends Logging {
/**
@@ -74,7 +74,8 @@ private[spark] trait SparkErrorUtils extends Logging {
} catch {
case t: Throwable if (originalThrowable != null && originalThrowable
!= t) =>
originalThrowable.addSuppressed(t)
- logWarning(s"Suppressing exception in finally: ${t.getMessage}", t)
+ logWarning(
+ log"Suppressing exception in finally: ${MDC(LogKeys.MESSAGE,
t.getMessage)}", t)
throw originalThrowable
}
}
diff --git
a/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala
b/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala
index e12f8acdadd3..22f03df1b269 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala
@@ -20,7 +20,7 @@ import java.io.File
import java.net.{URI, URISyntaxException}
import java.nio.file.Files
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.network.util.JavaUtils
private[spark] trait SparkFileUtils extends Logging {
@@ -77,12 +77,12 @@ private[spark] trait SparkFileUtils extends Logging {
// remove the check when we're sure that Files.createDirectories() would
never fail silently.
Files.createDirectories(dir.toPath)
if ( !dir.exists() || !dir.isDirectory) {
- logError(s"Failed to create directory " + dir)
+ logError(log"Failed to create directory ${MDC(LogKeys.PATH, dir)}")
}
dir.isDirectory
} catch {
case e: Exception =>
- logError(s"Failed to create directory " + dir, e)
+ logError(log"Failed to create directory ${MDC(LogKeys.PATH, dir)}", e)
false
}
}
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
index c3c23740e2fe..c2934bcfa705 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
@@ -145,13 +145,11 @@ class StreamingQueryListenerBus(sparkSession:
SparkSession) extends Logging {
case t: QueryTerminatedEvent =>
listener.onQueryTerminated(t)
case _ =>
- logWarning(
- log"Unknown StreamingQueryListener event: " +
- log"${MDC(LogKeys.EVENT, event)}")
+ logWarning(log"Unknown StreamingQueryListener event:
${MDC(LogKeys.EVENT, event)}")
}
} catch {
case e: Exception =>
- logWarning(s"Listener $listener threw an exception", e)
+ logWarning(log"Listener ${MDC(LogKeys.LISTENER, listener)} threw an
exception", e)
})
}
}
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
index d072b56e022a..ce5aa0888ca5 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
@@ -57,9 +57,10 @@ class
SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
case
StreamingQueryListenerBusCommand.CommandCase.ADD_LISTENER_BUS_LISTENER =>
listenerHolder.isServerSideListenerRegistered match {
case true =>
- logWarning(
- s"[SessionId: $sessionId][UserId: $userId][operationId: " +
- s"${executeHolder.operationId}] Redundant server side listener
added. Exiting.")
+ logWarning(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]"
+
+ log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
+ log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID,
executeHolder.operationId)}] " +
+ log"Redundant server side listener added. Exiting.")
return
case false =>
// This transfers sending back the response to the client until
@@ -86,36 +87,35 @@ class
SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: " +
- log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER,
executeHolder.operationId)}] " +
+ log"${MDC(LogKeys.OPERATION_HANDLE_ID,
executeHolder.operationId)}] " +
log"Error sending listener added response.",
e)
listenerHolder.cleanUp()
return
}
}
- logInfo(
- log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
- log"${MDC(LogKeys.USER_ID, userId)}][operationId: " +
- log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER,
executeHolder.operationId)}] " +
- log"Server side listener added. Now blocking until all client side
listeners are " +
- log"removed or there is error transmitting the event back.")
+ logInfo(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
+ log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
+ log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID,
executeHolder.operationId)}] " +
+ log"Server side listener added. Now blocking until " +
+ log"all client side listeners are removed or there is error
transmitting the event back.")
// Block the handling thread, and have serverListener continuously
send back new events
listenerHolder.streamingQueryListenerLatch.await()
logInfo(
- log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
- log"${MDC(LogKeys.USER_ID, userId)}]" +
- log"[operationId: " +
- log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER,
executeHolder.operationId)}] " +
+ log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
+ log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
+ log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID,
executeHolder.operationId)}] " +
log"Server side listener long-running handling thread ended.")
case
StreamingQueryListenerBusCommand.CommandCase.REMOVE_LISTENER_BUS_LISTENER =>
listenerHolder.isServerSideListenerRegistered match {
case true =>
sessionHolder.streamingServersideListenerHolder.cleanUp()
case false =>
- logWarning(
- s"[SessionId: $sessionId][UserId: $userId][operationId: " +
- s"${executeHolder.operationId}] No active server side listener
bus listener " +
- s"but received remove listener call. Exiting.")
+ logWarning(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]"
+
+ log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
+ log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID,
executeHolder.operationId)}] " +
+ log"No active server side listener bus listener but received
remove listener call. " +
+ log"Exiting.")
return
}
case StreamingQueryListenerBusCommand.CommandCase.COMMAND_NOT_SET =>
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
index 7e5a170a870c..c52eeca1e48a 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
@@ -103,7 +103,7 @@ private[kinesis] class KinesisCheckpointer(
}
} catch {
case NonFatal(e) =>
- logWarning(s"Failed to checkpoint shardId $shardId to DynamoDB.", e)
+ logWarning(log"Failed to checkpoint shardId ${MDC(SHARD_ID, shardId)}
to DynamoDB.", e)
}
}
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 92ac10ac9fb8..95b23c0f894f 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -1390,8 +1390,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
case _: IOException if !retried =>
// compaction may touch the file(s) which app rebuild wants to read
// compaction wouldn't run in short interval, so try again...
- logWarning(s"Exception occurred while rebuilding log path
${attempt.logPath} - " +
- "trying again...")
+ logWarning(log"Exception occurred while rebuilding log path " +
+ log"${MDC(LogKeys.PATH, attempt.logPath)} - trying again...")
retried = true
case e: Exception =>
diff --git
a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 88cda1bd65cc..6c7a8f582d91 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -25,7 +25,7 @@ import
org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, Maste
import org.apache.spark.deploy.Utils.addRenderLogHandler
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{HOST_NAMES, NUM_REMOVED_WORKERS}
+import org.apache.spark.internal.LogKeys.{HOSTS, NUM_REMOVED_WORKERS}
import org.apache.spark.internal.config.DECOMMISSION_ENABLED
import org.apache.spark.internal.config.UI.MASTER_UI_DECOMMISSION_ALLOW_MODE
import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
@@ -79,7 +79,7 @@ class MasterWebUI(
} else {
val removedWorkers = masterEndpointRef.askSync[Integer](
DecommissionWorkersOnHosts(hostnames))
- logInfo(log"Decommissioning of hosts ${MDC(HOST_NAMES,
hostnames)}" +
+ logInfo(log"Decommissioning of hosts ${MDC(HOSTS, hostnames)}" +
log" decommissioned ${MDC(NUM_REMOVED_WORKERS, removedWorkers)}
workers")
if (removedWorkers > 0) {
resp.setStatus(HttpServletResponse.SC_OK)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 7317d3c47c08..586a8a7db28a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -83,7 +83,7 @@ private[spark] class Executor(
extends Logging {
logInfo(log"Starting executor ID ${LogMDC(LogKeys.EXECUTOR_ID, executorId)}"
+
- log" on host ${LogMDC(HOST_NAME, executorHostname)}")
+ log" on host ${LogMDC(HOST, executorHostname)}")
logInfo(log"OS info ${LogMDC(OS_NAME, System.getProperty("os.name"))}," +
log" ${LogMDC(OS_VERSION, System.getProperty("os.version"))}, " +
log"${LogMDC(OS_ARCH, System.getProperty("os.arch"))}")
diff --git
a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
index 20b3d9444884..599361009fcc 100644
---
a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
+++
b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
@@ -53,7 +53,8 @@ class AbortableStreamBasedCheckpointFileManager(path: Path,
hadoopConf: Configur
fsDataOutputStream.close()
} catch {
case NonFatal(e) =>
- logWarning(s"Error cancelling write to $path (stream:
$fsDataOutputStream)", e)
+ logWarning(log"Error cancelling write to ${MDC(LogKeys.PATH,
path)} " +
+ log"(stream: ${MDC(LogKeys.FS_DATA_OUTPUT_STREAM,
fsDataOutputStream)})", e)
} finally {
terminated = true
}
@@ -71,7 +72,8 @@ class AbortableStreamBasedCheckpointFileManager(path: Path,
hadoopConf: Configur
fsDataOutputStream.close()
} catch {
case NonFatal(e) =>
- logWarning(s"Error closing $path (stream: $fsDataOutputStream)", e)
+ logWarning(log"Error closing ${MDC(LogKeys.PATH, path)} " +
+ log"(stream: ${MDC(LogKeys.FS_DATA_OUTPUT_STREAM,
fsDataOutputStream)})", e)
} finally {
terminated = true
}
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index f62204a8a9c0..deb178eb90e1 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -27,8 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep._
import org.apache.spark.deploy.k8s.submit._
-import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{CONFIG, EXECUTOR_ENV_REGEX}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -107,9 +106,11 @@ class KubernetesDriverConf(
} else {
val randomServiceId = KubernetesUtils.uniqueID(clock)
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
- logWarning(s"Driver's hostname would preferably be
$preferredServiceName, but this is " +
- s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling
back to use " +
- s"$shorterServiceName as the driver service's name.")
+ logWarning(log"Driver's hostname would preferably be " +
+ log"${MDC(LogKeys.PREFERRED_SERVICE_NAME, preferredServiceName)}, but
this is too long " +
+ log"(must be <= ${MDC(LogKeys.MAX_SERVICE_NAME_LENGTH,
MAX_SERVICE_NAME_LENGTH)} " +
+ log"characters). Falling back to use " +
+ log"${MDC(LogKeys.SHORTER_SERVICE_NAME, shorterServiceName)} as the
driver service's name.")
shorterServiceName
}
}
@@ -242,10 +243,10 @@ private[spark] class KubernetesExecutorConf(
if (executorEnvRegex.pattern.matcher(key).matches()) {
true
} else {
- logWarning(log"Invalid key: ${MDC(CONFIG, key)}, " +
+ logWarning(log"Invalid key: ${MDC(LogKeys.CONFIG, key)}, " +
log"a valid environment variable name must consist of alphabetic
characters, " +
log"digits, '_', '-', or '.', and must not start with a digit. " +
- log"Regex used for validation is '${MDC(EXECUTOR_ENV_REGEX,
executorEnvRegex)}'")
+ log"Regex used for validation is '${MDC(LogKeys.EXECUTOR_ENV_REGEX,
executorEnvRegex)}'")
false
}
}
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2523941e0bff..11d22a3225d8 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -527,10 +527,9 @@ private[spark] class ApplicationMaster(
userClassThread.join()
} catch {
case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
- logError(
- log"""SparkContext did not initialize after waiting for
- |${MDC(LogKeys.TIMEOUT, totalWaitTime)} ms.
- | Please check earlier log output for errors. Failing the
application.""".stripMargin)
+ logError(log"SparkContext did not initialize after waiting for " +
+ log"${MDC(LogKeys.TIMEOUT, totalWaitTime)} ms. " +
+ log"Please check earlier log output for errors. Failing the
application.")
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_SC_NOT_INITED,
"Timed out waiting for SparkContext.")
@@ -869,7 +868,7 @@ private[spark] class ApplicationMaster(
}
} else {
logError(log"Application Master lost connection with driver!
Shutting down. " +
- log"${MDC(LogKeys.REMOTE_ADDRESS, remoteAddress)}")
+ log"${MDC(LogKeys.HOST_PORT, remoteAddress)}")
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_DISCONNECTED)
}
}
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index bf31e03ba9a8..b2c4d97bc7b0 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1211,11 +1211,11 @@ private[spark] class Client(
cleanupStagingDir()
return YarnAppReport(YarnApplicationState.KILLED,
FinalApplicationStatus.KILLED, None)
case NonFatal(e) if !e.isInstanceOf[InterruptedIOException] =>
- val msg = s"Failed to contact YARN for application $appId."
+ val msg = log"Failed to contact YARN for application
${MDC(LogKeys.APP_ID, appId)}."
logError(msg, e)
// Don't necessarily clean up staging dir because status is unknown
return YarnAppReport(YarnApplicationState.FAILED,
FinalApplicationStatus.FAILED,
- Some(msg))
+ Some(msg.message))
}
val state = report.getYarnApplicationState
reportsSinceLastLog += 1
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index e21f1c3268ae..cd81f11510fe 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -366,7 +366,8 @@ private[spark] abstract class YarnSchedulerBackend(
am.ask[Boolean](r).andThen {
case Success(b) => context.reply(b)
case Failure(NonFatal(e)) =>
- logError(s"Sending $r to AM was unsuccessful", e)
+ logError(
+ log"Sending ${MDC(LogKeys.REQUEST_EXECUTORS, r)} to AM was
unsuccessful", e)
context.sendFailure(e)
}(ThreadUtils.sameThread)
case None =>
@@ -380,7 +381,7 @@ private[spark] abstract class YarnSchedulerBackend(
am.ask[Boolean](k).andThen {
case Success(b) => context.reply(b)
case Failure(NonFatal(e)) =>
- logError(s"Sending $k to AM was unsuccessful", e)
+ logError(log"Sending ${MDC(LogKeys.KILL_EXECUTORS, k)} to AM
was unsuccessful", e)
context.sendFailure(e)
}(ThreadUtils.sameThread)
case None =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala
index 686e0bb86886..6db01624fd26 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{CompletableFuture,
ConcurrentLinkedDeque, ThreadPoo
import scala.jdk.CollectionConverters._
+import org.apache.spark.internal.{LogKeys, MDC}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -125,7 +126,8 @@ class AsyncCommitLog(sparkSession: SparkSession, path:
String, executorService:
}
} catch {
case e: Throwable =>
- logError(s"Encountered error while writing batch ${batchId} to
commit log", e)
+ logError(log"Encountered error while writing batch " +
+ log"${MDC(LogKeys.BATCH_ID, batchId)} to commit log", e)
future.completeExceptionally(e)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala
index a89a9132e03e..54a8855b77cd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.jdk.CollectionConverters._
+import org.apache.spark.internal.{LogKeys, MDC}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.util.{Clock, SystemClock}
@@ -159,7 +160,8 @@ class AsyncOffsetSeqLog(
}
} catch {
case e: Throwable =>
- logError(s"Encountered error while writing batch ${batchId} to
offset log", e)
+ logError(log"Encountered error while writing batch " +
+ log"${MDC(LogKeys.BATCH_ID, batchId)} to offset log", e)
future.completeExceptionally(e)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
index ce11e9100782..4a7cb5b71a77 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicLong
-import org.apache.spark.internal.LogKeys.PRETTY_ID_STRING
+import org.apache.spark.internal.LogKeys.{BATCH_ID, PRETTY_ID_STRING}
import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.streaming.WriteToStream
@@ -158,8 +158,8 @@ class AsyncProgressTrackingMicroBatchExecution(
}
})
.exceptionally((th: Throwable) => {
- logError(s"Encountered error while performing" +
- s" async offset write for batch ${execCtx.batchId}", th)
+ logError(log"Encountered error while performing async offset write for
batch " +
+ log"${MDC(BATCH_ID, execCtx.batchId)}", th)
errorNotifier.markError(th)
return
})
@@ -190,8 +190,8 @@ class AsyncProgressTrackingMicroBatchExecution(
commitLog
.addAsync(execCtx.batchId,
CommitMetadata(watermarkTracker.currentWatermark))
.exceptionally((th: Throwable) => {
- logError(s"Got exception during async write to commit log" +
- s" for batch ${execCtx.batchId}", th)
+ logError(log"Got exception during async write to commit log for
batch " +
+ log"${MDC(BATCH_ID, execCtx.batchId)}", th)
errorNotifier.markError(th)
return
})
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index ef49de5e0857..f636413f7c51 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -419,12 +419,12 @@ class MicroBatchExecution(
def validateOffsetLogAndGetPrevOffset(latestBatchId: Long):
Option[OffsetSeq] = {
if (latestBatchId != 0) {
Some(offsetLog.get(latestBatchId - 1).getOrElse {
- logError(s"The offset log for batch ${latestBatchId - 1} doesn't
exist, " +
- s"which is required to restart the query from the latest batch
$latestBatchId " +
- "from the offset log. Please ensure there are two subsequent offset
logs " +
- "available for the latest batch via manually deleting the offset
file(s). " +
- "Please also ensure the latest batch for commit log is equal or one
batch " +
- "earlier than the latest batch for offset log.")
+ logError(log"The offset log for batch ${MDC(LogKeys.BATCH_ID,
latestBatchId - 1)} " +
+ log"doesn't exist, which is required to restart the query from the
latest batch " +
+ log"${MDC(LogKeys.LATEST_BATCH_ID, latestBatchId)} from the offset
log. Please ensure " +
+ log"there are two subsequent offset logs available for the latest
batch via manually " +
+ log"deleting the offset file(s). Please also ensure the latest batch
for commit log is " +
+ log"equal or one batch earlier than the latest batch for offset
log.")
throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't
exist")
})
} else {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 4198d7367fe2..a6320a664aa1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -32,8 +32,8 @@ import
com.google.common.util.concurrent.UncheckedExecutionException
import org.apache.hadoop.fs.Path
import org.apache.spark.{JobArtifactSet, SparkContext, SparkException,
SparkThrowable}
-import org.apache.spark.internal.{Logging, LogKeys, MDC}
-import org.apache.spark.internal.LogKeys.{CHECKPOINT_PATH, CHECKPOINT_ROOT,
PATH, PRETTY_ID_STRING, SPARK_DATA_STREAM}
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{CHECKPOINT_PATH, CHECKPOINT_ROOT,
LOGICAL_PLAN, PATH, PRETTY_ID_STRING, SPARK_DATA_STREAM}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
@@ -322,8 +322,7 @@ abstract class StreamExecution(
if (state.compareAndSet(INITIALIZING, ACTIVE)) {
// Log logical plan at the start of the query to help debug issues
related to
// plan changes.
- logInfo(log"Finish initializing with logical plan:\n" +
- log"${MDC(LogKeys.QUERY_PLAN, logicalPlan)}")
+ logInfo(log"Finish initializing with logical
plan:\n${MDC(LOGICAL_PLAN, logicalPlan)}")
// Unblock `awaitInitialization`
initializationLatch.countDown()
@@ -373,8 +372,7 @@ abstract class StreamExecution(
case _ => None
}
- logError(log"Query ${MDC(LogKeys.PRETTY_ID_STRING, prettyIdString)} " +
- log"terminated with error", e)
+ logError(log"Query ${MDC(PRETTY_ID_STRING, prettyIdString)} terminated
with error", e)
getLatestExecutionContext().updateStatusMessage(s"Terminated with
exception: $message")
// Rethrow the fatal errors to allow the user using
`Thread.UncaughtExceptionHandler` to
// handle them
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index 978cb3c34f60..84519150ca42 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException,
FSDataInputStream, Path
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.errors.QueryExecutionErrors
import
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
@@ -60,7 +60,7 @@ object StreamMetadata extends Logging {
Some(metadata)
} catch {
case NonFatal(e) =>
- logError(s"Error reading stream metadata from $metadataFile", e)
+ logError(log"Error reading stream metadata from ${MDC(LogKeys.PATH,
metadataFile)}", e)
throw e
} finally {
IOUtils.closeQuietly(input)
@@ -91,7 +91,8 @@ object StreamMetadata extends Logging {
if (output != null) {
output.cancel()
}
- logError(s"Error writing stream metadata $metadata to $metadataFile",
e)
+ logError(log"Error writing stream metadata ${MDC(LogKeys.METADATA,
metadata)} to " +
+ log"${MDC(LogKeys.PATH, metadataFile)}", e)
throw e
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
index b58c805af9d6..8ce883038401 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.execution.streaming.{CheckpointFileManager,
MetadataVersionUtil}
/**
@@ -105,7 +105,8 @@ class OperatorStateMetadataWriter(stateCheckpointPath:
Path, hadoopConf: Configu
outputStream.close()
} catch {
case e: Throwable =>
- logError(s"Fail to write state metadata file to $metadataFilePath", e)
+ logError(
+ log"Fail to write state metadata file to ${MDC(LogKeys.META_FILE,
metadataFilePath)}", e)
outputStream.cancel()
throw e
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 0a460ece2400..b2b0ef8ce712 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -682,7 +682,7 @@ class RocksDBFileManager(
case e: Exception =>
// Cancel the actual output stream first, so that zout.close() does
not write the file
out.cancel()
- logError(s"Error zipping to $filesStr", e)
+ logError(log"Error zipping to ${MDC(LogKeys.FILE_NAME, filesStr)}", e)
throw e
} finally {
// Close everything no matter what happened
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index bc3e4b7e6dbe..374ab52b9110 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import
org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader,
SchemaWriter}
import org.apache.spark.sql.types.{DataType, StructType}
@@ -75,7 +75,7 @@ class StateSchemaCompatibilityChecker(
schemaReader.read(inStream)
} catch {
case e: Throwable =>
- logError(s"Fail to read schema file from $schemaFileLocation", e)
+ logError(log"Fail to read schema file from ${MDC(LogKeys.PATH,
schemaFileLocation)}", e)
throw e
} finally {
inStream.close()
@@ -97,7 +97,7 @@ class StateSchemaCompatibilityChecker(
outStream.close()
} catch {
case e: Throwable =>
- logError(s"Fail to write schema file to $schemaFileLocation", e)
+ logError(log"Fail to write schema file to ${MDC(LogKeys.PATH,
schemaFileLocation)}", e)
outStream.cancel()
throw e
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index 20df67b25bfe..b1860be41ac4 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -25,7 +25,7 @@ import com.google.common.io.ByteStreams
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FSError, Path}
-import org.apache.spark.internal.{Logging, LogKeys, MDC}
+import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -176,8 +176,7 @@ class StateStoreChangelogWriterV1(
} catch {
case e: Throwable =>
abort()
- logError(log"Fail to commit changelog file ${MDC(LogKeys.FILE_NAME,
file)} " +
- log"because of exception ${MDC(LogKeys.EXCEPTION, e)}")
+ logError(log"Fail to commit changelog file ${MDC(PATH, file)} because
of exception", e)
throw e
} finally {
backingFileStream = null
@@ -256,8 +255,7 @@ class StateStoreChangelogWriterV2(
} catch {
case e: Throwable =>
abort()
- logError(log"Fail to commit changelog file ${MDC(LogKeys.FILE_NAME,
file)} " +
- log"because of exception ${MDC(LogKeys.EXCEPTION, e)}")
+ logError(log"Fail to commit changelog file ${MDC(PATH, file)} because
of exception", e)
throw e
} finally {
backingFileStream = null
diff --git
a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java
index f0c1985ce58a..f488a411c31f 100644
---
a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java
+++
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java
@@ -290,7 +290,7 @@ public abstract class Operation {
if (operationLog == null) {
LOG.error("Operation [ {} ] logging is enabled, " +
"but its OperationLog object cannot be found.",
- MDC.of(LogKeys.OPERATION_HANDLE_IDENTIFIER$.MODULE$,
opHandle.getHandleIdentifier()));
+ MDC.of(LogKeys.OPERATION_HANDLE_ID$.MODULE$,
opHandle.getHandleIdentifier()));
} else {
operationLog.close();
}
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
index 9cf31d99ccfa..b060bf3d4ec8 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
@@ -51,9 +51,11 @@ private[hive] class SparkGetFunctionsOperation(
override def runInternal(): Unit = {
// Do not change cmdStr. It's used for Hive auditing and authorization.
- val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
- val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'"
- logInfo(log"${MDC(LogKeys.MESSAGE, logMsg)} with
${MDC(LogKeys.STATEMENT_ID, statementId)}")
+ val cmdMDC = log"catalog : ${MDC(LogKeys.CATALOG_NAME, catalogName)}, " +
+ log"schemaPattern : ${MDC(LogKeys.DATABASE_NAME, schemaName)}"
+ val logMDC = log"Listing functions '" + cmdMDC +
+ log", functionName : ${MDC(LogKeys.FUNCTION_NAME, functionName)}'"
+ logInfo(logMDC + log" with ${MDC(LogKeys.STATEMENT_ID, statementId)}")
setState(OperationState.RUNNING)
// Always use the latest class loader provided by executionHive's state.
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
@@ -69,13 +71,13 @@ private[hive] class SparkGetFunctionsOperation(
// authorize this call on the schema objects
val privObjs =
HivePrivilegeObjectUtils.getHivePrivDbObjects(matchingDbs.asJava)
- authorizeMetaGets(HiveOperationType.GET_FUNCTIONS, privObjs, cmdStr)
+ authorizeMetaGets(HiveOperationType.GET_FUNCTIONS, privObjs,
cmdMDC.message)
}
HiveThriftServer2.eventManager.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
- logMsg,
+ logMDC.message,
statementId,
parentSession.getUsername)
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 9c461f0d4270..12c6c95f7d8d 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
import scala.util.control.NonFatal
import org.apache.spark._
+import org.apache.spark.internal.{LogKeys, MDC}
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.util._
@@ -156,8 +157,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
s"Could not read data from write ahead log record
${partition.walRecordHandle}, " +
s"read returned null")
}
- logInfo(s"Read partition data of $this from write ahead log, record
handle " +
- partition.walRecordHandle)
+ logInfo(log"Read partition data of ${MDC(LogKeys.RDD, this)} from write
ahead log, " +
+ log"record handle ${MDC(LogKeys.WRITE_AHEAD_LOG_RECORD_HANDLE,
partition.walRecordHandle)}")
if (storeInBlockManager) {
blockManager.putBytes(blockId, new
ChunkedByteBuffer(dataRead.duplicate()), storageLevel)
logDebug(s"Stored partition data of $this into block manager with
level $storageLevel")