This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b0e2cb575390 [SPARK-48623][CORE] Structured Logging Migrations
b0e2cb575390 is described below
commit b0e2cb575390d9dabb1142a78f4ceed48c059212
Author: Amanda Liu <[email protected]>
AuthorDate: Tue Jun 18 23:51:28 2024 -0700
[SPARK-48623][CORE] Structured Logging Migrations
### What changes were proposed in this pull request?
This PR migrates Scala logging calls to comply with the Scala style changes in
[#46947](https://github.com/apache/spark/pull/46947).
### Why are the changes needed?
This makes development and PR review of the structured logging migration
easier.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested by ensuring `dev/scalastyle` checks pass
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #46980 from asl3/logging-migrationscala.
Authored-by: Amanda Liu <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../scala/org/apache/spark/internal/LogKey.scala | 26 +++++++++++++++
.../sql/streaming/StreamingQueryListenerBus.scala | 9 +++--
...SparkConnectStreamingQueryListenerHandler.scala | 26 ++++++++++-----
.../org/apache/spark/broadcast/Broadcast.scala | 5 +--
.../spark/deploy/ExternalShuffleService.scala | 4 +--
.../apache/spark/deploy/worker/WorkerWatcher.scala | 4 +--
.../spark/shuffle/IndexShuffleBlockResolver.scala | 6 ++--
...AbortableStreamBasedCheckpointFileManager.scala | 4 +--
.../ml/classification/LogisticRegression.scala | 12 +++----
.../spark/deploy/yarn/ApplicationMaster.scala | 13 +++++---
.../org/apache/spark/deploy/yarn/Client.scala | 2 +-
.../cluster/YarnClientSchedulerBackend.scala | 4 +--
.../apache/spark/sql/types/UDTRegistration.scala | 5 +--
.../expressions/codegen/CodeGenerator.scala | 10 +++---
.../execution/adaptive/ShufflePartitionsUtil.scala | 8 +++--
.../PythonStreamingPartitionReaderFactory.scala | 5 +--
.../python/PythonStreamingSourceRunner.scala | 5 +--
.../sql/execution/streaming/ProgressReporter.scala | 9 ++---
.../sql/execution/streaming/StreamExecution.scala | 8 +++--
.../streaming/continuous/ContinuousExecution.scala | 3 +-
.../streaming/continuous/ContinuousWriteRDD.scala | 8 +++--
.../streaming/state/StateStoreChangelog.scala | 8 +++--
.../SparkExecuteStatementOperation.scala | 16 ++++-----
.../thriftserver/SparkGetFunctionsOperation.scala | 4 +--
.../org/apache/spark/streaming/Checkpoint.scala | 38 +++++++++++++--------
.../apache/spark/streaming/StreamingContext.scala | 5 +--
.../apache/spark/streaming/dstream/DStream.scala | 39 +++++++++++++---------
.../streaming/dstream/SocketInputDStream.scala | 8 ++---
.../streaming/receiver/ReceiverSupervisor.scala | 9 ++---
.../receiver/ReceiverSupervisorImpl.scala | 8 ++---
.../scheduler/ExecutorAllocationManager.scala | 24 ++++++++-----
.../streaming/scheduler/InputInfoTracker.scala | 5 +--
.../streaming/scheduler/ReceivedBlockTracker.scala | 17 ++++++----
.../streaming/scheduler/ReceiverTracker.scala | 19 ++++++-----
.../scheduler/rate/PIDRateEstimator.scala | 9 +++--
.../streaming/util/BatchedWriteAheadLog.scala | 5 +--
.../streaming/util/FileBasedWriteAheadLog.scala | 19 +++++++----
37 files changed, 257 insertions(+), 152 deletions(-)
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 40d3f67a48a7..f90eb4a77071 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -65,7 +65,10 @@ private[spark] object LogKeys {
case object ADDED_JARS extends LogKey
case object ADMIN_ACLS extends LogKey
case object ADMIN_ACL_GROUPS extends LogKey
+ case object ADVISORY_TARGET_SIZE extends LogKey
case object AGGREGATE_FUNCTIONS extends LogKey
+ case object ALIGNED_FROM_TIME extends LogKey
+ case object ALIGNED_TO_TIME extends LogKey
case object ALPHA extends LogKey
case object ANALYSIS_ERROR extends LogKey
case object APP_ATTEMPT_ID extends LogKey
@@ -77,8 +80,10 @@ private[spark] object LogKeys {
case object APP_STATE extends LogKey
case object ARCHIVE_NAME extends LogKey
case object ARGS extends LogKey
+ case object ARTIFACT_ID extends LogKey
case object ATTRIBUTE_MAP extends LogKey
case object AUTH_ENABLED extends LogKey
+ case object AVG_BATCH_PROC_TIME extends LogKey
case object BACKUP_FILE extends LogKey
case object BARRIER_EPOCH extends LogKey
case object BARRIER_ID extends LogKey
@@ -99,6 +104,8 @@ private[spark] object LogKeys {
case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
case object BUCKET extends LogKey
case object BYTECODE_SIZE extends LogKey
+ case object BYTE_BUFFER extends LogKey
+ case object BYTE_SIZE extends LogKey
case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey
case object CACHE_AUTO_REMOVED_SIZE extends LogKey
case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey
@@ -109,6 +116,7 @@ private[spark] object LogKeys {
case object CATALOG_NAME extends LogKey
case object CATEGORICAL_FEATURES extends LogKey
case object CHECKPOINT_FILE extends LogKey
+ case object CHECKPOINT_INTERVAL extends LogKey
case object CHECKPOINT_LOCATION extends LogKey
case object CHECKPOINT_PATH extends LogKey
case object CHECKPOINT_ROOT extends LogKey
@@ -186,6 +194,7 @@ private[spark] object LogKeys {
case object DELEGATE extends LogKey
case object DELTA extends LogKey
case object DEPRECATED_KEY extends LogKey
+ case object DERIVATIVE extends LogKey
case object DESCRIPTION extends LogKey
case object DESIRED_NUM_PARTITIONS extends LogKey
case object DESIRED_TREE_DEPTH extends LogKey
@@ -197,6 +206,7 @@ private[spark] object LogKeys {
case object DRIVER_MEMORY_SIZE extends LogKey
case object DRIVER_STATE extends LogKey
case object DROPPED_PARTITIONS extends LogKey
+ case object DSTREAM extends LogKey
case object DURATION extends LogKey
case object EARLIEST_LOADED_VERSION extends LogKey
case object EFFECTIVE_STORAGE_LEVEL extends LogKey
@@ -251,6 +261,7 @@ private[spark] object LogKeys {
case object FEATURE_NAME extends LogKey
case object FETCH_SIZE extends LogKey
case object FIELD_NAME extends LogKey
+ case object FILES extends LogKey
case object FILE_ABSOLUTE_PATH extends LogKey
case object FILE_END_OFFSET extends LogKey
case object FILE_FORMAT extends LogKey
@@ -307,6 +318,7 @@ private[spark] object LogKeys {
case object INIT_MODE extends LogKey
case object INPUT extends LogKey
case object INPUT_SPLIT extends LogKey
+ case object INTEGRAL extends LogKey
case object INTERVAL extends LogKey
case object ISOLATION_LEVEL extends LogKey
case object ISSUE_DATE extends LogKey
@@ -394,6 +406,7 @@ private[spark] object LogKeys {
case object MIN_COMPACTION_BATCH_ID extends LogKey
case object MIN_NUM_FREQUENT_PATTERN extends LogKey
case object MIN_POINT_PER_CLUSTER extends LogKey
+ case object MIN_RATE extends LogKey
case object MIN_SHARE extends LogKey
case object MIN_SIZE extends LogKey
case object MIN_TIME extends LogKey
@@ -490,6 +503,7 @@ private[spark] object LogKeys {
case object NUM_PREFIXES extends LogKey
case object NUM_PRUNED extends LogKey
case object NUM_PUSH_MERGED_LOCAL_BLOCKS extends LogKey
+ case object NUM_RECEIVERS extends LogKey
case object NUM_RECORDS_READ extends LogKey
case object NUM_RELEASED_LOCKS extends LogKey
case object NUM_REMAINED extends LogKey
@@ -547,6 +561,7 @@ private[spark] object LogKeys {
case object PARTITIONER extends LogKey
case object PARTITION_ID extends LogKey
case object PARTITION_IDS extends LogKey
+ case object PARTITION_SIZE extends LogKey
case object PARTITION_SPECIFICATION extends LogKey
case object PARTITION_SPECS extends LogKey
case object PATH extends LogKey
@@ -575,6 +590,7 @@ private[spark] object LogKeys {
case object PROCESSING_TIME extends LogKey
case object PRODUCER_ID extends LogKey
case object PROPERTY_NAME extends LogKey
+ case object PROPORTIONAL extends LogKey
case object PROTOCOL_VERSION extends LogKey
case object PROVIDER extends LogKey
case object PUSHED_FILTERS extends LogKey
@@ -595,6 +611,8 @@ private[spark] object LogKeys {
case object QUERY_PLAN_LENGTH_MAX extends LogKey
case object QUERY_RUN_ID extends LogKey
case object RANGE extends LogKey
+ case object RATE_LIMIT extends LogKey
+ case object RATIO extends LogKey
case object RDD_CHECKPOINT_DIR extends LogKey
case object RDD_DEBUG_STRING extends LogKey
case object RDD_DESCRIPTION extends LogKey
@@ -646,6 +664,8 @@ private[spark] object LogKeys {
case object RULE_NAME extends LogKey
case object RUN_ID extends LogKey
case object SCALA_VERSION extends LogKey
+ case object SCALING_DOWN_RATIO extends LogKey
+ case object SCALING_UP_RATIO extends LogKey
case object SCHEDULER_POOL_NAME extends LogKey
case object SCHEDULING_MODE extends LogKey
case object SCHEMA extends LogKey
@@ -671,12 +691,14 @@ private[spark] object LogKeys {
case object SHUFFLE_SERVICE_NAME extends LogKey
case object SIGMAS_LENGTH extends LogKey
case object SIGNAL extends LogKey
+ case object SINK extends LogKey
case object SIZE extends LogKey
case object SLEEP_TIME extends LogKey
case object SLIDE_DURATION extends LogKey
case object SMALLEST_CLUSTER_INDEX extends LogKey
case object SNAPSHOT_VERSION extends LogKey
case object SOCKET_ADDRESS extends LogKey
+ case object SOURCE extends LogKey
case object SOURCE_PATH extends LogKey
case object SPARK_BRANCH extends LogKey
case object SPARK_BUILD_DATE extends LogKey
@@ -708,6 +730,7 @@ private[spark] object LogKeys {
case object STORAGE_LEVEL_REPLICATION extends LogKey
case object STORAGE_MEMORY_SIZE extends LogKey
case object STORE_ID extends LogKey
+ case object STREAMING_CONTEXT extends LogKey
case object STREAMING_DATA_SOURCE_DESCRIPTION extends LogKey
case object STREAMING_DATA_SOURCE_NAME extends LogKey
case object STREAMING_OFFSETS_END extends LogKey
@@ -729,6 +752,7 @@ private[spark] object LogKeys {
case object TARGET_NUM_EXECUTOR extends LogKey
case object TARGET_NUM_EXECUTOR_DELTA extends LogKey
case object TARGET_PATH extends LogKey
+ case object TARGET_SIZE extends LogKey
case object TASK_ATTEMPT_ID extends LogKey
case object TASK_ID extends LogKey
case object TASK_INDEX extends LogKey
@@ -752,6 +776,7 @@ private[spark] object LogKeys {
case object THREAD_POOL_SIZE extends LogKey
case object THREAD_POOL_WAIT_QUEUE_SIZE extends LogKey
case object THRESHOLD extends LogKey
+ case object THRESH_TIME extends LogKey
case object TIME extends LogKey
case object TIMEOUT extends LogKey
case object TIMER extends LogKey
@@ -814,4 +839,5 @@ private[spark] object LogKeys {
case object XML_SCHEDULING_MODE extends LogKey
case object XSD_PATH extends LogKey
case object YOUNG_GENERATION_GC extends LogKey
+ case object ZERO_TIME extends LogKey
}
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
index 56a9e19a1b78..c3c23740e2fe 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListenerBus.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.CopyOnWriteArrayList
import scala.jdk.CollectionConverters._
import org.apache.spark.connect.proto.{Command, ExecutePlanResponse, Plan,
StreamingQueryEventType}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.client.CloseableIterator
import org.apache.spark.sql.streaming.StreamingQueryListener.{Event,
QueryIdleEvent, QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
@@ -115,7 +115,7 @@ class StreamingQueryListenerBus(sparkSession: SparkSession)
extends Logging {
case StreamingQueryEventType.QUERY_TERMINATED_EVENT =>
postToAll(QueryTerminatedEvent.fromJson(event.getEventJson))
case _ =>
- logWarning(s"Unknown StreamingQueryListener event: $event")
+ logWarning(log"Unknown StreamingQueryListener event:
${MDC(LogKeys.EVENT, event)}")
}
})
}
@@ -144,7 +144,10 @@ class StreamingQueryListenerBus(sparkSession:
SparkSession) extends Logging {
listener.onQueryIdle(t)
case t: QueryTerminatedEvent =>
listener.onQueryTerminated(t)
- case _ => logWarning(s"Unknown StreamingQueryListener event: $event")
+ case _ =>
+ logWarning(
+ log"Unknown StreamingQueryListener event: " +
+ log"${MDC(LogKeys.EVENT, event)}")
}
} catch {
case e: Exception =>
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
index 94f01026b7a5..d072b56e022a 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectStreamingQueryListenerHandler.scala
@@ -24,7 +24,7 @@ import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.connect.proto.StreamingQueryListenerBusCommand
import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.connect.service.ExecuteHolder
/**
@@ -83,20 +83,30 @@ class
SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
} catch {
case NonFatal(e) =>
logError(
- s"[SessionId: $sessionId][UserId: $userId][operationId: " +
- s"${executeHolder.operationId}] Error sending listener
added response.",
+ log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
+ log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
+ log"[operationId: " +
+ log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER,
executeHolder.operationId)}] " +
+ log"Error sending listener added response.",
e)
listenerHolder.cleanUp()
return
}
}
- logInfo(s"[SessionId: $sessionId][UserId: $userId][operationId: " +
- s"${executeHolder.operationId}] Server side listener added. Now
blocking until " +
- "all client side listeners are removed or there is error
transmitting the event back.")
+ logInfo(
+ log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
+ log"${MDC(LogKeys.USER_ID, userId)}][operationId: " +
+ log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER,
executeHolder.operationId)}] " +
+ log"Server side listener added. Now blocking until all client side
listeners are " +
+ log"removed or there is error transmitting the event back.")
// Block the handling thread, and have serverListener continuously
send back new events
listenerHolder.streamingQueryListenerLatch.await()
- logInfo(s"[SessionId: $sessionId][UserId: $userId][operationId: " +
- s"${executeHolder.operationId}] Server side listener long-running
handling thread ended.")
+ logInfo(
+ log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
+ log"${MDC(LogKeys.USER_ID, userId)}]" +
+ log"[operationId: " +
+ log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER,
executeHolder.operationId)}] " +
+ log"Server side listener long-running handling thread ended.")
case
StreamingQueryListenerBusCommand.CommandCase.REMOVE_LISTENER_BUS_LISTENER =>
listenerHolder.isServerSideListenerRegistered match {
case true =>
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 445b7d4d7aa0..3adb540a7ad1 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -22,7 +22,7 @@ import java.io.Serializable
import scala.reflect.ClassTag
import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.util.Utils
/**
@@ -106,7 +106,8 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends
Serializable with Lo
assertValid()
_isValid = false
_destroySite = Utils.getCallSite().shortForm
- logInfo("Destroying %s (from %s)".format(toString, _destroySite))
+ logInfo(log"Destroying ${MDC(LogKeys.BROADCAST, toString)} " +
+ log"(from ${MDC(LogKeys.CALL_SITE_SHORT_FORM, _destroySite)})")
doDestroy(blocking)
}
diff --git
a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 3ce5e2d62b6a..851fb453fd09 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -71,8 +71,8 @@ class ExternalShuffleService(sparkConf: SparkConf,
securityManager: SecurityMana
if (localDirs.length >= 1) {
new File(localDirs.find(new File(_,
dbName).exists()).getOrElse(localDirs(0)), dbName)
} else {
- logWarning(s"'spark.local.dir' should be set first when we use db in " +
- s"ExternalShuffleService. Note that this only affects standalone
mode.")
+ logWarning("'spark.local.dir' should be set first when we use db in " +
+ "ExternalShuffleService. Note that this only affects standalone mode.")
null
}
}
diff --git
a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index be11c23f306e..bd07a0ade523 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker
import java.util.concurrent.atomic.AtomicBoolean
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.WORKER_URL
import org.apache.spark.rpc._
@@ -64,7 +64,7 @@ private[spark] class WorkerWatcher(
}
override def receive: PartialFunction[Any, Unit] = {
- case e => logWarning(s"Received unexpected message: $e")
+ case e => logWarning(log"Received unexpected message: ${MDC(LogKeys.ERROR,
e)}")
}
override def onConnected(remoteAddress: RpcAddress): Unit = {
diff --git
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index d0a202cb7951..dde9b541b62f 100644
---
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -28,7 +28,7 @@ import com.google.common.cache.CacheBuilder
import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException}
import org.apache.spark.errors.SparkCoreErrors
-import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.io.NioBufferedFileInputStream
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer,
ManagedBuffer}
@@ -436,8 +436,8 @@ private[spark] class IndexShuffleBlockResolver(
if (checksumTmp.exists()) {
try {
if (!checksumTmp.delete()) {
- logError(s"Failed to delete temporary checksum file " +
- s"at ${checksumTmp.getAbsolutePath}")
+ logError(log"Failed to delete temporary checksum file at " +
+ log"${MDC(LogKeys.PATH, checksumTmp.getAbsolutePath)}")
}
} catch {
case e: Exception =>
diff --git
a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
index 2afab01ec7b0..20b3d9444884 100644
---
a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
+++
b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
@@ -24,7 +24,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import
org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
import
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
@@ -36,7 +36,7 @@ class AbortableStreamBasedCheckpointFileManager(path: Path,
hadoopConf: Configur
s" an fs (path: $path) with abortable stream support")
}
- logInfo(s"Writing atomically to $path based on abortable stream")
+ logInfo(log"Writing atomically to ${MDC(LogKeys.PATH, path)} based on
abortable stream")
class AbortableStreamBasedFSDataOutputStream(
fsDataOutputStream: FSDataOutputStream,
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index b523bd750836..b3c48f13591f 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -503,8 +503,8 @@ class LogisticRegression @Since("1.2.0") (
tol, fitIntercept, maxBlockSizeInMB)
if (dataset.storageLevel != StorageLevel.NONE) {
- instr.logWarning(s"Input instances will be standardized, blockified to
blocks, and " +
- s"then cached during training. Be careful of double caching!")
+ instr.logWarning("Input instances will be standardized, blockified to
blocks, and " +
+ "then cached during training. Be careful of double caching!")
}
val instances = dataset.select(
@@ -569,8 +569,8 @@ class LogisticRegression @Since("1.2.0") (
val isConstantLabel = histogram.count(_ != 0.0) == 1
if ($(fitIntercept) && isConstantLabel &&
!usingBoundConstrainedOptimization) {
- instr.logWarning(s"All labels are the same value and fitIntercept=true,
so the " +
- s"coefficients will be zeros. Training is not needed.")
+ instr.logWarning("All labels are the same value and fitIntercept=true,
so the " +
+ "coefficients will be zeros. Training is not needed.")
val constantLabelIndex = Vectors.dense(histogram).argmax
val coefMatrix = new SparseMatrix(numCoefficientSets, numFeatures,
new Array[Int](numCoefficientSets + 1), Array.emptyIntArray,
Array.emptyDoubleArray,
@@ -584,8 +584,8 @@ class LogisticRegression @Since("1.2.0") (
}
if (!$(fitIntercept) && isConstantLabel) {
- instr.logWarning(s"All labels belong to a single class and
fitIntercept=false. It's a " +
- s"dangerous ground, so the algorithm may not converge.")
+ instr.logWarning("All labels belong to a single class and
fitIntercept=false. It's a " +
+ "dangerous ground, so the algorithm may not converge.")
}
val featuresMean = summarizer.mean.toArray
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4b5f9be3193f..2523941e0bff 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -528,8 +528,9 @@ private[spark] class ApplicationMaster(
} catch {
case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
logError(
- s"SparkContext did not initialize after waiting for $totalWaitTime
ms. " +
- "Please check earlier log output for errors. Failing the
application.")
+ log"""SparkContext did not initialize after waiting for
+ |${MDC(LogKeys.TIMEOUT, totalWaitTime)} ms.
+ | Please check earlier log output for errors. Failing the
application.""".stripMargin)
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_SC_NOT_INITED,
"Timed out waiting for SparkContext.")
@@ -690,7 +691,7 @@ private[spark] class ApplicationMaster(
}
} catch {
case ioe: IOException =>
- logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+ logError(log"Failed to cleanup staging dir ${MDC(LogKeys.PATH,
stagingDirPath)}", ioe)
}
}
@@ -736,7 +737,8 @@ private[spark] class ApplicationMaster(
override def run(): Unit = {
try {
if (!Modifier.isStatic(mainMethod.getModifiers)) {
- logError(s"Could not find static main method in object
${args.userClass}")
+ logError(log"Could not find static main method in object " +
+ log"${MDC(LogKeys.CLASS_NAME, args.userClass)}")
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
} else {
mainMethod.invoke(null, userArgs.toArray)
@@ -866,7 +868,8 @@ private[spark] class ApplicationMaster(
finish(FinalApplicationStatus.FAILED, exitCode)
}
} else {
- logError(s"Application Master lost connection with driver! Shutting
down. $remoteAddress")
+ logError(log"Application Master lost connection with driver!
Shutting down. " +
+ log"${MDC(LogKeys.REMOTE_ADDRESS, remoteAddress)}")
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_DISCONNECTED)
}
}
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0567d8efb85e..bf31e03ba9a8 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1385,7 +1385,7 @@ private[spark] class Client(
val YarnAppReport(appState, finalState, diags) = monitorApplication()
if (appState == YarnApplicationState.FAILED || finalState ==
FinalApplicationStatus.FAILED) {
diags.foreach { err =>
- logError(s"Application diagnostics message: $err")
+ logError(log"Application diagnostics message: ${MDC(LogKeys.ERROR,
err)}")
}
throw new SparkException(s"Application $appId finished with failed
status")
}
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 26be1ff89314..8032d782cf4f 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -26,7 +26,7 @@ import
org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicati
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport}
import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{APP_ID, APP_STATE}
import org.apache.spark.launcher.SparkAppHandle
import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -120,7 +120,7 @@ private[spark] class YarnClientSchedulerBackend(
logError(log"YARN application has exited unexpectedly with state " +
log"${MDC(APP_STATE, state)}! Check the YARN application logs for
more details.")
diags.foreach { err =>
- logError(s"Diagnostics message: $err")
+ logError(log"Diagnostics message: ${MDC(LogKeys.ERROR, err)}")
}
allowInterrupt = false
sc.stop()
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala
b/sql/api/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala
index 42c8c783e54c..9219c1d139b9 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.types
import scala.collection.mutable
import org.apache.spark.annotation.{DeveloperApi, Since}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.errors.DataTypeErrors
import org.apache.spark.util.SparkClassUtils
@@ -58,7 +58,8 @@ object UDTRegistration extends Serializable with Logging {
*/
def register(userClass: String, udtClass: String): Unit = {
if (udtMap.contains(userClass)) {
- logWarning(s"Cannot register UDT for ${userClass}, which is already
registered.")
+ logWarning(log"Cannot register UDT for ${MDC(LogKeys.CLASS_NAME,
userClass)}, " +
+ log"which is already registered.")
} else {
// When register UDT with class name, we can't check if the UDT class is
an UserDefinedType,
// or not. The check is deferred.
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index ba3f1df22bc4..a39c10866984 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -32,9 +32,8 @@ import org.codehaus.janino.util.ClassFile
import org.apache.spark.{SparkException, SparkIllegalArgumentException,
TaskContext, TaskKilledException}
import org.apache.spark.executor.InputMetrics
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
-import org.apache.spark.internal.MDC
import org.apache.spark.metrics.source.CodegenMetrics
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.HashableWeakReference
@@ -1593,9 +1592,10 @@ object CodeGenerator extends Logging {
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
if (byteCodeSize > DEFAULT_JVM_HUGE_METHOD_LIMIT) {
- logInfo("Generated method too long to be JIT compiled: " +
- log"${MDC(CLASS_NAME, cf.getThisClassName)}.${MDC(METHOD_NAME,
method.getName)} " +
- log"is ${MDC(BYTECODE_SIZE, byteCodeSize)} bytes")
+ logInfo(log"Generated method too long to be JIT compiled: " +
+ log"${MDC(LogKeys.CLASS_NAME, cf.getThisClassName)}." +
+ log"${MDC(LogKeys.METHOD_NAME, method.getName)} is " +
+ log"${MDC(LogKeys.BYTECODE_SIZE, byteCodeSize)} bytes")
}
byteCodeSize
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 9370b3d8d1d7..bb7d904402de 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.adaptive
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.execution.{CoalescedPartitionSpec,
PartialReducerPartitionSpec, ShufflePartitionSpec}
object ShufflePartitionsUtil extends Logging {
@@ -61,8 +61,10 @@ object ShufflePartitionsUtil extends Logging {
val targetSize =
maxTargetSize.min(advisoryTargetSize).max(minPartitionSize)
val shuffleIds =
mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
- logInfo(s"For shuffle($shuffleIds), advisory target size:
$advisoryTargetSize, " +
- s"actual target size $targetSize, minimum partition size:
$minPartitionSize")
+ logInfo(log"For shuffle(${MDC(LogKeys.SHUFFLE_ID, shuffleIds)}, advisory
target size: " +
+ log"${MDC(LogKeys.ADVISORY_TARGET_SIZE, advisoryTargetSize)}, actual
target size " +
+ log"${MDC(LogKeys.TARGET_SIZE, targetSize)}, minimum partition size: " +
+ log"${MDC(LogKeys.PARTITION_SIZE, minPartitionSize)}")
// If `inputPartitionSpecs` are all empty, it means skew join optimization
is not applied.
if (inputPartitionSpecs.forall(_.isEmpty)) {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala
index 75a38b8ea622..7d80cc272810 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonStreamingPartitionReaderFactory.scala
@@ -19,7 +19,7 @@
package org.apache.spark.sql.execution.datasources.v2.python
import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader,
PartitionReaderFactory}
@@ -52,7 +52,8 @@ class PythonStreamingPartitionReaderFactory(
val block = SparkEnv.get.blockManager.get[InternalRow](part.blockId.get)
.map(_.data.asInstanceOf[Iterator[InternalRow]])
if (block.isEmpty) {
- logWarning(s"Prefetched block ${part.blockId} for Python data source
not found.")
+ logWarning(log"Prefetched block ${MDC(LogKeys.BLOCK_ID, part.blockId)}
" +
+ log"for Python data source not found.")
}
block
} else None
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
index a512b34db345..33612b6947f2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala
@@ -27,7 +27,7 @@ import org.apache.arrow.vector.ipc.ArrowStreamReader
import org.apache.spark.SparkEnv
import org.apache.spark.api.python.{PythonFunction, PythonWorker,
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.PYTHON_EXEC
import org.apache.spark.internal.config.BUFFER_SIZE
import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT
@@ -214,7 +214,8 @@ class PythonStreamingSourceRunner(
* Stop the python worker process and invoke stop() on stream reader.
*/
def stop(): Unit = {
- logInfo(s"Stopping streaming runner for module: $workerModule.")
+ logInfo(log"Stopping streaming runner for module: " +
+ log"${MDC(LogKeys.MODULE_NAME, workerModule)}.")
try {
pythonWorkerFactory.foreach { factory =>
pythonWorker.foreach { worker =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 3842ed574355..c440ec451b72 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -352,8 +352,9 @@ abstract class ProgressContext(
metrics = sourceMetrics
)
}
- logInfo(s"Extracting source progress metrics for
source=${source.toString} took " +
- s"duration_ms=$duration")
+ logInfo(log"Extracting source progress metrics for source=" +
+ log"${MDC(LogKeys.SOURCE, source.toString)} " +
+ log"took duration_ms=${MDC(LogKeys.DURATION, duration)}")
result
}
}
@@ -368,8 +369,8 @@ abstract class ProgressContext(
SinkProgress(sink.toString, sinkOutput, sinkMetrics)
}
- logInfo(s"Extracting sink progress metrics for sink=${sink.toString} took
" +
- s"duration_ms=$duration")
+ logInfo(log"Extracting sink progress metrics for sink=${MDC(LogKeys.SINK,
sink.toString)} " +
+ log"took duration_ms=${MDC(LogKeys.DURATION, duration)}")
result
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 420deda3e017..4198d7367fe2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -32,7 +32,7 @@ import
com.google.common.util.concurrent.UncheckedExecutionException
import org.apache.hadoop.fs.Path
import org.apache.spark.{JobArtifactSet, SparkContext, SparkException,
SparkThrowable}
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{CHECKPOINT_PATH, CHECKPOINT_ROOT,
PATH, PRETTY_ID_STRING, SPARK_DATA_STREAM}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -322,7 +322,8 @@ abstract class StreamExecution(
if (state.compareAndSet(INITIALIZING, ACTIVE)) {
// Log logical plan at the start of the query to help debug issues
related to
// plan changes.
- logInfo(s"Finish initializing with logical plan:\n$logicalPlan")
+ logInfo(log"Finish initializing with logical plan:\n" +
+ log"${MDC(LogKeys.QUERY_PLAN, logicalPlan)}")
// Unblock `awaitInitialization`
initializationLatch.countDown()
@@ -372,7 +373,8 @@ abstract class StreamExecution(
case _ => None
}
- logError(s"Query $prettyIdString terminated with error", e)
+ logError(log"Query ${MDC(LogKeys.PRETTY_ID_STRING, prettyIdString)} " +
+ log"terminated with error", e)
getLatestExecutionContext().updateStatusMessage(s"Terminated with
exception: $message")
// Rethrow the fatal errors to allow the user using
`Thread.UncaughtExceptionHandler` to
// handle them
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index fa49da5feeed..633aaf2682db 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -445,7 +445,8 @@ class ContinuousExecution(
*/
def stopInNewThread(error: Throwable): Unit = {
if (failure.compareAndSet(null, error)) {
- logError(s"Query $prettyIdString received exception $error")
+ logError(log"Query ${MDC(PRETTY_ID_STRING, prettyIdString)} received
exception " +
+ log"${MDC(ERROR, error)}")
stopInNewThread()
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
index 5d54b5754915..d5daa9a875f8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
@@ -18,8 +18,8 @@
package org.apache.spark.sql.execution.streaming.continuous
import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.internal.{LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
-import org.apache.spark.internal.MDC
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.DataWriter
@@ -89,9 +89,11 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow],
writerFactory: StreamingDat
})(catchBlock = {
// If there is an error, abort this writer. We enter this callback in
the middle of
// rethrowing an exception, so compute() will stop executing at this
point.
- logError(s"Writer for partition ${context.partitionId()} is aborting.")
+ logError(log"Writer for partition ${MDC(LogKeys.PARTITION_ID,
context.partitionId())} " +
+ log"is aborting.")
if (dataWriter != null) dataWriter.abort()
- logError(s"Writer for partition ${context.partitionId()} aborted.")
+ logError(log"Writer for partition ${MDC(LogKeys.PARTITION_ID,
context.partitionId())} " +
+ log"aborted.")
}, finallyBlock = {
if (dataWriter != null) dataWriter.close()
})
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index 23f867d3e6c0..20df67b25bfe 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -25,7 +25,7 @@ import com.google.common.io.ByteStreams
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FSError, Path}
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -176,7 +176,8 @@ class StateStoreChangelogWriterV1(
} catch {
case e: Throwable =>
abort()
- logError(s"Fail to commit changelog file $file because of exception
$e")
+ logError(log"Fail to commit changelog file ${MDC(LogKeys.FILE_NAME,
file)} " +
+ log"because of exception ${MDC(LogKeys.EXCEPTION, e)}")
throw e
} finally {
backingFileStream = null
@@ -255,7 +256,8 @@ class StateStoreChangelogWriterV2(
} catch {
case e: Throwable =>
abort()
- logError(s"Fail to commit changelog file $file because of exception
$e")
+ logError(log"Fail to commit changelog file ${MDC(LogKeys.FILE_NAME,
file)} " +
+ log"because of exception ${MDC(LogKeys.EXCEPTION, e)}")
throw e
} finally {
backingFileStream = null
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index b86e996a3408..51a5e88aa633 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -30,7 +30,7 @@ import
org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
import org.apache.hive.service.rpc.thrift.{TCLIServiceConstants, TColumnDesc,
TPrimitiveTypeEntry, TRowSet, TTableSchema, TTypeDesc, TTypeEntry, TTypeId,
TTypeQualifiers, TTypeQualifierValue}
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -83,7 +83,7 @@ private[hive] class SparkExecuteStatementOperation(
val sparkType = new StructType().add("Result", "string")
SparkExecuteStatementOperation.toTTableSchema(sparkType)
} else {
- logInfo(s"Result Schema: ${result.schema.sql}")
+ logInfo(log"Result Schema: ${MDC(LogKeys.SCHEMA, result.schema.sql)}")
SparkExecuteStatementOperation.toTTableSchema(result.schema)
}
}
@@ -126,8 +126,8 @@ private[hive] class SparkExecuteStatementOperation(
override def runInternal(): Unit = {
setState(OperationState.PENDING)
logInfo(
- log"Submitting query '${MDC(REDACTED_STATEMENT, redactedStatement)}'
with " +
- log"${MDC(STATEMENT_ID, statementId)}")
+ log"Submitting query '${MDC(LogKeys.REDACTED_STATEMENT,
redactedStatement)}' with " +
+ log"${MDC(LogKeys.STATEMENT_ID, statementId)}")
HiveThriftServer2.eventManager.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
@@ -215,8 +215,8 @@ private[hive] class SparkExecuteStatementOperation(
synchronized {
if (getStatus.getState.isTerminal) {
logInfo(
- log"Query with ${MDC(STATEMENT_ID, statementId)} in terminal state
" +
- log"before it started running")
+ log"Query with ${MDC(LogKeys.STATEMENT_ID, statementId)} in
terminal state " +
+ log"before it started running")
return
} else {
logInfo(log"Running query with ${MDC(STATEMENT_ID, statementId)}")
@@ -289,8 +289,8 @@ private[hive] class SparkExecuteStatementOperation(
synchronized {
if (!getStatus.getState.isTerminal) {
logInfo(
- log"Query with ${MDC(STATEMENT_ID, statementId)} timed out after " +
- log"${MDC(TIMEOUT, timeout)} seconds")
+ log"Query with ${MDC(LogKeys.STATEMENT_ID, statementId)} timed out "
+
+ log"after ${MDC(LogKeys.TIMEOUT, timeout)} seconds")
setState(OperationState.TIMEDOUT)
cleanup()
HiveThriftServer2.eventManager.onStatementTimeout(statementId)
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
index 53a94a128c0e..9cf31d99ccfa 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
@@ -27,7 +27,7 @@ import
org.apache.hive.service.cli.operation.GetFunctionsOperation
import
org.apache.hive.service.cli.operation.MetadataOperation.DEFAULT_HIVE_CATALOG
import org.apache.hive.service.cli.session.HiveSession
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.SQLContext
/**
@@ -53,7 +53,7 @@ private[hive] class SparkGetFunctionsOperation(
// Do not change cmdStr. It's used for Hive auditing and authorization.
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'"
- logInfo(s"$logMsg with $statementId")
+ logInfo(log"${MDC(LogKeys.MESSAGE, logMsg)} with
${MDC(LogKeys.STATEMENT_ID, statementId)}")
setState(OperationState.RUNNING)
// Always use the latest class loader provided by executionHive's state.
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 6cbc74a75a06..f09f9caf129b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{BACKUP_FILE, CHECKPOINT_FILE,
CHECKPOINT_TIME, NUM_RETRY, PATH, TEMP_FILE}
import org.apache.spark.internal.config.UI._
import org.apache.spark.io.CompressionCodec
@@ -102,7 +102,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime:
Time)
assert(framework != null, "Checkpoint.framework is null")
assert(graph != null, "Checkpoint.graph is null")
assert(checkpointTime != null, "Checkpoint.checkpointTime is null")
- logInfo(s"Checkpoint for time $checkpointTime validated")
+ logInfo(log"Checkpoint for time ${MDC(LogKeys.CHECKPOINT_TIME,
checkpointTime)} validated")
}
}
@@ -242,7 +242,8 @@ class CheckpointWriter(
while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
try {
- logInfo(s"Saving checkpoint for time $checkpointTime to file
'$checkpointFile'")
+ logInfo(log"Saving checkpoint for time
${MDC(LogKeys.CHECKPOINT_TIME, checkpointTime)} " +
+ log"to file '${MDC(LogKeys.CHECKPOINT_FILE, checkpointFile)}'")
if (fs == null) {
fs = new Path(checkpointDir).getFileSystem(hadoopConf)
}
@@ -275,15 +276,19 @@ class CheckpointWriter(
val allCheckpointFiles =
Checkpoint.getCheckpointFiles(checkpointDir, Some(fs))
if (allCheckpointFiles.size > 10) {
allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach {
file =>
- logInfo(s"Deleting $file")
+ logInfo(log"Deleting ${MDC(LogKeys.FILE_NAME, file)}")
fs.delete(file, true)
}
}
// All done, print success
- logInfo(s"Checkpoint for time $checkpointTime saved to file
'$checkpointFile'" +
- s", took ${bytes.length} bytes and " +
- s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startTimeNs)} ms")
+ logInfo(
+ log"Checkpoint for time ${MDC(LogKeys.CHECKPOINT_TIME,
checkpointTime)} " +
+ log"saved to file " +
+ log"'${MDC(LogKeys.CHECKPOINT_FILE, checkpointFile)}', took " +
+ log"${MDC(LogKeys.BYTE_SIZE, bytes.length)} bytes and " +
+ log"${MDC(LogKeys.TIME,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()
+ - startTimeNs))} ms")
jobGenerator.onCheckpointCompletion(checkpointTime,
clearCheckpointDataLater)
return
} catch {
@@ -304,7 +309,8 @@ class CheckpointWriter(
val bytes = Checkpoint.serialize(checkpoint, conf)
executor.execute(new CheckpointWriteHandler(
checkpoint.checkpointTime, bytes, clearCheckpointDataLater))
- logInfo(s"Submitted checkpoint of time ${checkpoint.checkpointTime} to
writer queue")
+ logInfo(log"Submitted checkpoint of time ${MDC(LogKeys.CHECKPOINT_TIME,
+ checkpoint.checkpointTime)} to writer queue")
} catch {
case rej: RejectedExecutionException =>
logError("Could not submit checkpoint task to the thread pool
executor", rej)
@@ -316,8 +322,10 @@ class CheckpointWriter(
val startTimeNs = System.nanoTime()
ThreadUtils.shutdown(executor, FiniteDuration(10, TimeUnit.SECONDS))
- logInfo(s"CheckpointWriter executor terminated? ${executor.isTerminated},"
+
- s" waited for ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startTimeNs)} ms.")
+ logInfo(log"CheckpointWriter executor terminated? " +
+ log"${MDC(LogKeys.EXECUTOR_STATE, executor.isTerminated)}, waited for " +
+ log"${MDC(LogKeys.DURATION, TimeUnit.NANOSECONDS.toMillis(
+ System.nanoTime() - startTimeNs))} ms.")
stopped = true
}
}
@@ -357,15 +365,17 @@ object CheckpointReader extends Logging {
}
// Try to read the checkpoint files in the order
- logInfo(s"Checkpoint files found: ${checkpointFiles.mkString(",")}")
+ logInfo(log"Checkpoint files found: " +
+ log"${MDC(LogKeys.CHECKPOINT_FILE, checkpointFiles.mkString(","))}")
var readError: Exception = null
checkpointFiles.foreach { file =>
- logInfo(s"Attempting to load checkpoint from file $file")
+ logInfo(log"Attempting to load checkpoint from file
${MDC(LogKeys.FILE_NAME, file)}")
try {
val fis = fs.open(file)
val cp = Checkpoint.deserialize(fis, conf)
- logInfo(s"Checkpoint successfully loaded from file $file")
- logInfo(s"Checkpoint was generated at time ${cp.checkpointTime}")
+ logInfo(log"Checkpoint successfully loaded from file
${MDC(LogKeys.FILE_NAME, file)}")
+ logInfo(log"Checkpoint was generated at time " +
+ log"${MDC(LogKeys.CHECKPOINT_TIME, cp.checkpointTime)}")
return Some(cp)
} catch {
case e: Exception =>
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 30bd30329283..94b695e6452e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -36,7 +36,7 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.input.FixedLengthBinaryInputFormat
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.SerializationDebugger
@@ -725,7 +725,8 @@ class StreamingContext private[streaming] (
private def stopOnShutdown(): Unit = {
val stopGracefully = conf.get(STOP_GRACEFULLY_ON_SHUTDOWN)
- logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown
hook")
+ logInfo(log"Invoking stop(stopGracefully=" +
+ log"${MDC(LogKeys.VALUE, stopGracefully)}) from shutdown hook")
// Do not stop SparkContext, let its own shutdown hook stop it
stop(stopSparkContext = false, stopGracefully = stopGracefully)
}
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 38f55f80657b..87d6a4909fdd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
import scala.util.matching.Regex
import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{FROM_TIME, SLIDE_DURATION, TO_TIME}
import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope}
@@ -201,7 +201,8 @@ abstract class DStream[T: ClassTag] (
// Set the checkpoint interval to be slideDuration or 10 seconds, which
ever is larger
if (mustCheckpoint && checkpointDuration == null) {
checkpointDuration = slideDuration * math.ceil(Seconds(10) /
slideDuration).toInt
- logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
+ logInfo(log"Checkpoint interval automatically set to " +
+ log"${MDC(LogKeys.CHECKPOINT_INTERVAL, checkpointDuration)}")
}
// Set the minimum value of the rememberDuration if not already set
@@ -277,11 +278,11 @@ abstract class DStream[T: ClassTag] (
dependencies.foreach(_.validateAtStart())
- logInfo(s"Slide time = $slideDuration")
- logInfo(s"Storage level = ${storageLevel.description}")
- logInfo(s"Checkpoint interval = $checkpointDuration")
- logInfo(s"Remember interval = $rememberDuration")
- logInfo(s"Initialized and validated $this")
+ logInfo(log"Slide time = ${MDC(LogKeys.SLIDE_DURATION, slideDuration)}")
+ logInfo(log"Storage level = ${MDC(LogKeys.STORAGE_LEVEL,
storageLevel.description)}")
+ logInfo(log"Checkpoint interval = ${MDC(LogKeys.CHECKPOINT_INTERVAL,
checkpointDuration)}")
+ logInfo(log"Remember interval = ${MDC(LogKeys.INTERVAL,
rememberDuration)}")
+ logInfo(log"Initialized and validated ${MDC(LogKeys.DSTREAM, this)}")
}
private[streaming] def setContext(s: StreamingContext): Unit = {
@@ -289,7 +290,7 @@ abstract class DStream[T: ClassTag] (
throw new SparkException(s"Context must not be set again for $this")
}
ssc = s
- logInfo(s"Set context for $this")
+ logInfo(log"Set context for ${MDC(LogKeys.STREAMING_CONTEXT, this)}")
dependencies.foreach(_.setContext(ssc))
}
@@ -304,7 +305,9 @@ abstract class DStream[T: ClassTag] (
private[streaming] def remember(duration: Duration): Unit = {
if (duration != null && (rememberDuration == null || duration >
rememberDuration)) {
rememberDuration = duration
- logInfo(s"Duration for remembering RDDs set to $rememberDuration for
$this")
+ logInfo(log"Duration for remembering RDDs set to " +
+ log"${MDC(LogKeys.DURATION, rememberDuration)} for " +
+ log"${MDC(LogKeys.DSTREAM, this.toString)}")
}
dependencies.foreach(_.remember(parentRememberDuration))
}
@@ -314,8 +317,10 @@ abstract class DStream[T: ClassTag] (
if (!isInitialized) {
throw new SparkException (this.toString + " has not been initialized")
} else if (time <= zeroTime || ! (time -
zeroTime).isMultipleOf(slideDuration)) {
- logInfo(s"Time $time is invalid as zeroTime is $zeroTime" +
- s" , slideDuration is $slideDuration and difference is ${time -
zeroTime}")
+ logInfo(log"Time ${MDC(LogKeys.TIME, time)} is invalid as zeroTime is " +
+ log"${MDC(LogKeys.ZERO_TIME, zeroTime)}, slideDuration is " +
+ log"${MDC(LogKeys.SLIDE_DURATION, slideDuration)} and difference is " +
+ log"${MDC(LogKeys.DURATION, time - zeroTime)}")
false
} else {
logDebug(s"Time $time is valid")
@@ -353,7 +358,8 @@ abstract class DStream[T: ClassTag] (
}
if (checkpointDuration != null && (time -
zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
- logInfo(s"Marking RDD ${newRDD.id} for time $time for
checkpointing")
+ logInfo(log"Marking RDD ${MDC(LogKeys.RDD_ID, newRDD.id)} for time
" +
+ log"${MDC(LogKeys.TIME, time)} for checkpointing")
}
generatedRDDs.put(time, newRDD)
}
@@ -461,7 +467,8 @@ abstract class DStream[T: ClassTag] (
// Explicitly remove blocks of BlockRDD
rdd match {
case b: BlockRDD[_] =>
- logInfo(s"Removing blocks of RDD $b of time $time")
+ logInfo(log"Removing blocks of RDD ${MDC(LogKeys.RDD_ID, b)} " +
+ log"of time ${MDC(LogKeys.TIME, time)}")
b.removeBlocks()
case _ =>
}
@@ -898,8 +905,10 @@ abstract class DStream[T: ClassTag] (
fromTime.floor(slideDuration, zeroTime)
}
- logInfo(s"Slicing from $fromTime to $toTime" +
- s" (aligned to $alignedFromTime and $alignedToTime)")
+ logInfo(log"Slicing from ${MDC(LogKeys.FROM_TIME, fromTime)} to " +
+ log"${MDC(LogKeys.TO_TIME, toTime)}" +
+ log" (aligned to ${MDC(LogKeys.ALIGNED_FROM_TIME, alignedFromTime)} and
" +
+ log"${MDC(LogKeys.ALIGNED_TO_TIME, alignedToTime)})")
alignedFromTime.to(alignedToTime, slideDuration).flatMap { time =>
if (time >= zeroTime) getOrCompute(time) else None
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 883d56c012f6..34b079219c99 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -24,7 +24,7 @@ import java.nio.charset.StandardCharsets
import scala.reflect.ClassTag
import scala.util.control.NonFatal
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.receiver.Receiver
@@ -56,7 +56,7 @@ class SocketReceiver[T: ClassTag](
def onStart(): Unit = {
- logInfo(s"Connecting to $host:$port")
+ logInfo(log"Connecting to ${MDC(LogKeys.HOST, host)}:${MDC(LogKeys.PORT,
port)}")
try {
socket = new Socket(host, port)
} catch {
@@ -64,7 +64,7 @@ class SocketReceiver[T: ClassTag](
restart(s"Error connecting to $host:$port", e)
return
}
- logInfo(s"Connected to $host:$port")
+ logInfo(log"Connected to ${MDC(LogKeys.HOST, host)}:${MDC(LogKeys.PORT,
port)}")
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
@@ -79,7 +79,7 @@ class SocketReceiver[T: ClassTag](
if (socket != null) {
socket.close()
socket = null
- logInfo(s"Closed socket to $host:$port")
+ logInfo(log"Closed socket to ${MDC(LogKeys.HOST,
host)}:${MDC(LogKeys.PORT, port)}")
}
}
}
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 79bfd8674b44..7cc08b421f78 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -25,7 +25,7 @@ import scala.concurrent._
import scala.util.control.NonFatal
import org.apache.spark.SparkConf
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{DELAY, ERROR, MESSAGE, STREAM_ID}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -145,10 +145,10 @@ private[streaming] abstract class ReceiverSupervisor(
def startReceiver(): Unit = synchronized {
try {
if (onReceiverStart()) {
- logInfo(s"Starting receiver $streamId")
+ logInfo(log"Starting receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
receiverState = Started
receiver.onStart()
- logInfo(s"Called receiver $streamId onStart")
+ logInfo(log"Called receiver ${MDC(LogKeys.STREAM_ID, streamId)}
onStart")
} else {
// The driver refused us
stop("Registered unsuccessfully because Driver refused to start
receiver " + streamId, None)
@@ -162,7 +162,8 @@ private[streaming] abstract class ReceiverSupervisor(
/** Stop receiver */
def stopReceiver(message: String, error: Option[Throwable]): Unit =
synchronized {
try {
- logInfo("Stopping receiver with message: " + message + ": " +
error.getOrElse(""))
+ logInfo(log"Stopping receiver with message: ${MDC(LogKeys.MESSAGE,
message)}: " +
+ log"${MDC(LogKeys.ERROR, error.getOrElse(""))}")
receiverState match {
case Initialized =>
logWarning("Skip stopping receiver because it has not yet stared")
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 47beb4521950..aafa99bd5285 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -28,7 +28,7 @@ import com.google.common.base.Throwables
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkEnv, SparkException}
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{ERROR, MESSAGE}
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.storage.StreamBlockId
@@ -85,7 +85,7 @@ private[streaming] class ReceiverSupervisorImpl(
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
case UpdateRateLimit(eps) =>
- logInfo(s"Received a new rate limit: $eps.")
+ logInfo(log"Received a new rate limit: ${MDC(LogKeys.RATE_LIMIT,
eps)}.")
registeredBlockGenerators.asScala.foreach { bg =>
bg.updateRate(eps)
}
@@ -195,10 +195,10 @@ private[streaming] class ReceiverSupervisorImpl(
}
override protected def onReceiverStop(message: String, error:
Option[Throwable]): Unit = {
- logInfo("Deregistering receiver " + streamId)
+ logInfo(log"Deregistering receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
trackerEndpoint.askSync[Boolean](DeregisterReceiver(streamId, message,
errorString))
- logInfo("Stopped receiver " + streamId)
+ logInfo(log"Stopped receiver ${MDC(LogKeys.STREAM_ID, streamId)}")
}
override def createBlockGenerator(
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index 5aa2a9df3ba8..903cde8082db 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -21,7 +21,7 @@ package org.apache.spark.streaming.scheduler
import scala.util.Random
import org.apache.spark.{ExecutorAllocationClient, SparkConf}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.config.DECOMMISSION_ENABLED
import org.apache.spark.internal.config.Streaming._
import org.apache.spark.resource.ResourceProfile
@@ -75,8 +75,10 @@ private[streaming] class ExecutorAllocationManager(
def start(): Unit = {
timer.start()
- logInfo(s"ExecutorAllocationManager started with " +
- s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval =
$scalingIntervalSecs sec")
+ logInfo(log"ExecutorAllocationManager started with ratios = " +
+ log"[${MDC(LogKeys.SCALING_UP_RATIO, scalingUpRatio)}, " +
+ log"${MDC(LogKeys.SCALING_DOWN_RATIO, scalingDownRatio)}] and interval =
" +
+ log"${MDC(LogKeys.INTERVAL, scalingIntervalSecs)} sec")
}
def stop(): Unit = {
@@ -89,11 +91,14 @@ private[streaming] class ExecutorAllocationManager(
* batch statistics.
*/
private def manageAllocation(): Unit = synchronized {
- logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
+ logInfo(log"Managing executor allocation with ratios = [" +
+ log"${MDC(LogKeys.SCALING_UP_RATIO, scalingUpRatio)}, " +
+ log"${MDC(LogKeys.SCALING_DOWN_RATIO, scalingDownRatio)}]")
if (batchProcTimeCount > 0) {
val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
val ratio = averageBatchProcTime.toDouble / batchDurationMs
- logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
+ logInfo(log"Average: ${MDC(LogKeys.AVG_BATCH_PROC_TIME, averageBatchProcTime)}, " +
+ log"ratio = ${MDC(LogKeys.RATIO, ratio)}")
if (ratio >= scalingUpRatio) {
logDebug("Requesting executors")
val numNewExecutors = math.max(math.round(ratio).toInt, 1)
@@ -119,7 +124,8 @@ private[streaming] class ExecutorAllocationManager(
Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> targetTotalExecutors),
Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -> 0),
Map.empty)
- logInfo(s"Requested total $targetTotalExecutors executors")
+ logInfo(log"Requested total ${MDC(LogKeys.NUM_EXECUTORS,
+ targetTotalExecutors)} executors")
}
/** Kill an executor that is not running any receiver, if possible */
@@ -129,7 +135,9 @@ private[streaming] class ExecutorAllocationManager(
if (allExecIds.nonEmpty && allExecIds.size > minNumExecutors) {
val execIdsWithReceivers = receiverTracker.allocatedExecutors().values.flatten.toSeq
- logInfo(s"Executors with receivers (${execIdsWithReceivers.size}): ${execIdsWithReceivers}")
+ logInfo(log"Executors with receivers (${MDC(LogKeys.NUM_EXECUTORS,
+ execIdsWithReceivers.size)}): " +
+ log"${MDC(LogKeys.EXECUTOR_IDS, execIdsWithReceivers)}")
val removableExecIds = allExecIds.diff(execIdsWithReceivers)
logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}")
@@ -142,7 +150,7 @@ private[streaming] class ExecutorAllocationManager(
} else {
client.killExecutor(execIdToRemove)
}
- logInfo(s"Requested to kill executor $execIdToRemove")
+ logInfo(log"Requested to kill executor ${MDC(LogKeys.EXECUTOR_ID, execIdToRemove)}")
} else {
logInfo(s"No non-receiver executors to kill")
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
index 639ac6de4f5d..bd9ea7b5a268 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.scheduler
import scala.collection.mutable
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.streaming.{StreamingContext, Time}
/**
@@ -82,7 +82,8 @@ private[streaming] class InputInfoTracker(ssc: StreamingContext) extends Logging
/** Cleanup the tracked input information older than threshold batch time */
def cleanup(batchThreshTime: Time): Unit = synchronized {
val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
- logInfo(s"remove old batch metadata: ${timesToCleanup.mkString(" ")}")
+ logInfo(log"remove old batch metadata: " +
+ log"${MDC(LogKeys.DURATION, timesToCleanup.mkString(" "))}")
batchTimeToInputInfos --= timesToCleanup
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index f2700737384b..7fb35a04be6d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{RECEIVED_BLOCK_INFO, RECEIVED_BLOCK_TRACKER_LOG_EVENT}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.streaming.Time
@@ -127,7 +127,9 @@ private[streaming] class ReceivedBlockTracker(
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime
} else {
- logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
+ logInfo(log"Possibly processed batch ${MDC(LogKeys.BATCH_TIMESTAMP,
+ batchTime)} needs to be " +
+ log"processed again in WAL recovery")
}
} else {
// This situation occurs when:
@@ -137,7 +139,9 @@ private[streaming] class ReceivedBlockTracker(
// 2. Slow checkpointing makes recovered batch time older than WAL recovered
// lastAllocatedBatchTime.
// This situation will only occurs in recovery time.
- logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
+ logInfo(log"Possibly processed batch ${MDC(LogKeys.BATCH_TIMESTAMP,
+ batchTime)} needs to be processed " +
+ log"again in WAL recovery")
}
}
@@ -175,7 +179,7 @@ private[streaming] class ReceivedBlockTracker(
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
- logInfo(s"Deleting batches: ${timesToCleanup.mkString(" ")}")
+ logInfo(log"Deleting batches: ${MDC(LogKeys.DURATION, timesToCleanup.mkString(" "))}")
if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
timeToAllocatedBlocks --= timesToCleanup
writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
@@ -221,9 +225,10 @@ private[streaming] class ReceivedBlockTracker(
}
writeAheadLogOption.foreach { writeAheadLog =>
- logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}")
+ logInfo(log"Recovering from write ahead logs in " +
+ log"${MDC(LogKeys.PATH, checkpointDirOption.get)}")
writeAheadLog.readAll().asScala.foreach { byteBuffer =>
- logInfo("Recovering record " + byteBuffer)
+ logInfo(log"Recovering record ${MDC(LogKeys.BYTE_BUFFER, byteBuffer)}")
Utils.deserialize[ReceivedBlockTrackerLogEvent](
JavaUtils.bufferToArray(byteBuffer), Thread.currentThread().getContextClassLoader) match {
case BlockAdditionEvent(receivedBlockInfo) =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 48273b3b593c..a37ba04c1012 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
import org.apache.spark._
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{ERROR, MESSAGE, RECEIVER_ID, RECEIVER_IDS, STREAM_ID}
import org.apache.spark.rdd.RDD
import org.apache.spark.rpc._
@@ -232,7 +232,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
// Signal the receivers to delete old block data
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
- logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
+ logInfo(log"Cleanup old received batch data: " +
+ log"${MDC(LogKeys.CLEANUP_LOCAL_DIRS, cleanupThreshTime)}")
synchronized {
if (isTrackerStarted) {
endpoint.send(CleanupOldBlocks(cleanupThreshTime))
@@ -306,7 +307,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
endpoint = Some(receiverEndpoint))
receiverTrackingInfos.put(streamId, receiverTrackingInfo)
listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
- logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
+ logInfo(log"Registered receiver for stream ${MDC(LogKeys.STREAM_ID, streamId)} " +
+ log"from ${MDC(LogKeys.RPC_ADDRESS, senderAddress)}")
true
}
}
@@ -447,7 +449,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
runDummySparkJob()
- logInfo("Starting " + receivers.length + " receivers")
+ logInfo(log"Starting ${MDC(LogKeys.NUM_RECEIVERS, receivers.length)} receivers")
endpoint.send(StartAllReceivers(receivers.toImmutableArraySeq))
}
@@ -625,7 +627,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
- logInfo(s"Restarting Receiver $receiverId")
+ logInfo(log"Restarting Receiver ${MDC(LogKeys.STREAM_ID, receiverId)}")
self.send(RestartReceiver(receiver))
}
case Failure(e) =>
@@ -633,11 +635,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
onReceiverJobFinish(receiverId)
} else {
logError("Receiver has been stopped. Try to restart it.", e)
- logInfo(s"Restarting Receiver $receiverId")
+ logInfo(log"Restarting Receiver ${MDC(LogKeys.STREAM_ID, receiverId)}")
self.send(RestartReceiver(receiver))
}
}(ThreadUtils.sameThread)
- logInfo(s"Receiver ${receiver.streamId} started")
+ logInfo(log"Receiver ${MDC(LogKeys.STREAM_ID, receiver.streamId)} started")
}
override def onStop(): Unit = {
@@ -660,7 +662,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/** Send stop signal to the receivers. */
private def stopReceivers(): Unit = {
receiverTrackingInfos.values.flatMap(_.endpoint).foreach { _.send(StopReceiver) }
- logInfo("Sent stop signal to all " + receiverTrackingInfos.size + " receivers")
+ logInfo(log"Sent stop signal to all " +
+ log"${MDC(LogKeys.NUM_RECEIVERS, receiverTrackingInfos.size)} receivers")
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
index dc02062b9eb4..1b05a6ac30cc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.scheduler.rate
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
/**
* Implements a proportional-integral-derivative (PID) controller which acts on
@@ -74,8 +74,11 @@ private[streaming] class PIDRateEstimator(
minRate > 0,
s"Minimum rate in PIDRateEstimator should be > 0")
- logInfo(s"Created PIDRateEstimator with proportional = $proportional, integral = $integral, " +
- s"derivative = $derivative, min rate = $minRate")
+ logInfo(log"Created PIDRateEstimator with proportional = " +
+ log"${MDC(LogKeys.PROPORTIONAL, proportional)}, integral = " +
+ log"${MDC(LogKeys.INTEGRAL, integral)}, derivative = " +
+ log"${MDC(LogKeys.DERIVATIVE, derivative)}, min rate = " +
+ log"${MDC(LogKeys.MIN_RATE, minRate)}")
def compute(
time: Long, // in milliseconds
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
index 5dcdd573c744..8befe53efffa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
@@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
import org.apache.spark.SparkConf
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.RECORDS
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -122,7 +122,8 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
* Stop the batched writer thread, fulfill promises with failures and close the wrapped WAL.
*/
override def close(): Unit = {
- logInfo(s"BatchedWriteAheadLog shutting down at time: ${System.currentTimeMillis()}.")
+ logInfo(log"BatchedWriteAheadLog shutting down at time: " +
+ log"${MDC(LogKeys.TIME, System.currentTimeMillis())}.")
if (!active.getAndSet(false)) return
batchedWriterThread.interrupt()
batchedWriterThread.join()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 58a6b929a81f..d90095c73785 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{NUM_RETRY, WRITE_AHEAD_LOG_INFO}
import org.apache.spark.util.{CompletionIterator, ThreadUtils}
import org.apache.spark.util.ArrayImplicits._
@@ -137,7 +137,8 @@ private[streaming] class FileBasedWriteAheadLog(
*/
def readAll(): JIterator[ByteBuffer] = synchronized {
val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
- logInfo("Reading from the logs:\n" + logFilesToRead.mkString("\n"))
+ logInfo(log"Reading from the logs:\n" +
+ log"${MDC(LogKeys.PATHS, logFilesToRead.mkString("\n"))}")
def readFile(file: String): Iterator[ByteBuffer] = {
logDebug(s"Creating log reader with $file")
val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
@@ -170,8 +171,11 @@ private[streaming] class FileBasedWriteAheadLog(
pastLogs --= expiredLogs
expiredLogs
}
- logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
- s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
+ logInfo(log"Attempting to clear ${MDC(LogKeys.NUM_RECORDS_READ, oldLogFiles.size)} " +
+ log"old log files in " +
+ log"${MDC(LogKeys.PATH, logDirectory)} older than " +
+ log"${MDC(LogKeys.THRESHOLD, threshTime)}: " +
+ log"${MDC(LogKeys.FILES, oldLogFiles.map(_.path).mkString("\n"))}")
def deleteFile(walInfo: LogInfo): Unit = {
try {
@@ -184,7 +188,8 @@ private[streaming] class FileBasedWriteAheadLog(
logWarning(log"Error clearing write ahead log file " +
log"${MDC(WRITE_AHEAD_LOG_INFO, walInfo)}", ex)
}
- logInfo(s"Cleared log files in $logDirectory older than $threshTime")
+ logInfo(log"Cleared log files in ${MDC(LogKeys.PATH, logDirectory)} older than " +
+ log"${MDC(LogKeys.THRESH_TIME, threshTime)}")
}
oldLogFiles.foreach { logInfo =>
if (!executionContext.isShutdown) {
@@ -252,7 +257,9 @@ private[streaming] class FileBasedWriteAheadLog(
fileSystem.listStatus(logDirectoryPath).map { _.getPath }.toImmutableArraySeq)
pastLogs.clear()
pastLogs ++= logFileInfo
- logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+ logInfo(log"Recovered ${MDC(LogKeys.NUM_FILES, logFileInfo.size)} " +
+ log"write ahead log files from " +
+ log"${MDC(LogKeys.PATH, logDirectory)}")
logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
}
} catch {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]