This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 520f3b1c192b [SPARK-47593][CORE] Connector module: Migrate logWarn
with variables to structured logging framework
520f3b1c192b is described below
commit 520f3b1c192b1bae53509fdad770f5711ca3791f
Author: panbingkun <[email protected]>
AuthorDate: Tue Apr 9 21:42:39 2024 -0700
[SPARK-47593][CORE] Connector module: Migrate logWarn with variables to
structured logging framework
### What changes were proposed in this pull request?
This PR aims to migrate `logWarning` in module `Connector` with variables to
`structured logging framework`.
### Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Pass GA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45879 from panbingkun/SPARK-47593_warning.
Lead-authored-by: panbingkun <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../scala/org/apache/spark/internal/LogKey.scala | 23 +++++++++++++
.../scala/org/apache/spark/util/MDCSuite.scala | 15 +++++++-
.../org/apache/spark/sql/avro/AvroUtils.scala | 9 ++---
.../ExecutePlanResponseReattachableIterator.scala | 2 +-
.../sql/connect/client/GrpcRetryHandler.scala | 18 +++++-----
.../execution/ExecuteGrpcResponseSender.scala | 15 +++++---
.../service/SparkConnectStreamingQueryCache.scala | 14 +++++---
.../connect/ui/SparkConnectServerListener.scala | 36 +++++++++++++------
.../sql/jdbc/DockerJDBCIntegrationSuite.scala | 7 ++--
.../spark/sql/kafka010/KafkaContinuousStream.scala | 5 +--
.../spark/sql/kafka010/KafkaMicroBatchStream.scala | 5 +--
.../sql/kafka010/KafkaOffsetReaderAdmin.scala | 10 +++---
.../sql/kafka010/KafkaOffsetReaderConsumer.scala | 10 +++---
.../apache/spark/sql/kafka010/KafkaSource.scala | 5 +--
.../sql/kafka010/consumer/FetchedDataPool.scala | 7 ++--
.../sql/kafka010/consumer/KafkaDataConsumer.scala | 40 +++++++++++++---------
.../producer/InternalKafkaProducerPool.scala | 6 ++--
.../apache/spark/sql/kafka010/KafkaTestUtils.scala | 9 ++---
.../kafka010/KafkaDelegationTokenProvider.scala | 10 +++---
.../streaming/kafka010/ConsumerStrategy.scala | 11 +++---
.../streaming/kafka010/KafkaDataConsumer.scala | 7 ++--
.../spark/streaming/kafka010/KafkaUtils.scala | 16 +++++----
.../streaming/kinesis/KinesisBackedBlockRDD.scala | 6 ++--
.../streaming/kinesis/KinesisRecordProcessor.scala | 2 +-
.../spark/streaming/kinesis/KinesisTestUtils.scala | 7 ++--
25 files changed, 195 insertions(+), 100 deletions(-)
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 2cb5eac4548c..6cdec011e2ae 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -34,6 +34,7 @@ object LogKey extends Enumeration {
val CATEGORICAL_FEATURES = Value
val CLASS_LOADER = Value
val CLASS_NAME = Value
+ val CLUSTER_ID = Value
val COLUMN_DATA_TYPE_SOURCE = Value
val COLUMN_DATA_TYPE_TARGET = Value
val COLUMN_DEFAULT_VALUE = Value
@@ -43,6 +44,7 @@ object LogKey extends Enumeration {
val COMPONENT = Value
val CONFIG = Value
val CONFIG2 = Value
+ val CONTAINER = Value
val CONTAINER_ID = Value
val COUNT = Value
val CSV_HEADER_COLUMN_NAME = Value
@@ -51,6 +53,7 @@ object LogKey extends Enumeration {
val CSV_SCHEMA_FIELD_NAME = Value
val CSV_SCHEMA_FIELD_NAMES = Value
val CSV_SOURCE = Value
+ val DATA = Value
val DATABASE_NAME = Value
val DRIVER_ID = Value
val DROPPED_PARTITIONS = Value
@@ -70,9 +73,11 @@ object LogKey extends Enumeration {
val HIVE_OPERATION_STATE = Value
val HIVE_OPERATION_TYPE = Value
val HOST = Value
+ val INDEX = Value
val JOB_ID = Value
val JOIN_CONDITION = Value
val JOIN_CONDITION_SUB_EXPRESSION = Value
+ val KEY = Value
val LEARNING_RATE = Value
val LINE = Value
val LINE_NUM = Value
@@ -80,17 +85,23 @@ object LogKey extends Enumeration {
val LOG_TYPE = Value
val MASTER_URL = Value
val MAX_ATTEMPTS = Value
+ val MAX_CAPACITY = Value
val MAX_CATEGORIES = Value
val MAX_EXECUTOR_FAILURES = Value
val MAX_SIZE = Value
val MERGE_DIR_NAME = Value
val METHOD_NAME = Value
val MIN_SIZE = Value
+ val NEW_VALUE = Value
val NUM_COLUMNS = Value
val NUM_ITERATIONS = Value
val OBJECT_ID = Value
+ val OFFSET = Value
+ val OFFSETS = Value
val OLD_BLOCK_MANAGER_ID = Value
+ val OLD_VALUE = Value
val OPTIMIZER_CLASS_NAME = Value
+ val OP_ID = Value
val OP_TYPE = Value
val PARSE_MODE = Value
val PARTITION_ID = Value
@@ -99,8 +110,11 @@ object LogKey extends Enumeration {
val PATH = Value
val PATHS = Value
val POD_ID = Value
+ val POLICY = Value
val PORT = Value
+ val PRODUCER_ID = Value
val QUERY_HINT = Value
+ val QUERY_ID = Value
val QUERY_PLAN = Value
val QUERY_PLAN_LENGTH_ACTUAL = Value
val QUERY_PLAN_LENGTH_MAX = Value
@@ -117,6 +131,7 @@ object LogKey extends Enumeration {
val RULE_BATCH_NAME = Value
val RULE_NAME = Value
val RULE_NUMBER_OF_RUNS = Value
+ val SERVICE_NAME = Value
val SESSION_ID = Value
val SHARD_ID = Value
val SHUFFLE_BLOCK_INFO = Value
@@ -127,6 +142,8 @@ object LogKey extends Enumeration {
val SQL_TEXT = Value
val STAGE_ID = Value
val STATEMENT_ID = Value
+ val STATUS = Value
+ val STREAM_NAME = Value
val SUBMISSION_ID = Value
val SUBSAMPLING_RATE = Value
val TABLE_NAME = Value
@@ -140,13 +157,19 @@ object LogKey extends Enumeration {
val TID = Value
val TIMEOUT = Value
val TIME_UNITS = Value
+ val TIP = Value
+ val TOPIC_PARTITION = Value
val TOTAL_EFFECTIVE_TIME = Value
val TOTAL_TIME = Value
val UNSUPPORTED_EXPRESSION = Value
val UNSUPPORTED_HINT_REASON = Value
+ val UNTIL_OFFSET = Value
val URI = Value
val USER_ID = Value
val USER_NAME = Value
+ val WAIT_RESULT_TIME = Value
+ val WAIT_SEND_TIME = Value
+ val WAIT_TIME = Value
val WATERMARK_CONSTRAINT = Value
val WORKER_URL = Value
val XSD_PATH = Value
diff --git a/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala
b/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala
index 1ac51e236080..62eaa852913c 100644
--- a/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala
+++ b/common/utils/src/test/scala/org/apache/spark/util/MDCSuite.scala
@@ -22,7 +22,7 @@ import scala.jdk.CollectionConverters._
import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.EXIT_CODE
+import org.apache.spark.internal.LogKey.{EXIT_CODE, OFFSET, RANGE}
class MDCSuite
extends AnyFunSuite // scalastyle:ignore funsuite
@@ -41,6 +41,19 @@ class MDCSuite
assert(log.context === Map("exit_code" -> "CustomObjectValue: spark,
10086").asJava)
}
+ test("check MDC stripMargin") {
+ val log =
+ log"""
+ |The current available offset range is ${MDC(RANGE, "12 - 34")}.
+ | Offset ${MDC(OFFSET, "666")}. is out of range""".stripMargin
+ val expected =
+ s"""
+ |The current available offset range is 12 - 34.
+ | Offset 666. is out of range""".stripMargin
+ assert(log.message === expected)
+ assert(log.context === Map("range" -> "12 - 34", "offset" -> "666").asJava)
+ }
+
case class CustomObjectValue(key: String, value: Int) {
override def toString: String = {
"CustomObjectValue: " + key + ", " + value
diff --git
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index 269a8b80c2b7..e8be11f48a2b 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -31,7 +31,8 @@ import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkException, SparkIllegalArgumentException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, PATH}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroCompressionCodec._
import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION
@@ -51,8 +52,8 @@ private[sql] object AvroUtils extends Logging {
val parsedOptions = new AvroOptions(options, conf)
if (parsedOptions.parameters.contains(IGNORE_EXTENSION)) {
- logWarning(s"Option $IGNORE_EXTENSION is deprecated. Please use the " +
- "general data source option pathGlobFilter for filtering file names.")
+ logWarning(log"Option ${MDC(CONFIG, IGNORE_EXTENSION)} is deprecated.
Please use the " +
+ log"general data source option pathGlobFilter for filtering file
names.")
}
// User can specify an optional avro json schema.
val avroSchema = parsedOptions.schema
@@ -160,7 +161,7 @@ private[sql] object AvroUtils extends Logging {
} catch {
case e: IOException =>
if (ignoreCorruptFiles) {
- logWarning(s"Skipped the footer in the corrupted file: $path",
e)
+ logWarning(log"Skipped the footer in the corrupted file:
${MDC(PATH, path)}", e)
None
} else {
throw new SparkException(s"Could not read file: $path", e)
diff --git
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index 5854a9225dbe..74f13272a365 100644
---
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -277,7 +277,7 @@ class ExecutePlanResponseReattachableIterator(
}
} catch {
case NonFatal(e) =>
- logWarning(s"ReleaseExecute failed with exception: $e.")
+ logWarning(log"ReleaseExecute failed with exception:", e)
}
}
}
diff --git
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
index 2418dfa03505..508dad3d748d 100644
---
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
+++
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
@@ -22,6 +22,8 @@ import scala.util.control.NonFatal
import io.grpc.stub.StreamObserver
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.LogKey.{ERROR, POLICY, RETRY_COUNT, WAIT_TIME}
+import org.apache.spark.internal.MDC
private[sql] class GrpcRetryHandler(
private val policies: Seq[RetryPolicy],
@@ -187,8 +189,8 @@ private[sql] object GrpcRetryHandler extends Logging {
if (lastException.isInstanceOf[RetryException]) {
// retry exception is considered immediately retriable without any
policies.
logWarning(
- s"Non-Fatal error during RPC execution: $lastException, retrying " +
- s"(currentRetryNum=$currentRetryNum)")
+ log"Non-Fatal error during RPC execution: ${MDC(ERROR,
lastException)}, " +
+ log"retrying (currentRetryNum=${MDC(RETRY_COUNT,
currentRetryNum)})")
return
}
@@ -197,18 +199,18 @@ private[sql] object GrpcRetryHandler extends Logging {
if (time.isDefined) {
logWarning(
- s"Non-Fatal error during RPC execution: $lastException, retrying "
+
- s"(wait=${time.get.toMillis}, currentRetryNum=$currentRetryNum,
" +
- s"policy: ${policy.getName})")
-
+ log"Non-Fatal error during RPC execution: ${MDC(ERROR,
lastException)}, " +
+ log"retrying (wait=${MDC(WAIT_TIME, time.get.toMillis)} ms, " +
+ log"currentRetryNum=${MDC(RETRY_COUNT, currentRetryNum)}, " +
+ log"policy=${MDC(POLICY, policy.getName)}).")
sleep(time.get.toMillis)
return
}
}
logWarning(
- s"Non-Fatal error during RPC execution: $lastException, exceeded
retries " +
- s"(currentRetryNum=$currentRetryNum)")
+ log"Non-Fatal error during RPC execution: ${MDC(ERROR,
lastException)}, " +
+ log"exceeded retries (currentRetryNum=${MDC(RETRY_COUNT,
currentRetryNum)})")
val error = new RetriesExceeded()
exceptionList.foreach(error.addSuppressed)
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index 4b95f38c6695..1139507a37a5 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -24,7 +24,9 @@ import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.connect.proto.ExecutePlanResponse
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{INDEX, OP_ID, TOTAL_TIME,
WAIT_RESULT_TIME, WAIT_SEND_TIME}
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
import org.apache.spark.sql.connect.common.ProtoUtils
import
org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION,
CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE,
CONNECT_PROGRESS_REPORT_INTERVAL}
import org.apache.spark.sql.connect.service.{ExecuteHolder,
SparkConnectService}
@@ -268,10 +270,15 @@ private[connect] class ExecuteGrpcResponseSender[T <:
Message](
// Process the outcome of the inner loop.
if (interrupted) {
// This sender got interrupted. Kill this RPC.
+ val totalTime = (System.nanoTime - startTime) /
NANOS_PER_MILLIS.toDouble
+ val waitResultTime = consumeSleep / NANOS_PER_MILLIS.toDouble
+ val waitSendTime = sendSleep / NANOS_PER_MILLIS.toDouble
logWarning(
- s"Got detached from opId=${executeHolder.operationId} at index
${nextIndex - 1}." +
- s"totalTime=${System.nanoTime - startTime}ns " +
- s"waitingForResults=${consumeSleep}ns
waitingForSend=${sendSleep}ns")
+ log"Got detached from opId=${MDC(OP_ID, executeHolder.operationId)}
" +
+ log"at index ${MDC(INDEX, nextIndex - 1)}." +
+ log"totalTime=${MDC(TOTAL_TIME, totalTime)} ms " +
+ log"waitingForResults=${MDC(WAIT_RESULT_TIME, waitResultTime)} ms
" +
+ log"waitingForSend=${MDC(WAIT_SEND_TIME, waitSendTime)} ms")
throw new SparkSQLException(errorClass =
"INVALID_CURSOR.DISCONNECTED", Map.empty)
} else if (gotResponse) {
enqueueProgressMessage()
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
index a5d3fa497bb3..9690d10eba1a 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
@@ -26,7 +26,8 @@ import scala.collection.mutable
import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
import scala.util.control.NonFatal
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{NEW_VALUE, OLD_VALUE, QUERY_ID}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
@@ -65,9 +66,9 @@ private[connect] class SparkConnectStreamingQueryCache(
queryCache.put(QueryCacheKey(query.id.toString, query.runId.toString),
value) match {
case Some(existing) => // Query is being replace. Not really expected.
- logWarning(
- s"Replacing existing query in the cache (unexpected). Query Id:
${query.id}." +
- s"Existing value $existing, new value $value.")
+ logWarning(log"Replacing existing query in the cache (unexpected). "
+
+ log"Query Id: ${MDC(QUERY_ID, query.id)}.Existing value
${MDC(OLD_VALUE, existing)}, " +
+ log"new value ${MDC(NEW_VALUE, value)}.")
case None =>
logInfo(s"Adding new query to the cache. Query Id ${query.id}, value
$value.")
}
@@ -115,7 +116,10 @@ private[connect] class SparkConnectStreamingQueryCache(
v.query.stop()
} catch {
case NonFatal(ex) =>
- logWarning(s"Failed to stop the query ${k.queryId}. Error is
ignored.", ex)
+ logWarning(
+ log"Failed to stop the query ${MDC(QUERY_ID, k.queryId)}. " +
+ log"Error is ignored.",
+ ex)
}
}
}
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
index ec6079a89145..a1bbab7dbdbc 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListener.scala
@@ -21,7 +21,8 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{OP_ID, SESSION_ID}
import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
import org.apache.spark.scheduler._
import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT,
CONNECT_UI_STATEMENT_LIMIT}
@@ -183,8 +184,8 @@ private[connect] class SparkConnectServerListener(
updateLiveStore(sessionData)
case None =>
logWarning(
- s"onOperationStart called with unknown session id: ${e.sessionId}." +
- s"Regardless, the operation has been registered.")
+ log"onOperationStart called with unknown session id:
${MDC(SESSION_ID, e.sessionId)}." +
+ log"Regardless, the operation has been registered.")
}
}
@@ -194,7 +195,9 @@ private[connect] class SparkConnectServerListener(
executionData.state = ExecutionState.COMPILED
updateLiveStore(executionData)
case None =>
- logWarning(s"onOperationAnalyzed called with unknown operation id:
${e.jobTag}")
+ logWarning(
+ log"onOperationAnalyzed called with " +
+ log"unknown operation id: ${MDC(OP_ID, e.jobTag)}")
}
}
@@ -205,7 +208,9 @@ private[connect] class SparkConnectServerListener(
executionData.state = ExecutionState.READY
updateLiveStore(executionData)
case None =>
- logWarning(s"onOperationReadyForExecution called with unknown
operation id: ${e.jobTag}")
+ logWarning(
+ log"onOperationReadyForExecution called with " +
+ log"unknown operation id: ${MDC(OP_ID, e.jobTag)}")
}
}
@@ -216,7 +221,9 @@ private[connect] class SparkConnectServerListener(
executionData.state = ExecutionState.CANCELED
updateLiveStore(executionData)
case None =>
- logWarning(s"onOperationCanceled called with unknown operation id:
${e.jobTag}")
+ logWarning(
+ log"onOperationCanceled called with " +
+ log"unknown operation id: ${MDC(OP_ID, e.jobTag)}")
}
}
private def onOperationFailed(e: SparkListenerConnectOperationFailed) =
synchronized {
@@ -227,7 +234,9 @@ private[connect] class SparkConnectServerListener(
executionData.state = ExecutionState.FAILED
updateLiveStore(executionData)
case None =>
- logWarning(s"onOperationFailed called with unknown operation id:
${e.jobTag}")
+ logWarning(
+ log"onOperationFailed called with " +
+ log"unknown operation id: ${MDC(OP_ID, e.jobTag)}")
}
}
private def onOperationFinished(e: SparkListenerConnectOperationFinished) =
synchronized {
@@ -237,7 +246,9 @@ private[connect] class SparkConnectServerListener(
executionData.state = ExecutionState.FINISHED
updateLiveStore(executionData)
case None =>
- logWarning(s"onOperationFinished called with unknown operation id:
${e.jobTag}")
+ logWarning(
+ log"onOperationFinished called with " +
+ log"unknown operation id: ${MDC(OP_ID, e.jobTag)}")
}
}
private def onOperationClosed(e: SparkListenerConnectOperationClosed) =
synchronized {
@@ -248,7 +259,9 @@ private[connect] class SparkConnectServerListener(
updateStoreWithTriggerEnabled(executionData)
executionList.remove(e.jobTag)
case None =>
- logWarning(s"onOperationClosed called with unknown operation id:
${e.jobTag}")
+ logWarning(
+ log"onOperationClosed called with " +
+ log"unknown operation id: ${MDC(OP_ID, e.jobTag)}")
}
}
@@ -265,7 +278,10 @@ private[connect] class SparkConnectServerListener(
updateStoreWithTriggerEnabled(sessionData)
sessionList.remove(e.sessionId)
- case None => logWarning(s"onSessionClosed called with unknown session
id: ${e.sessionId}")
+ case None =>
+ logWarning(
+ log"onSessionClosed called with " +
+ log"unknown session id: ${MDC(SESSION_ID, e.sessionId)}")
}
}
diff --git
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
index 8e65e8af9d7a..fc095c5f5b31 100644
---
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
+++
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
@@ -36,7 +36,7 @@ import com.github.dockerjava.zerodep.ZerodepDockerHttpClient
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.SpanSugar._
-import org.apache.spark.internal.LogKey.CLASS_NAME
+import org.apache.spark.internal.LogKey.{CLASS_NAME, CONTAINER, STATUS}
import org.apache.spark.internal.MDC
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
@@ -263,9 +263,10 @@ abstract class DockerJDBCIntegrationSuite
} catch {
case NonFatal(e) =>
val response = docker.inspectContainerCmd(container.getId).exec()
- logWarning(s"Container $container already stopped")
+ logWarning(log"Container ${MDC(CONTAINER, container)} already
stopped")
val status =
Option(response).map(_.getState.getStatus).getOrElse("unknown")
- logWarning(s"Could not stop container $container at stage
'$status'", e)
+ logWarning(log"Could not stop container ${MDC(CONTAINER, container)}
" +
+ log"at stage '${MDC(STATUS, status)}'", e)
} finally {
logContainerOutput()
docker.removeContainerCmd(container.getId).exec()
diff --git
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index 9b7f52585545..9bf0a2e9e513 100644
---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -24,7 +24,8 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig,
ConsumerRecord, Offset
import org.apache.kafka.common.TopicPartition
import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ERROR, TIP}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.connector.read.InputPartition
@@ -148,7 +149,7 @@ class KafkaContinuousStream(
if (failOnDataLoss) {
throw getException()
} else {
- logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+ logWarning(log"${MDC(ERROR, message)}. ${MDC(TIP,
INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE)}")
}
}
}
diff --git
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index fefa3efcc353..be838ddc3c80 100644
---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ERROR, TIP}
import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{InputPartition,
PartitionReaderFactory}
@@ -309,7 +310,7 @@ private[kafka010] class KafkaMicroBatchStream(
if (failOnDataLoss) {
throw getException()
} else {
- logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+ logWarning(log"${MDC(ERROR, message)}. ${MDC(TIP,
INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE)}")
}
}
diff --git
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index 27adccf6f902..433da08176e7 100644
---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -30,7 +30,8 @@ import org.apache.kafka.common.{IsolationLevel,
TopicPartition}
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT}
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import
org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
@@ -335,8 +336,8 @@ private[kafka010] class KafkaOffsetReaderAdmin(
incorrectOffsets = findIncorrectOffsets()
if (incorrectOffsets.nonEmpty) {
- logWarning("Found incorrect offsets in some partitions " +
- s"(partition, previous offset, fetched offset):
$incorrectOffsets")
+ logWarning(log"Found incorrect offsets in some partitions " +
+ log"(partition, previous offset, fetched offset): ${MDC(OFFSETS,
incorrectOffsets)}")
if (attempt < maxOffsetFetchAttempts) {
logWarning("Retrying to fetch latest offsets because of
incorrect offsets")
Thread.sleep(offsetFetchAttemptIntervalMs)
@@ -534,7 +535,8 @@ private[kafka010] class KafkaOffsetReaderAdmin(
} catch {
case NonFatal(e) =>
lastException = e
- logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
+ logWarning(
+ log"Error in attempt ${MDC(RETRY_COUNT, attempt)} getting Kafka
offsets: ", e)
attempt += 1
Thread.sleep(offsetFetchAttemptIntervalMs)
resetAdmin()
diff --git
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
index d4953a4a65e3..2ba4a9a563df 100644
---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
+++
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
@@ -27,7 +27,8 @@ import org.apache.kafka.clients.consumer.{Consumer,
ConsumerConfig, OffsetAndTim
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT}
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import
org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
@@ -385,8 +386,8 @@ private[kafka010] class KafkaOffsetReaderConsumer(
incorrectOffsets = findIncorrectOffsets()
if (incorrectOffsets.nonEmpty) {
- logWarning("Found incorrect offsets in some partitions " +
- s"(partition, previous offset, fetched offset):
$incorrectOffsets")
+ logWarning(log"Found incorrect offsets in some partitions " +
+ log"(partition, previous offset, fetched offset): ${MDC(OFFSETS,
incorrectOffsets)}")
if (attempt < maxOffsetFetchAttempts) {
logWarning("Retrying to fetch latest offsets because of
incorrect offsets")
Thread.sleep(offsetFetchAttemptIntervalMs)
@@ -611,7 +612,8 @@ private[kafka010] class KafkaOffsetReaderConsumer(
} catch {
case NonFatal(e) =>
lastException = e
- logWarning(s"Error in attempt $attempt getting Kafka
offsets: ", e)
+ logWarning(
+ log"Error in attempt ${MDC(RETRY_COUNT, attempt)} getting
Kafka offsets: ", e)
attempt += 1
Thread.sleep(offsetFetchAttemptIntervalMs)
resetConsumer()
diff --git
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 83ed7fff23fc..426672d2e458 100644
---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -22,7 +22,8 @@ import java.{util => ju}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkContext
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ERROR, TIP}
import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql._
@@ -351,7 +352,7 @@ private[kafka010] class KafkaSource(
if (failOnDataLoss) {
throw getException()
} else {
- logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+ logWarning(log"${MDC(ERROR, message)}. ${MDC(TIP,
INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE)}")
}
}
diff --git
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
index 3e6831770a67..981aa71bf947 100644
---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
+++
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
@@ -26,7 +26,8 @@ import scala.collection.mutable
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{DATA, KEY}
import
org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL,
FETCHED_DATA_CACHE_TIMEOUT}
import
org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange,
CacheKey, UNKNOWN_OFFSET}
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
@@ -110,8 +111,8 @@ private[consumer] class FetchedDataPool(
def release(key: CacheKey, fetchedData: FetchedData): Unit = synchronized {
def warnReleasedDataNotInPool(key: CacheKey, fetchedData: FetchedData):
Unit = {
- logWarning(s"No matching data in pool for $fetchedData in key $key. " +
- "It might be released before, or it was not a part of pool.")
+ logWarning(log"No matching data in pool for ${MDC(DATA, fetchedData)} in
key " +
+ log"${MDC(KEY, key)}. It might be released before, or it was not a
part of pool.")
}
cache.get(key) match {
diff --git
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index fbc4a500322e..3ea7d967744c 100644
---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -30,7 +30,8 @@ import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+import org.apache.spark.internal.LogKey.{ERROR, GROUP_ID, OFFSET, RANGE, TIP,
TOPIC_PARTITION, UNTIL_OFFSET}
import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenUtil}
import org.apache.spark.sql.kafka010.KafkaExceptions
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
@@ -426,7 +427,8 @@ private[kafka010] class KafkaDataConsumer(
val range = timeNanos {
consumer.getAvailableOffsetRange()
}
- logWarning(s"Some data may be lost. Recovering from the earliest offset:
${range.earliest}")
+ logWarning(log"Some data may be lost. Recovering from the earliest offset:
" +
+ log"${MDC(OFFSET, range.earliest)}")
val topicPartition = consumer.topicPartition
val groupId = consumer.groupId
@@ -444,11 +446,12 @@ private[kafka010] class KafkaDataConsumer(
// | | | |
// offset untilOffset earliestOffset latestOffset
val warningMessage =
- s"""
- |The current available offset range is $range.
- | Offset $offset is out of range, and records in [$offset,
$untilOffset) will be
- | skipped ${additionalWarningMessage(topicPartition, groupId)}
- """.stripMargin
+ log"""
+ |The current available offset range is ${MDC(RANGE, range)}.
+ | Offset ${MDC(OFFSET, offset)} is out of range, and records in
+          | [${MDC(OFFSET, offset)}, ${MDC(UNTIL_OFFSET, untilOffset)}) will be
+ | skipped""".stripMargin +
+ additionalWarningMessage(topicPartition, groupId)
logWarning(warningMessage)
UNKNOWN_OFFSET
} else if (offset >= range.earliest) {
@@ -460,8 +463,8 @@ private[kafka010] class KafkaDataConsumer(
// This will happen when a topic is deleted and recreated, and new data
are pushed very fast,
// then we will see `offset` disappears first then appears again.
Although the parameters
// are same, the state in Kafka cluster is changed, so the outer loop
won't be endless.
- logWarning(s"Found a disappeared offset $offset. Some data may be lost "
+
- s"${additionalWarningMessage(topicPartition, groupId)}")
+ logWarning(log"Found a disappeared offset ${MDC(OFFSET, offset)}. Some
data may be lost " +
+ additionalWarningMessage(topicPartition, groupId))
offset
} else {
//
------------------------------------------------------------------------------
@@ -470,10 +473,11 @@ private[kafka010] class KafkaDataConsumer(
// offset earliestOffset min(untilOffset,latestOffset)
max(untilOffset, latestOffset)
val warningMessage =
-        s"""
+        log"""
- |The current available offset range is $range.
- | Offset ${offset} is out of range, and records in [$offset,
${range.earliest}) will be
- | skipped ${additionalWarningMessage(topicPartition, groupId)}
- """.stripMargin
+ |The current available offset range is ${MDC(RANGE, range)}.
+ | Offset ${MDC(OFFSET, offset)} is out of range, and records in
+          | [${MDC(OFFSET, offset)}, ${MDC(UNTIL_OFFSET, range.earliest)}) will
be
+ | skipped""".stripMargin +
+ additionalWarningMessage(topicPartition, groupId)
logWarning(warningMessage)
range.earliest
}
@@ -629,9 +633,10 @@ private[kafka010] class KafkaDataConsumer(
*/
private def additionalWarningMessage(
topicPartition: TopicPartition,
- groupId: String): String = {
- s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
- s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE"
+ groupId: String): MessageWithContext = {
+ log"(GroupId: ${MDC(GROUP_ID, groupId)}, " +
+ log"TopicPartition: ${MDC(TOPIC_PARTITION, topicPartition)}). " +
+ log"${MDC(TIP, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE)}"
}
/**
@@ -660,7 +665,8 @@ private[kafka010] class KafkaDataConsumer(
groupId: String,
message: String,
cause: Throwable = null): Unit = {
- val finalMessage = s"$message ${additionalWarningMessage(topicPartition,
groupId)}"
+ val finalMessage = log"${MDC(ERROR, message)}" +
+ additionalWarningMessage(topicPartition, groupId)
dataLoss += 1
if (cause != null) {
diff --git
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala
index ddde7805f1a4..f35023d744b6 100644
---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala
+++
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala
@@ -27,7 +27,8 @@ import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PRODUCER_ID
import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil}
import
org.apache.spark.sql.kafka010.{PRODUCER_CACHE_EVICTOR_THREAD_RUN_INTERVAL,
PRODUCER_CACHE_TIMEOUT}
import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock,
ThreadUtils, Utils}
@@ -96,7 +97,8 @@ private[producer] class InternalKafkaProducerPool(
case Some(entry) if entry.producer.id == producer.id =>
entry.handleReturned(clock.nanoTime())
case _ =>
- logWarning(s"Released producer ${producer.id} is not a member of the
cache. Closing.")
+ logWarning(log"Released producer ${MDC(PRODUCER_ID, producer.id)} is
not " +
+ log"a member of the cache. Closing.")
producer.close()
}
}
diff --git
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 1c397a8d5005..8ec8f2556b9b 100644
---
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -51,7 +51,8 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.ERROR
import org.apache.spark.kafka010.KafkaTokenUtil
import org.apache.spark.util.{SecurityUtils, ShutdownHookManager, Utils}
import org.apache.spark.util.ArrayImplicits._
@@ -332,7 +333,7 @@ class KafkaTestUtils(
Utils.deleteRecursively(new File(f))
} catch {
case e: IOException if Utils.isWindows =>
- logWarning(e.getMessage)
+ logWarning(log"${MDC(ERROR, e.getMessage)}")
}
}
@@ -653,13 +654,13 @@ class KafkaTestUtils(
Utils.deleteRecursively(snapshotDir)
} catch {
case e: IOException if Utils.isWindows =>
- logWarning(e.getMessage)
+ logWarning(log"${MDC(ERROR, e.getMessage)}")
}
try {
Utils.deleteRecursively(logDir)
} catch {
case e: IOException if Utils.isWindows =>
- logWarning(e.getMessage)
+ logWarning(log"${MDC(ERROR, e.getMessage)}")
}
System.clearProperty(ZOOKEEPER_AUTH_PROVIDER)
}
diff --git
a/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
b/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
index d1d2a031e662..d0bcf90babc1 100644
---
a/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
+++
b/connector/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
@@ -24,7 +24,8 @@ import org.apache.hadoop.security.Credentials
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT,
SASL_SSL, SSL}
import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CLUSTER_ID, SERVICE_NAME}
import org.apache.spark.security.HadoopDelegationTokenProvider
private[spark] class KafkaDelegationTokenProvider
@@ -54,9 +55,10 @@ private[spark] class KafkaDelegationTokenProvider
}
} catch {
case NonFatal(e) =>
- logWarning(s"Failed to get token from service: $serviceName due to
$e on " +
- s"cluster: ${clusterConf.identifier}. If $serviceName is not
used, " +
- s"set spark.security.credentials.$serviceName.enabled to false")
+ logWarning(log"Failed to get token from service:
${MDC(SERVICE_NAME, serviceName)} " +
+ log"on cluster: ${MDC(CLUSTER_ID, clusterConf.identifier)}. If "
+
+ log"${MDC(SERVICE_NAME, serviceName)} is not used, please set " +
+ log"spark.security.credentials.${MDC(SERVICE_NAME,
serviceName)}.enabled to false", e)
}
}
} catch {
diff --git
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 693ddd31d9a8..2bc2acf9aaf9 100644
---
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CONFIG
import org.apache.spark.kafka010.KafkaConfigUpdater
/**
@@ -107,8 +108,8 @@ private case class Subscribe[K, V](
consumer.poll(0)
} catch {
case x: NoOffsetForPartitionException if shouldSuppress =>
- logWarning("Catching NoOffsetForPartitionException since " +
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See
KAFKA-3370")
+ logWarning(log"Catching NoOffsetForPartitionException since " +
+ log"${MDC(CONFIG, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)} is
none. See KAFKA-3370")
}
toSeek.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
@@ -161,8 +162,8 @@ private case class SubscribePattern[K, V](
consumer.poll(0)
} catch {
case x: NoOffsetForPartitionException if shouldSuppress =>
- logWarning("Catching NoOffsetForPartitionException since " +
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See
KAFKA-3370")
+ logWarning(log"Catching NoOffsetForPartitionException since " +
+ log"${MDC(CONFIG, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)} is
none. See KAFKA-3370")
}
toSeek.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
diff --git
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
index c7ac6a8bf744..6b47e9d72f4b 100644
---
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
+++
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
@@ -26,7 +26,8 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig,
ConsumerRecord, KafkaC
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{KEY, MAX_CAPACITY}
import org.apache.spark.kafka010.KafkaConfigUpdater
private[kafka010] sealed trait KafkaDataConsumer[K, V] {
@@ -256,8 +257,8 @@ private[kafka010] object KafkaDataConsumer extends Logging {
if (entry.getValue.inUse == false && this.size > maxCapacity) {
logWarning(
- s"KafkaConsumer cache hitting max capacity of $maxCapacity, " +
- s"removing consumer for ${entry.getKey}")
+ log"KafkaConsumer cache hitting max capacity of
${MDC(MAX_CAPACITY, maxCapacity)}, " +
+ log"removing consumer for ${MDC(KEY, entry.getKey)}")
try {
entry.getValue.close()
} catch {
diff --git
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
index 731d06fd95fa..f3e4c45b3aa9 100644
---
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
+++
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkContext
import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKey.GROUP_ID
+import org.apache.spark.internal.LogKey.{CONFIG, GROUP_ID}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{ JavaInputDStream,
JavaStreamingContext }
@@ -184,26 +184,30 @@ object KafkaUtils extends Logging {
* Tweak kafka params to prevent issues on executors
*/
private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String,
Object]): Unit = {
- logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to
false for executor")
+ logWarning(log"overriding ${MDC(CONFIG,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)} " +
+ log"to false for executor")
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false:
java.lang.Boolean)
- logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none
for executor")
+ logWarning(log"overriding ${MDC(CONFIG,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)} " +
+ log"to none for executor")
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
// driver and executor should be in different consumer groups
val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
if (null == originalGroupId) {
- logError(log"${MDC(GROUP_ID, ConsumerConfig.GROUP_ID_CONFIG)} is null, "
+
+ logError(log"${MDC(CONFIG, ConsumerConfig.GROUP_ID_CONFIG)} is null, " +
log"you should probably set it")
}
val groupId = "spark-executor-" + originalGroupId
- logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to
${groupId}")
+ logWarning(log"overriding executor ${MDC(CONFIG,
ConsumerConfig.GROUP_ID_CONFIG)} " +
+ log"to ${MDC(GROUP_ID, groupId)}")
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
// possible workaround for KAFKA-3135
val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
- logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536
see KAFKA-3135")
+ logWarning(log"overriding ${MDC(CONFIG,
ConsumerConfig.RECEIVE_BUFFER_CONFIG)} " +
+ log"to 65536 see KAFKA-3135")
kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536:
java.lang.Integer)
}
}
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index d22496a84b58..6b0c091534b7 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -29,7 +29,8 @@ import
com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
import com.amazonaws.services.kinesis.model._
import org.apache.spark._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{ERROR, RETRY_COUNT}
import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition}
import org.apache.spark.storage.BlockId
import org.apache.spark.util.NextIterator
@@ -277,7 +278,8 @@ class KinesisSequenceRangeIterator(
lastError = t
t match {
case ptee: ProvisionedThroughputExceededException =>
- logWarning(s"Error while $message [attempt = ${retryCount +
1}]", ptee)
+ logWarning(log"Error while ${MDC(ERROR, message)} " +
+ log"[attempt = ${MDC(RETRY_COUNT, retryCount + 1)}]", ptee)
case e: Throwable =>
throw new SparkException(s"Error while $message", e)
}
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index e1e21954ce88..94e109680fbc 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -120,7 +120,7 @@ private[kinesis] class KinesisRecordProcessor[T](receiver:
KinesisReceiver[T], w
logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason")
// null if not initialized before shutdown:
if (shardId == null) {
- logWarning(s"No shardId for workerId $workerId?")
+ logWarning(log"No shardId for workerId ${MDC(WORKER_URL, workerId)}?")
} else {
reason match {
/*
diff --git
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 406c19be9bff..cd4c61396a12 100644
---
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -33,7 +33,8 @@ import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClient}
import com.amazonaws.services.kinesis.model._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{STREAM_NAME, TABLE_NAME}
/**
* Shared utility methods for performing Kinesis tests that actually transfer
data.
@@ -147,7 +148,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount:
Int = 2) extends Loggi
}
} catch {
case e: Exception =>
- logWarning(s"Could not delete stream $streamName")
+ logWarning(log"Could not delete stream ${MDC(STREAM_NAME,
streamName)}", e)
}
}
@@ -158,7 +159,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount:
Int = 2) extends Loggi
table.waitForDelete()
} catch {
case e: Exception =>
- logWarning(s"Could not delete DynamoDB table $tableName")
+ logWarning(log"Could not delete DynamoDB table ${MDC(TABLE_NAME,
tableName)}", e)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]