This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a483dfd86018 [SPARK-50650][SQL] Improve logging in single-pass Analyzer
a483dfd86018 is described below

commit a483dfd8601868ceec74d2763c04f0ed82abde76
Author: Vladimir Golubev <[email protected]>
AuthorDate: Thu Dec 26 10:50:37 2024 +0800

    [SPARK-50650][SQL] Improve logging in single-pass Analyzer
    
    ### What changes were proposed in this pull request?
    
    1. Log initial unresolved plans. This way we see the full plan and can track the downward traversal.
    2. Log expression tree changes in the same manner as operator tree changes.
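    
    For illustration, a minimal way to surface both kinds of entries (a sketch, assuming an active `spark` session with the single-pass Analyzer enabled; `spark.sql.expressionTreeChangeLog.level` is introduced by this patch, while `spark.sql.planChangeLog.level` already exists for operator trees):
    
    ```scala
    // Both confs are internal and default to "trace"; raising them to "info"
    // makes the entries visible without touching the logging config.
    spark.conf.set("spark.sql.planChangeLog.level", "info")
    spark.conf.set("spark.sql.expressionTreeChangeLog.level", "info")
    
    // Resolving a query now logs the initial unresolved plan and expression
    // trees, plus side-by-side unresolved -> resolved comparisons.
    spark.sql("SELECT col1 + 1 FROM VALUES (1), (2)").collect()
    ```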
    
    ### Why are the changes needed?
    
    To make single-pass Analyzer debugging easier.
    
    Examples:
    
![image](https://github.com/user-attachments/assets/507c3daf-0fe2-48a9-a66c-73e038550511)
    
    
![image](https://github.com/user-attachments/assets/8d8c9fbc-7371-4d29-a8c5-71aba57a65e5)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49271 from vladimirg-db/vladimirg-db/single-pass-analyzer/improve-plan-logger.
    
    Authored-by: Vladimir Golubev <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../analysis/resolver/ExpressionResolver.scala     | 17 +++--
 .../catalyst/analysis/resolver/PlanLogger.scala    | 88 ++++++++++++++++------
 .../sql/catalyst/analysis/resolver/Resolver.scala  | 18 ++++-
 .../org/apache/spark/sql/internal/SQLConf.scala    | 15 ++++
 .../resolver/TracksResolvedNodesSuite.scala        |  4 +-
 5 files changed, 108 insertions(+), 34 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala
index 0a9e2b9c5a87..1d072509626b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala
@@ -64,11 +64,13 @@ import org.apache.spark.sql.types.MetadataBuilder
  *   operators which are nested in expressions.
 * @param scopes [[NameScopeStack]] to resolve the expression tree in the correct scope.
 * @param functionResolution [[FunctionResolution]] to resolve function expressions.
+ * @param planLogger [[PlanLogger]] to log expression tree resolution events.
  */
 class ExpressionResolver(
     resolver: Resolver,
     scopes: NameScopeStack,
-    functionResolution: FunctionResolution)
+    functionResolution: FunctionResolution,
+    planLogger: PlanLogger)
     extends TreeNodeResolver[Expression, Expression]
     with ProducesUnresolvedSubtree
     with ResolvesExpressionChildren
@@ -118,7 +120,9 @@ class ExpressionResolver(
   * In this case `IN` is an expression and `SELECT 1` is a nested operator tree for which
    * the [[ExpressionResolver]] would invoke the [[Resolver]].
    */
-  override def resolve(unresolvedExpression: Expression): Expression =
+  override def resolve(unresolvedExpression: Expression): Expression = {
+    planLogger.logExpressionTreeResolutionEvent(unresolvedExpression, "Unresolved expression tree")
+
     if (unresolvedExpression
         .getTagValue(ExpressionResolver.SINGLE_PASS_SUBTREE_BOUNDARY)
         .nonEmpty) {
@@ -126,7 +130,7 @@ class ExpressionResolver(
     } else {
       throwIfNodeWasResolvedEarlier(unresolvedExpression)
 
-      val resolvedExpr = unresolvedExpression match {
+      val resolvedExpression = unresolvedExpression match {
         case unresolvedBinaryArithmetic: BinaryArithmetic =>
           binaryArithmeticResolver.resolve(unresolvedBinaryArithmetic)
         case unresolvedExtractANSIIntervalDays: ExtractANSIIntervalDays =>
@@ -157,10 +161,13 @@ class ExpressionResolver(
           }
       }
 
-      markNodeAsResolved(resolvedExpr)
+      markNodeAsResolved(resolvedExpression)
+
+      planLogger.logExpressionTreeResolution(unresolvedExpression, resolvedExpression)
 
-      resolvedExpr
+      resolvedExpression
     }
+  }
 
   private def resolveNamedExpression(
       unresolvedNamedExpression: Expression,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala
index fcf1eab0c04a..8870befede4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.catalyst.analysis.resolver
 
 import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
-import org.apache.spark.internal.LogKeys.QUERY_PLAN
+import org.apache.spark.internal.LogKeys.{MESSAGE, QUERY_PLAN}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.internal.SQLConf
@@ -27,32 +28,71 @@ import org.apache.spark.sql.internal.SQLConf
 * [[PlanLogger]] is used by the [[Resolver]] to log intermediate resolution results.
  */
 class PlanLogger extends Logging {
-  private val logLevel = SQLConf.get.planChangeLogLevel
+  private val planChangeLogLevel = SQLConf.get.planChangeLogLevel
+  private val expressionTreeChangeLogLevel = SQLConf.get.expressionTreeChangeLogLevel
 
-  /**
-   * Logs the transition from the `unresolvedPlan` to the `resolvedPlan`.
-   */
-  def log(unresolvedPlan: LogicalPlan, resolvedPlan: LogicalPlan): Unit = {
-    logBasedOnLevel(() => createMessage(unresolvedPlan, resolvedPlan))
+  def logPlanResolutionEvent(plan: LogicalPlan, event: String): Unit = {
+    log(() => log"""
+       |=== Plan resolution: ${MDC(MESSAGE, event)} ===
+       |${MDC(QUERY_PLAN, plan.treeString)}
+     """.stripMargin, planChangeLogLevel)
   }
 
-  private def createMessage(
-      unresolvedPlan: LogicalPlan,
-      resolvedPlan: LogicalPlan): MessageWithContext =
-    log"""
-       |=== Unresolved/resolved operator subtree ===
+  def logPlanResolution(unresolvedPlan: LogicalPlan, resolvedPlan: LogicalPlan): Unit = {
+    log(
+      () =>
+        log"""
+       |=== Unresolved plan -> Resolved plan ===
        |${MDC(
-           QUERY_PLAN,
-           sideBySide(unresolvedPlan.treeString, resolvedPlan.treeString).mkString("\n")
-         )}
-     """.stripMargin
-
-  private def logBasedOnLevel(createMessage: () => MessageWithContext): Unit = logLevel match {
-    case "TRACE" => logTrace(createMessage().message)
-    case "DEBUG" => logDebug(createMessage().message)
-    case "INFO" => logInfo(createMessage())
-    case "WARN" => logWarning(createMessage())
-    case "ERROR" => logError(createMessage())
-    case _ => logTrace(createMessage().message)
+               QUERY_PLAN,
+               sideBySide(
+                 unresolvedPlan.withNewChildren(resolvedPlan.children).treeString,
+                 resolvedPlan.treeString
+               ).mkString("\n")
+             )}
+     """.stripMargin,
+      planChangeLogLevel
+    )
   }
+
+  def logExpressionTreeResolutionEvent(expressionTree: Expression, event: String): Unit = {
+    log(
+      () => log"""
+       |=== Expression tree resolution: ${MDC(MESSAGE, event)} ===
+       |${MDC(QUERY_PLAN, expressionTree.treeString)}
+     """.stripMargin,
+      expressionTreeChangeLogLevel
+    )
+  }
+
+  def logExpressionTreeResolution(
+      unresolvedExpressionTree: Expression,
+      resolvedExpressionTree: Expression): Unit = {
+    log(
+      () =>
+        log"""
+       |=== Unresolved expression tree -> Resolved expression tree ===
+       |${MDC(
+               QUERY_PLAN,
+               sideBySide(
+                 unresolvedExpressionTree
+                   .withNewChildren(resolvedExpressionTree.children)
+                   .treeString,
+                 resolvedExpressionTree.treeString
+               ).mkString("\n")
+             )}
+     """.stripMargin,
+      expressionTreeChangeLogLevel
+    )
+  }
+
+  private def log(createMessage: () => MessageWithContext, logLevel: String): Unit =
+    logLevel match {
+      case "TRACE" => logTrace(createMessage().message)
+      case "DEBUG" => logDebug(createMessage().message)
+      case "INFO" => logInfo(createMessage())
+      case "WARN" => logWarning(createMessage())
+      case "ERROR" => logError(createMessage())
+      case _ => logTrace(createMessage().message)
+    }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala
index b0e6828a97a0..37b875abaade 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala
@@ -70,11 +70,12 @@ class Resolver(
     with TracksResolvedNodes[LogicalPlan]
     with DelegatesResolutionToExtensions {
   private val scopes = new NameScopeStack
+  private val planLogger = new PlanLogger
   private val relationResolution = Resolver.createRelationResolution(catalogManager)
   private val functionResolution = new FunctionResolution(catalogManager, relationResolution)
-  private val expressionResolver = new ExpressionResolver(this, scopes, functionResolution)
+  private val expressionResolver =
+    new ExpressionResolver(this, scopes, functionResolution, planLogger)
   private val limitExpressionResolver = new LimitExpressionResolver(expressionResolver)
-  private val planLogger = new PlanLogger
 
   /**
   * [[relationMetadataProvider]] is used to resolve metadata for relations. It's initialized with
@@ -101,6 +102,8 @@ class Resolver(
   def lookupMetadataAndResolve(
       unresolvedPlan: LogicalPlan,
       analyzerBridgeState: Option[AnalyzerBridgeState] = None): LogicalPlan = {
+    planLogger.logPlanResolutionEvent(unresolvedPlan, "Lookup metadata and resolve")
+
     relationMetadataProvider = analyzerBridgeState match {
       case Some(analyzerBridgeState) =>
         new BridgedRelationMetadataProvider(
@@ -134,6 +137,8 @@ class Resolver(
    * producing a fully resolved plan or a descriptive error message.
    */
   override def resolve(unresolvedPlan: LogicalPlan): LogicalPlan = {
+    planLogger.logPlanResolutionEvent(unresolvedPlan, "Unresolved plan")
+
     throwIfNodeWasResolvedEarlier(unresolvedPlan)
 
     val resolvedPlan =
@@ -167,7 +172,9 @@ class Resolver(
       }
 
     markNodeAsResolved(resolvedPlan)
-    planLogger.log(unresolvedPlan, resolvedPlan)
+
+    planLogger.logPlanResolution(unresolvedPlan, resolvedPlan)
+
     resolvedPlan
   }
 
@@ -260,7 +267,10 @@ class Resolver(
   private def resolveRelation(unresolvedRelation: UnresolvedRelation): LogicalPlan = {
     relationMetadataProvider.getRelationWithResolvedMetadata(unresolvedRelation) match {
       case Some(relationWithResolvedMetadata) =>
-        planLogger.log(unresolvedRelation, relationWithResolvedMetadata)
+        planLogger.logPlanResolutionEvent(
+          relationWithResolvedMetadata,
+          "Relation metadata retrieved"
+        )
 
         withPosition(unresolvedRelation) {
           resolve(relationWithResolvedMetadata)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d5f18231a6c1..08f77d58979f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -400,6 +400,19 @@ object SQLConf {
     .booleanConf
     .createWithDefault(Utils.isTesting)
 
+  val EXPRESSION_TREE_CHANGE_LOG_LEVEL = buildConf("spark.sql.expressionTreeChangeLog.level")
+    .internal()
+    .doc("Configures the log level for logging the change from the unresolved 
expression tree to " +
+      "the resolved expression tree in the single-pass bottom-up Resolver. The 
value can be " +
+      "'trace', 'debug', 'info', 'warn', or 'error'. The default log level is 
'trace'.")
+    .version("4.0.0")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
+      "Invalid value for 'spark.sql.expressionTreeChangeLog.level'. Valid values are " +
+        "'trace', 'debug', 'info', 'warn' and 'error'.")
+    .createWithDefault("trace")
+
   val LIGHTWEIGHT_PLAN_CHANGE_VALIDATION = buildConf("spark.sql.lightweightPlanChangeValidation")
     .internal()
     .doc(s"Similar to ${PLAN_CHANGE_VALIDATION.key}, this validates plan 
changes and runs after " +
@@ -5578,6 +5591,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def planChangeBatches: Option[String] = getConf(PLAN_CHANGE_LOG_BATCHES)
 
+  def expressionTreeChangeLogLevel: String = getConf(EXPRESSION_TREE_CHANGE_LOG_LEVEL)
+
   def dynamicPartitionPruningEnabled: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_ENABLED)
 
   def dynamicPartitionPruningUseStats: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_USE_STATS)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala
index 28ccebc89bc5..b7bf73f326fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionResolution
 import org.apache.spark.sql.catalyst.analysis.resolver.{
   ExpressionResolver,
   NameScopeStack,
+  PlanLogger,
   Resolver
 }
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, ExprId}
@@ -127,7 +128,8 @@ class TracksResolvedNodesSuite extends QueryTest with SharedSparkSession {
       new FunctionResolution(
         spark.sessionState.catalogManager,
         Resolver.createRelationResolution(spark.sessionState.catalogManager)
-      )
+      ),
+      new PlanLogger
     )
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
