This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new eaf6b518f67c [SPARK-47587][SQL] Hive module: Migrate logWarn with variables to structured logging framework
eaf6b518f67c is described below
commit eaf6b518f67c0e3ed04f264c3a89573bd7e74fe7
Author: panbingkun <[email protected]>
AuthorDate: Wed Apr 10 22:34:14 2024 -0700
[SPARK-47587][SQL] Hive module: Migrate logWarn with variables to structured logging framework
### What changes were proposed in this pull request?
This PR aims to migrate `logWarning` calls with variables in the `Hive` module to the structured logging framework.
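As a rough illustration of the before/after pattern applied throughout this migration, here is a minimal sketch. The class and method names are hypothetical; `MDC`, `LogKey.TABLE_NAME`, and the `log"..."` interpolator are the framework pieces used in the diff below.

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.TABLE_NAME

// Hypothetical example class; only the logging pattern mirrors this PR.
class ExampleCatalog extends Logging {
  def warnMissingTable(tableName: String): Unit = {
    // Before: plain string interpolation, the variable is only part of the text.
    // logWarning(s"Table $tableName could not be found.")

    // After: the log"..." interpolator with MDC tags the variable with a LogKey,
    // so it can be emitted as a structured field in addition to the message text.
    logWarning(log"Table ${MDC(TABLE_NAME, tableName)} could not be found.")
  }
}
```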
### Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Passed GA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45927 from panbingkun/SPARK-47587.
Authored-by: panbingkun <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../scala/org/apache/spark/internal/LogKey.scala | 9 +++++++
.../security/HBaseDelegationTokenProvider.scala | 7 ++---
.../main/scala/org/apache/spark/util/Utils.scala | 10 ++++----
.../spark/sql/hive/HiveExternalCatalog.scala | 28 ++++++++++----------
.../spark/sql/hive/HiveMetastoreCatalog.scala | 30 ++++++++++++++--------
.../org/apache/spark/sql/hive/HiveUtils.scala | 5 ++--
.../spark/sql/hive/client/HiveClientImpl.scala | 8 +++---
.../apache/spark/sql/hive/client/HiveShim.scala | 23 +++++++++--------
.../sql/hive/client/IsolatedClientLoader.scala | 13 ++++++----
.../spark/sql/hive/execution/HiveFileFormat.scala | 11 ++++----
.../spark/sql/hive/execution/HiveTempPath.scala | 5 ++--
.../spark/sql/hive/orc/OrcFileOperator.scala | 5 ++--
.../security/HiveDelegationTokenProvider.scala | 8 +++---
13 files changed, 97 insertions(+), 65 deletions(-)
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index a9a79de05c27..28b06f448784 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -44,6 +44,7 @@ object LogKey extends Enumeration {
val COMPONENT = Value
val CONFIG = Value
val CONFIG2 = Value
+ val CONFIG3 = Value
val CONTAINER = Value
val CONTAINER_ID = Value
val COUNT = Value
@@ -58,6 +59,7 @@ object LogKey extends Enumeration {
val DRIVER_ID = Value
val DROPPED_PARTITIONS = Value
val END_POINT = Value
+ val ENGINE = Value
val ERROR = Value
val EVENT_LOOP = Value
val EVENT_QUEUE = Value
@@ -66,14 +68,19 @@ object LogKey extends Enumeration {
val EXIT_CODE = Value
val EXPRESSION_TERMS = Value
val FAILURES = Value
+ val FALLBACK_VERSION = Value
val FIELD_NAME = Value
+ val FILE_FORMAT = Value
+ val FILE_FORMAT2 = Value
val FUNCTION_NAME = Value
val FUNCTION_PARAMETER = Value
val GROUP_ID = Value
+ val HADOOP_VERSION = Value
val HIVE_OPERATION_STATE = Value
val HIVE_OPERATION_TYPE = Value
val HOST = Value
val INDEX = Value
+ val INFERENCE_MODE = Value
val JOB_ID = Value
val JOIN_CONDITION = Value
val JOIN_CONDITION_SUB_EXPRESSION = Value
@@ -132,6 +139,8 @@ object LogKey extends Enumeration {
val RULE_BATCH_NAME = Value
val RULE_NAME = Value
val RULE_NUMBER_OF_RUNS = Value
+ val SCHEMA = Value
+ val SCHEMA2 = Value
val SERVICE_NAME = Value
val SESSION_ID = Value
val SHARD_ID = Value
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
index d60e5975071d..1b2e41bc0a2e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
@@ -27,7 +27,8 @@ import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.SERVICE_NAME
import org.apache.spark.security.HadoopDelegationTokenProvider
import org.apache.spark.util.Utils
@@ -53,8 +54,8 @@ private[security] class HBaseDelegationTokenProvider
creds.addToken(token.getService, token)
} catch {
case NonFatal(e) =>
- logWarning(Utils.createFailedToGetTokenMessage(serviceName, e) +
- s" Retrying to fetch HBase security token with $serviceName connection parameter.")
+ logWarning(Utils.createFailedToGetTokenMessage(serviceName, e) + log" Retrying to fetch " +
+ log"HBase security token with ${MDC(SERVICE_NAME, serviceName)} connection parameter.")
// Seems to be spark is trying to get the token from HBase 2.x.x version or above where the
// obtainToken(Configuration conf) API has been removed. Lets try obtaining the token from
// another compatible API of HBase service.
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7022506e5508..af91a4b32c6f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -68,7 +68,7 @@ import org.slf4j.Logger
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Streaming._
@@ -2972,10 +2972,10 @@ private[spark] object Utils
}
/** Returns a string message about delegation token generation failure */
- def createFailedToGetTokenMessage(serviceName: String, e: scala.Throwable): String = {
- val message = "Failed to get token from service %s due to %s. " +
- "If %s is not used, set spark.security.credentials.%s.enabled to false."
- message.format(serviceName, e, serviceName, serviceName)
+ def createFailedToGetTokenMessage(serviceName: String, e: scala.Throwable): MessageWithContext = {
+ log"Failed to get token from service ${MDC(SERVICE_NAME, serviceName)} " +
+ log"due to ${MDC(ERROR, e)}. If ${MDC(SERVICE_NAME, serviceName)} is not used, " +
+ log"set spark.security.credentials.${MDC(SERVICE_NAME, serviceName)}.enabled to false."
}
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 32fc8a452106..8c35e10b383f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -33,7 +33,8 @@ import org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT
import org.apache.thrift.TException
import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{DATABASE_NAME, SCHEMA, SCHEMA2, TABLE_NAME}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
@@ -175,9 +176,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
val existingDb = getDatabase(dbDefinition.name)
if (existingDb.properties == dbDefinition.properties) {
- logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
- s"the provided database properties are the same as the old ones. Hive does not " +
- s"currently support altering other database fields.")
+ logWarning(log"Request to alter database ${MDC(DATABASE_NAME, dbDefinition.name)} is a " +
+ log"no-op because the provided database properties are the same as the old ones. Hive " +
+ log"does not currently support altering other database fields.")
}
client.alterDatabase(dbDefinition)
}
@@ -380,8 +381,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
} catch {
case NonFatal(e) =>
val warningMessage =
- s"Could not persist ${table.identifier.quotedString} in a Hive " +
- "compatible way. Persisting it into Hive metastore in Spark SQL specific format."
+ log"Could not persist ${MDC(TABLE_NAME, table.identifier.quotedString)} in a Hive " +
+ log"compatible way. Persisting it into Hive metastore in Spark SQL specific format."
logWarning(warningMessage, e)
saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
}
@@ -676,9 +677,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.alterTableDataSchema(db, table, newDataSchema, schemaProps)
} catch {
case NonFatal(e) =>
- val warningMessage =
- s"Could not alter schema of table ${oldTable.identifier.quotedString} in a Hive " +
- "compatible way. Updating Hive metastore in Spark SQL specific format."
+ val warningMessage = log"Could not alter schema of table " +
+ log"${MDC(TABLE_NAME, oldTable.identifier.quotedString)} in a Hive compatible way. " +
+ log"Updating Hive metastore in Spark SQL specific format."
logWarning(warningMessage, e)
client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, schemaProps)
}
@@ -800,10 +801,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// schema we read back is different(ignore case and nullability) from the one in table
// properties which was written when creating table, we should respect the table schema
// from hive.
- logWarning(s"The table schema given by Hive metastore(${table.schema.catalogString}) is " +
- "different from the schema when this table was created by Spark SQL" +
- s"(${schemaFromTableProps.catalogString}). We have to fall back to the table schema " +
- "from Hive metastore which is not case preserving.")
+ logWarning(log"The table schema given by Hive metastore" +
+ log"(${MDC(SCHEMA, table.schema.catalogString)}) is different from the schema when " +
+ log"this table was created by Spark SQL" +
+ log"(${MDC(SCHEMA2, schemaFromTableProps.catalogString)}). We have to fall back to " +
+ log"the table schema from Hive metastore which is not case preserving.")
hiveTable.copy(schemaPreservesCase = false)
}
} else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f1d99d359cde..5b3160c56304 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -25,7 +25,8 @@ import com.google.common.util.concurrent.Striped
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{FILE_FORMAT, FILE_FORMAT2, INFERENCE_MODE, TABLE_NAME}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
@@ -104,21 +105,28 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
None
}
case _ =>
- logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
- s"However, we are getting a ${relation.fileFormat} from the metastore cache. " +
- "This cached entry will be invalidated.")
+ logWarningUnexpectedFileFormat(tableIdentifier, expectedFileFormat,
+ relation.fileFormat.toString)
catalogProxy.invalidateCachedTable(tableIdentifier)
None
}
case other =>
- logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
- s"However, we are getting a $other from the metastore cache. " +
- "This cached entry will be invalidated.")
+ logWarningUnexpectedFileFormat(tableIdentifier, expectedFileFormat, other.toString)
catalogProxy.invalidateCachedTable(tableIdentifier)
None
}
}
+ private def logWarningUnexpectedFileFormat(
+ tableIdentifier: QualifiedTableName,
+ expectedFileFormat: Class[_ <: FileFormat],
+ actualFileFormat: String): Unit = {
+ logWarning(log"Table ${MDC(TABLE_NAME, tableIdentifier)} should be stored as " +
+ log"${MDC(FILE_FORMAT, expectedFileFormat)}. However, we are getting a " +
+ log"${MDC(FILE_FORMAT2, actualFileFormat)} from the metastore cache. " +
+ log"This cached entry will be invalidated.")
+ }
+
// Return true for Apache ORC and Hive ORC-related configuration names.
// Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
private def isOrcProperty(key: String) =
@@ -353,8 +361,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val newSchema = StructType(dataSchema ++ relation.tableMeta.partitionSchema)
relation.tableMeta.copy(schema = newSchema)
case None =>
- logWarning(s"Unable to infer schema for table $tableName from file format " +
- s"$fileFormat (inference mode: $inferenceMode). Using metastore schema.")
+ logWarning(log"Unable to infer schema for table ${MDC(TABLE_NAME, tableName)} from " +
+ log"file format ${MDC(FILE_FORMAT, fileFormat)} (inference mode: " +
+ log"${MDC(INFERENCE_MODE, inferenceMode)}). Using metastore schema.")
relation.tableMeta
}
} else {
@@ -367,7 +376,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
sparkSession.sessionState.catalog.alterTableDataSchema(identifier, newDataSchema)
} catch {
case NonFatal(ex) =>
- logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex)
+ logWarning(log"Unable to save case-sensitive schema for table " +
+ log"${MDC(TABLE_NAME, identifier.unquotedString)}", ex)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 79d0af0f9a09..68f34bd2beb0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -36,7 +36,8 @@ import org.apache.hive.common.util.HiveVersionInfo
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.DDLUtils
@@ -320,7 +321,7 @@ private[spark] object HiveUtils extends Logging {
if (file.getName == "*") {
val files = file.getParentFile.listFiles()
if (files == null) {
- logWarning(s"Hive jar path '${file.getPath}' does not exist.")
+ logWarning(log"Hive jar path '${MDC(PATH, file.getPath)}' does not exist.")
Nil
} else {
files.filter(_.getName.toLowerCase(Locale.ROOT).endsWith(".jar")).map(_.toURI.toURL)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 92561bed1195..502cec3be9c8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -230,8 +230,8 @@ private[hive] class HiveClientImpl(
case e: Exception if causedByThrift(e) =>
caughtException = e
logWarning(
- "HiveClient got thrift exception, destroying client and retrying " +
- s"(${retryLimit - numTries} tries remaining)", e)
+ log"HiveClient got thrift exception, destroying client and retrying " +
+ log"${MDC(RETRY_COUNT, numTries)} times", e)
clientLoader.cachedHive = null
Thread.sleep(retryDelayMillis)
}
@@ -1335,8 +1335,8 @@ private[hive] object HiveClientImpl extends Logging {
// initialize spark or tez stuff, which is useless for spark.
val engine = hiveConf.get("hive.execution.engine")
if (engine != "mr") {
- logWarning(s"Detected HiveConf hive.execution.engine is '$engine' and will be reset to 'mr'" +
- " to disable useless hive logic")
+ logWarning(log"Detected HiveConf hive.execution.engine is '${MDC(ENGINE, engine)}' and " +
+ log"will be reset to 'mr' to disable useless hive logic")
hiveConf.set("hive.execution.engine", "mr", SOURCE_SPARK)
}
hiveConf
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 9943c0178fcf..07daa2938628 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -37,7 +37,8 @@ import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorF
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{CONFIG, CONFIG2, CONFIG3}
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
@@ -368,16 +369,16 @@ private[client] class Shim_v2_0 extends Shim with Logging {
hive.getPartitionsByFilter(table, filter)
} catch {
case ex: MetaException if shouldFallback =>
- logWarning("Caught Hive MetaException attempting to get partition metadata by " +
- "filter from Hive. Falling back to fetching all partition metadata, which will " +
- "degrade performance. Modifying your Hive metastore configuration to set " +
- s"${tryDirectSqlConfVar.varname} to true (if it is not true already) may resolve " +
- "this problem. Or you can enable " +
- s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key} " +
- "to alleviate performance downgrade. " +
- "Otherwise, to avoid degraded performance you can set " +
- s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key} " +
- " to false and let the query fail instead.", ex)
+ logWarning(log"Caught Hive MetaException attempting to get partition metadata by " +
+ log"filter from Hive. Falling back to fetching all partition metadata, which will " +
+ log"degrade performance. Modifying your Hive metastore configuration to set " +
+ log"${MDC(CONFIG, tryDirectSqlConfVar.varname)} to true " +
+ log"(if it is not true already) may resolve this problem. Or you can enable " +
+ log"${MDC(CONFIG2, SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key)} " +
+ log"to alleviate performance downgrade. Otherwise, to avoid degraded performance " +
+ log"you can set ${MDC(CONFIG3,
+ SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key)} " +
+ log"to false and let the query fail instead.", ex)
// HiveShim clients are expected to handle a superset of the requested partitions
prunePartitionsFastFallback(hive, table, catalogTable, predicates)
case ex: MetaException =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 5297910cbfa4..e4bab4631ab1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -30,7 +30,8 @@ import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmit
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.{FALLBACK_VERSION, HADOOP_VERSION}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.hive.HiveUtils
@@ -66,10 +67,12 @@ private[hive] object IsolatedClientLoader extends Logging {
// If the error message contains hadoop, it is probably because the hadoop
// version cannot be resolved.
val fallbackVersion = "3.4.0"
- logWarning(s"Failed to resolve Hadoop artifacts for the version $hadoopVersion. We " +
- s"will change the hadoop version from $hadoopVersion to $fallbackVersion and try " +
- "again. It is recommended to set jars used by Hive metastore client through " +
- "spark.sql.hive.metastore.jars in the production environment.")
+ logWarning(log"Failed to resolve Hadoop artifacts for the version " +
+ log"${MDC(HADOOP_VERSION, hadoopVersion)}. We will change the hadoop version from " +
+ log"${MDC(HADOOP_VERSION, hadoopVersion)} to " +
+ log"${MDC(FALLBACK_VERSION, fallbackVersion)} and try again. It is recommended to " +
+ log"set jars used by Hive metastore client through spark.sql.hive.metastore.jars " +
+ log"in the production environment.")
(downloadVersion(
resolvedVersion, fallbackVersion, ivyPath, remoteRepos), fallbackVersion)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index fa21be0c6514..1b76478a5cf3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -31,7 +31,8 @@ import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{JobConf, Reporter}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
import org.apache.spark.internal.config.SPECULATION_ENABLED
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -76,10 +77,10 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
val outputCommitterClass = conf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
val warningMessage =
- s"$outputCommitterClass may be an output committer that writes data directly to " +
- "the final location. Because speculation is enabled, this output committer may " +
- "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
- "committer that does not have this behavior (e.g. FileOutputCommitter)."
+ log"${MDC(CLASS_NAME, outputCommitterClass)} may be an output committer that writes data " +
+ log"directly to the final location. Because speculation is enabled, this output " +
+ log"committer may cause data loss (see the case in SPARK-10063). If possible, please " +
+ log"use an output committer that does not have this behavior (e.g. FileOutputCommitter)."
logWarning(warningMessage)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
index eb8482da38e5..a16191b72a8d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
@@ -31,7 +31,8 @@ import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -140,7 +141,7 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P
} catch {
case NonFatal(e) =>
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
- logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+ logWarning(log"Unable to delete staging directory: ${MDC(PATH, stagingDir)}.", e)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 3fdd5a9c4cec..3e1bdff8c007 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -25,7 +25,8 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.PATH
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.StructType
@@ -77,7 +78,7 @@ private[hive] object OrcFileOperator extends Logging {
} catch {
case e: IOException =>
if (ignoreCorruptFiles) {
- logWarning(s"Skipped the footer in the corrupted file: $path", e)
+ logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, path)}", e)
None
} else {
throw QueryExecutionErrors.cannotReadFooterForFileError(path, e)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala
index 253c0a9ebafe..13ff721736b2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala
@@ -32,7 +32,8 @@ import org.apache.hadoop.security.token.Token
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.CLASS_NAME
import org.apache.spark.internal.config.KEYTAB
import org.apache.spark.security.HadoopDelegationTokenProvider
import org.apache.spark.sql.hive.client.HiveClientImpl
@@ -43,8 +44,9 @@ private[spark] class HiveDelegationTokenProvider
override def serviceName: String = "hive"
- private val classNotFoundErrorStr = s"You are attempting to use the " +
- s"${getClass.getCanonicalName}, but your Spark distribution is not built with Hive libraries."
+ private val classNotFoundErrorStr =
+ log"You are attempting to use the ${MDC(CLASS_NAME, getClass.getCanonicalName)}, " +
+ log"but your Spark distribution is not built with Hive libraries."
private def hiveConf(hadoopConf: Configuration): Configuration = {
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]