This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ec509b49dcaa [SPARK-47586][SQL] Hive module: Migrate logError with variables to structured logging framework
ec509b49dcaa is described below

commit ec509b49dcaa21d6dcdf18c1b40ac9d6df1827d7
Author: Haejoon Lee <haejoon....@databricks.com>
AuthorDate: Tue Apr 9 18:22:40 2024 -0700

    [SPARK-47586][SQL] Hive module: Migrate logError with variables to structured logging framework
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to migrate `logError` calls with variables in the Hive module to the structured logging framework.
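
    For illustration, the core pattern (a minimal sketch distilled from the diff below; `FIELD_NAME` comes from `org.apache.spark.internal.LogKey`) replaces plain string interpolation with the `log` interpolator plus `MDC` wrappers:

    ```scala
    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey._

    // Before: the variable is flattened into an unstructured message string.
    logError(s"Exception thrown in field <${fieldRefs(i).getFieldName}>")

    // After: the variable is tagged with a LogKey and travels as an
    // MDC key-value pair alongside the message; the Throwable is also attached.
    logError(
      log"Exception thrown in field <${MDC(FIELD_NAME, fieldRefs(i).getFieldName)}>",
      ex)
    ```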
    
    ### Why are the changes needed?
    
    To improve the existing logging system by migrating to structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No API changes, but the Hive module logs will contain MDC (Mapped Diagnostic Context) from now on.
    
    ### How was this patch tested?
    
    Ran Scala auto-formatting and style checks. The existing CI should also pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45876 from itholic/hive-logerror.
    
    Lead-authored-by: Haejoon Lee <haejoon....@databricks.com>
    Co-authored-by: Haejoon Lee <44108233+itho...@users.noreply.github.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../main/scala/org/apache/spark/internal/LogKey.scala   |  6 ++++++
 .../scala/org/apache/spark/sql/hive/TableReader.scala   |  8 ++++++--
 .../apache/spark/sql/hive/client/HiveClientImpl.scala   | 17 ++++++++++-------
 3 files changed, 22 insertions(+), 9 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 7fa0331515cb..2cb5eac4548c 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -51,7 +51,9 @@ object LogKey extends Enumeration {
   val CSV_SCHEMA_FIELD_NAME = Value
   val CSV_SCHEMA_FIELD_NAMES = Value
   val CSV_SOURCE = Value
+  val DATABASE_NAME = Value
   val DRIVER_ID = Value
+  val DROPPED_PARTITIONS = Value
   val END_POINT = Value
   val ERROR = Value
   val EVENT_LOOP = Value
@@ -61,6 +63,7 @@ object LogKey extends Enumeration {
   val EXIT_CODE = Value
   val EXPRESSION_TERMS = Value
   val FAILURES = Value
+  val FIELD_NAME = Value
   val FUNCTION_NAME = Value
   val FUNCTION_PARAMETER = Value
   val GROUP_ID = Value
@@ -92,6 +95,7 @@ object LogKey extends Enumeration {
   val PARSE_MODE = Value
   val PARTITION_ID = Value
   val PARTITION_SPECIFICATION = Value
+  val PARTITION_SPECS = Value
   val PATH = Value
   val PATHS = Value
   val POD_ID = Value
@@ -105,6 +109,7 @@ object LogKey extends Enumeration {
   val REASON = Value
   val REDUCE_ID = Value
   val RELATION_NAME = Value
+  val REMAINING_PARTITIONS = Value
   val REMOTE_ADDRESS = Value
   val RETRY_COUNT = Value
   val RETRY_INTERVAL = Value
@@ -124,6 +129,7 @@ object LogKey extends Enumeration {
   val STATEMENT_ID = Value
   val SUBMISSION_ID = Value
   val SUBSAMPLING_RATE = Value
+  val TABLE_NAME = Value
   val TASK_ATTEMPT_ID = Value
   val TASK_ID = Value
   val TASK_NAME = Value
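
For context, each `LogKey` entry above is a plain Scala `Enumeration` value that log statements reference through `MDC`. A hedged, minimal sketch of consuming one of the new keys (the message text and `dbName` variable here are hypothetical, not from this commit):

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey._

// DATABASE_NAME is declared in LogKey.scala simply as: val DATABASE_NAME = Value
// Referencing it in a log call tags the interpolated value with that key.
logError(log"Failed to resolve database '${MDC(DATABASE_NAME, dbName)}'")
```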
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index d72406f094a6..60970eecc2df 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -36,7 +36,8 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat => oldInputClass,
 import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
 
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
@@ -518,7 +519,10 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
           i += 1
         } catch {
           case ex: Throwable =>
-            logError(s"Exception thrown in field 
<${fieldRefs(i).getFieldName}>")
+            logError(
+              log"Exception thrown in field <${MDC(FIELD_NAME, 
fieldRefs(i).getFieldName)}>",
+              ex
+            )
             throw ex
         }
       }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 46dc56372334..92561bed1195 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -46,7 +46,8 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionsAlreadyExistException}
@@ -686,17 +687,19 @@ private[hive] class HiveClientImpl(
       } catch {
         case e: Exception =>
           val remainingParts = matchingParts.toBuffer --= droppedParts
+          // scalastyle:off line.size.limit
           logError(
-            s"""
+            log"""
                |======================
-               |Attempt to drop the partition specs in table '$table' database '$db':
-               |${specs.mkString("\n")}
+               |Attempt to drop the partition specs in table '${MDC(TABLE_NAME, table)}' database '${MDC(DATABASE_NAME, db)}':
+               |${MDC(PARTITION_SPECS, specs.mkString("\n"))}
                |In this attempt, the following partitions have been dropped successfully:
-               |${droppedParts.mkString("\n")}
+               |${MDC(DROPPED_PARTITIONS, droppedParts.mkString("\n"))}
                |The remaining partitions have not been dropped:
-               |${remainingParts.mkString("\n")}
+               |${MDC(REMAINING_PARTITIONS, remainingParts.mkString("\n"))}
                |======================
              """.stripMargin)
+          // scalastyle:on line.size.limit
           throw e
       }
       droppedParts += partition
@@ -910,7 +913,7 @@ private[hive] class HiveClientImpl(
             |======================
             |END HIVE FAILURE OUTPUT
             |======================
-          """.stripMargin)
+          """.stripMargin, e)
         throw e
     } finally {
       if (state != null) {
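
The last hunk also threads the caught exception into `logError`, matching the TableReader change above; a minimal sketch of that error-with-cause pattern (the `runHiveCommand()` helper is hypothetical):

```scala
try {
  runHiveCommand()  // hypothetical operation that may throw
} catch {
  case e: Exception =>
    // Passing the Throwable as the second argument preserves the stack
    // trace in the log output in addition to the structured message.
    logError(log"Hive operation failed", e)
    throw e
}
```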

