This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ec509b49dcaa [SPARK-47586][SQL] Hive module: Migrate logError with
variables to structured logging framework
ec509b49dcaa is described below
commit ec509b49dcaa21d6dcdf18c1b40ac9d6df1827d7
Author: Haejoon Lee <[email protected]>
AuthorDate: Tue Apr 9 18:22:40 2024 -0700
[SPARK-47586][SQL] Hive module: Migrate logError with variables to
structured logging framework
### What changes were proposed in this pull request?
This PR proposes to migrate `logError` calls with variables in the Hive module to
the structured logging framework.
### Why are the changes needed?
To improve the existing logging system by migrating into structured logging.
### Does this PR introduce _any_ user-facing change?
No API changes, but the Hive module logs will contain MDC (Mapped
Diagnostic Context) from now on.
### How was this patch tested?
Run Scala auto formatting and style check. Also the existing CI should pass.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45876 from itholic/hive-logerror.
Lead-authored-by: Haejoon Lee <[email protected]>
Co-authored-by: Haejoon Lee <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../main/scala/org/apache/spark/internal/LogKey.scala | 6 ++++++
.../scala/org/apache/spark/sql/hive/TableReader.scala | 8 ++++++--
.../apache/spark/sql/hive/client/HiveClientImpl.scala | 17 ++++++++++-------
3 files changed, 22 insertions(+), 9 deletions(-)
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 7fa0331515cb..2cb5eac4548c 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -51,7 +51,9 @@ object LogKey extends Enumeration {
val CSV_SCHEMA_FIELD_NAME = Value
val CSV_SCHEMA_FIELD_NAMES = Value
val CSV_SOURCE = Value
+ val DATABASE_NAME = Value
val DRIVER_ID = Value
+ val DROPPED_PARTITIONS = Value
val END_POINT = Value
val ERROR = Value
val EVENT_LOOP = Value
@@ -61,6 +63,7 @@ object LogKey extends Enumeration {
val EXIT_CODE = Value
val EXPRESSION_TERMS = Value
val FAILURES = Value
+ val FIELD_NAME = Value
val FUNCTION_NAME = Value
val FUNCTION_PARAMETER = Value
val GROUP_ID = Value
@@ -92,6 +95,7 @@ object LogKey extends Enumeration {
val PARSE_MODE = Value
val PARTITION_ID = Value
val PARTITION_SPECIFICATION = Value
+ val PARTITION_SPECS = Value
val PATH = Value
val PATHS = Value
val POD_ID = Value
@@ -105,6 +109,7 @@ object LogKey extends Enumeration {
val REASON = Value
val REDUCE_ID = Value
val RELATION_NAME = Value
+ val REMAINING_PARTITIONS = Value
val REMOTE_ADDRESS = Value
val RETRY_COUNT = Value
val RETRY_INTERVAL = Value
@@ -124,6 +129,7 @@ object LogKey extends Enumeration {
val STATEMENT_ID = Value
val SUBMISSION_ID = Value
val SUBSAMPLING_RATE = Value
+ val TABLE_NAME = Value
val TASK_ATTEMPT_ID = Value
val TASK_ID = Value
val TASK_NAME = Value
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index d72406f094a6..60970eecc2df 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -36,7 +36,8 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat
=> oldInputClass,
import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD, UnionRDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
@@ -518,7 +519,10 @@ private[hive] object HadoopTableReader extends
HiveInspectors with Logging {
i += 1
} catch {
case ex: Throwable =>
- logError(s"Exception thrown in field
<${fieldRefs(i).getFieldName}>")
+ logError(
+ log"Exception thrown in field <${MDC(FIELD_NAME,
fieldRefs(i).getFieldName)}>",
+ ex
+ )
throw ex
}
}
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 46dc56372334..92561bed1195 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -46,7 +46,8 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException,
NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException,
NoSuchTableException, PartitionsAlreadyExistException}
@@ -686,17 +687,19 @@ private[hive] class HiveClientImpl(
} catch {
case e: Exception =>
val remainingParts = matchingParts.toBuffer --= droppedParts
+ // scalastyle:off line.size.limit
logError(
- s"""
+ log"""
|======================
- |Attempt to drop the partition specs in table '$table' database
'$db':
- |${specs.mkString("\n")}
+ |Attempt to drop the partition specs in table
'${MDC(TABLE_NAME, table)}' database '${MDC(DATABASE_NAME, db)}':
+ |${MDC(PARTITION_SPECS, specs.mkString("\n"))}
|In this attempt, the following partitions have been dropped
successfully:
- |${droppedParts.mkString("\n")}
+ |${MDC(DROPPED_PARTITIONS, droppedParts.mkString("\n"))}
|The remaining partitions have not been dropped:
- |${remainingParts.mkString("\n")}
+ |${MDC(REMAINING_PARTITIONS, remainingParts.mkString("\n"))}
|======================
""".stripMargin)
+ // scalastyle:on line.size.limit
throw e
}
droppedParts += partition
@@ -910,7 +913,7 @@ private[hive] class HiveClientImpl(
|======================
|END HIVE FAILURE OUTPUT
|======================
- """.stripMargin)
+ """.stripMargin, e)
throw e
} finally {
if (state != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]