This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a483dfd86018 [SPARK-50650][SQL] Improve logging in single-pass Analyzer
a483dfd86018 is described below
commit a483dfd8601868ceec74d2763c04f0ed82abde76
Author: Vladimir Golubev <[email protected]>
AuthorDate: Thu Dec 26 10:50:37 2024 +0800
[SPARK-50650][SQL] Improve logging in single-pass Analyzer
### What changes were proposed in this pull request?
1. Log initial unresolved plans. This way we see the full plan and can
track the downwards traversal.
2. Log expression tree changes in the same manner as operator tree changes.
### Why are the changes needed?
To make single-pass Analyzer debugging easier.
Examples:
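The new messages follow the four templates added in `PlanLogger` below (a
rough sketch; the actual tree text depends on the query):

  === Plan resolution: <event> ===
  <operator tree in treeString form>

  === Unresolved plan -> Resolved plan ===
  <unresolved and resolved operator trees side by side>

  === Expression tree resolution: <event> ===
  <expression tree in treeString form>

  === Unresolved expression tree -> Resolved expression tree ===
  <unresolved and resolved expression trees side by side>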


### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49271 from vladimirg-db/vladimirg-db/single-pass-analyzer/improve-plan-logger.
Authored-by: Vladimir Golubev <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../analysis/resolver/ExpressionResolver.scala | 17 +++--
.../catalyst/analysis/resolver/PlanLogger.scala | 88 ++++++++++++++++------
.../sql/catalyst/analysis/resolver/Resolver.scala | 18 ++++-
.../org/apache/spark/sql/internal/SQLConf.scala | 15 ++++
.../resolver/TracksResolvedNodesSuite.scala | 4 +-
5 files changed, 108 insertions(+), 34 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala
index 0a9e2b9c5a87..1d072509626b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ExpressionResolver.scala
@@ -64,11 +64,13 @@ import org.apache.spark.sql.types.MetadataBuilder
  * operators which are nested in expressions.
  * @param scopes [[NameScopeStack]] to resolve the expression tree in the correct scope.
  * @param functionResolution [[FunctionResolution]] to resolve function expressions.
+ * @param planLogger [[PlanLogger]] to log expression tree resolution events.
  */
 class ExpressionResolver(
     resolver: Resolver,
     scopes: NameScopeStack,
-    functionResolution: FunctionResolution)
+    functionResolution: FunctionResolution,
+    planLogger: PlanLogger)
   extends TreeNodeResolver[Expression, Expression]
   with ProducesUnresolvedSubtree
   with ResolvesExpressionChildren
@@ -118,7 +120,9 @@ class ExpressionResolver(
    * In this case `IN` is an expression and `SELECT 1` is a nested operator tree for which
    * the [[ExpressionResolver]] would invoke the [[Resolver]].
    */
-  override def resolve(unresolvedExpression: Expression): Expression =
+  override def resolve(unresolvedExpression: Expression): Expression = {
+    planLogger.logExpressionTreeResolutionEvent(unresolvedExpression, "Unresolved expression tree")
+
     if (unresolvedExpression
         .getTagValue(ExpressionResolver.SINGLE_PASS_SUBTREE_BOUNDARY)
         .nonEmpty) {
@@ -126,7 +130,7 @@ class ExpressionResolver(
     } else {
       throwIfNodeWasResolvedEarlier(unresolvedExpression)
 
-      val resolvedExpr = unresolvedExpression match {
+      val resolvedExpression = unresolvedExpression match {
         case unresolvedBinaryArithmetic: BinaryArithmetic =>
           binaryArithmeticResolver.resolve(unresolvedBinaryArithmetic)
         case unresolvedExtractANSIIntervalDays: ExtractANSIIntervalDays =>
@@ -157,10 +161,13 @@ class ExpressionResolver(
         }
       }
 
-      markNodeAsResolved(resolvedExpr)
+      markNodeAsResolved(resolvedExpression)
+
+      planLogger.logExpressionTreeResolution(unresolvedExpression, resolvedExpression)
 
-      resolvedExpr
+      resolvedExpression
     }
+  }
 
   private def resolveNamedExpression(
       unresolvedNamedExpression: Expression,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala
index fcf1eab0c04a..8870befede4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/PlanLogger.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.catalyst.analysis.resolver
 
 import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
-import org.apache.spark.internal.LogKeys.QUERY_PLAN
+import org.apache.spark.internal.LogKeys.{MESSAGE, QUERY_PLAN}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.internal.SQLConf
@@ -27,32 +28,71 @@ import org.apache.spark.sql.internal.SQLConf
  * [[PlanLogger]] is used by the [[Resolver]] to log intermediate resolution results.
  */
 class PlanLogger extends Logging {
-  private val logLevel = SQLConf.get.planChangeLogLevel
+  private val planChangeLogLevel = SQLConf.get.planChangeLogLevel
+  private val expressionTreeChangeLogLevel = SQLConf.get.expressionTreeChangeLogLevel
 
-  /**
-   * Logs the transition from the `unresolvedPlan` to the `resolvedPlan`.
-   */
-  def log(unresolvedPlan: LogicalPlan, resolvedPlan: LogicalPlan): Unit = {
-    logBasedOnLevel(() => createMessage(unresolvedPlan, resolvedPlan))
+  def logPlanResolutionEvent(plan: LogicalPlan, event: String): Unit = {
+    log(() => log"""
+       |=== Plan resolution: ${MDC(MESSAGE, event)} ===
+       |${MDC(QUERY_PLAN, plan.treeString)}
+       """.stripMargin, planChangeLogLevel)
   }
 
-  private def createMessage(
-      unresolvedPlan: LogicalPlan,
-      resolvedPlan: LogicalPlan): MessageWithContext =
-    log"""
-       |=== Unresolved/resolved operator subtree ===
+  def logPlanResolution(unresolvedPlan: LogicalPlan, resolvedPlan: LogicalPlan): Unit = {
+    log(
+      () =>
+        log"""
+           |=== Unresolved plan -> Resolved plan ===
            |${MDC(
-         QUERY_PLAN,
-         sideBySide(unresolvedPlan.treeString, resolvedPlan.treeString).mkString("\n")
-       )}
-       """.stripMargin
-
-  private def logBasedOnLevel(createMessage: () => MessageWithContext): Unit = logLevel match {
-    case "TRACE" => logTrace(createMessage().message)
-    case "DEBUG" => logDebug(createMessage().message)
-    case "INFO" => logInfo(createMessage())
-    case "WARN" => logWarning(createMessage())
-    case "ERROR" => logError(createMessage())
-    case _ => logTrace(createMessage().message)
+             QUERY_PLAN,
+             sideBySide(
+               unresolvedPlan.withNewChildren(resolvedPlan.children).treeString,
+               resolvedPlan.treeString
+             ).mkString("\n")
+           )}
+           """.stripMargin,
+      planChangeLogLevel
+    )
   }
+
+  def logExpressionTreeResolutionEvent(expressionTree: Expression, event: String): Unit = {
+    log(
+      () => log"""
+         |=== Expression tree resolution: ${MDC(MESSAGE, event)} ===
+         |${MDC(QUERY_PLAN, expressionTree.treeString)}
+         """.stripMargin,
+      expressionTreeChangeLogLevel
+    )
+  }
+
+  def logExpressionTreeResolution(
+      unresolvedExpressionTree: Expression,
+      resolvedExpressionTree: Expression): Unit = {
+    log(
+      () =>
+        log"""
+           |=== Unresolved expression tree -> Resolved expression tree ===
+           |${MDC(
+             QUERY_PLAN,
+             sideBySide(
+               unresolvedExpressionTree
+                 .withNewChildren(resolvedExpressionTree.children)
+                 .treeString,
+               resolvedExpressionTree.treeString
+             ).mkString("\n")
+           )}
+           """.stripMargin,
+      expressionTreeChangeLogLevel
+    )
+  }
+
+  private def log(createMessage: () => MessageWithContext, logLevel: String): Unit =
+    logLevel match {
+      case "TRACE" => logTrace(createMessage().message)
+      case "DEBUG" => logDebug(createMessage().message)
+      case "INFO" => logInfo(createMessage())
+      case "WARN" => logWarning(createMessage())
+      case "ERROR" => logError(createMessage())
+      case _ => logTrace(createMessage().message)
+    }
 }
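For illustration, a minimal hypothetical driver of the API above (not part of
this change; `OneRowRelation` and `Literal` serve as stand-in catalyst nodes):

  import org.apache.spark.sql.catalyst.analysis.resolver.PlanLogger
  import org.apache.spark.sql.catalyst.expressions.Literal
  import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation

  val planLogger = new PlanLogger

  // Emitted at spark.sql.planChangeLog.level:
  planLogger.logPlanResolutionEvent(OneRowRelation(), "Unresolved plan")

  // Emitted at spark.sql.expressionTreeChangeLog.level:
  planLogger.logExpressionTreeResolutionEvent(Literal(1), "Unresolved expression tree")

Both entry points funnel through the shared private `log` dispatcher, so each
message kind honors its own level conf.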
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala
index b0e6828a97a0..37b875abaade 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala
@@ -70,11 +70,12 @@ class Resolver(
     with TracksResolvedNodes[LogicalPlan]
     with DelegatesResolutionToExtensions {
   private val scopes = new NameScopeStack
+  private val planLogger = new PlanLogger
   private val relationResolution = Resolver.createRelationResolution(catalogManager)
   private val functionResolution = new FunctionResolution(catalogManager, relationResolution)
-  private val expressionResolver = new ExpressionResolver(this, scopes, functionResolution)
+  private val expressionResolver =
+    new ExpressionResolver(this, scopes, functionResolution, planLogger)
   private val limitExpressionResolver = new LimitExpressionResolver(expressionResolver)
-  private val planLogger = new PlanLogger
 
   /**
    * [[relationMetadataProvider]] is used to resolve metadata for relations. It's initialized with
@@ -101,6 +102,8 @@ class Resolver(
   def lookupMetadataAndResolve(
       unresolvedPlan: LogicalPlan,
       analyzerBridgeState: Option[AnalyzerBridgeState] = None): LogicalPlan = {
+    planLogger.logPlanResolutionEvent(unresolvedPlan, "Lookup metadata and resolve")
+
     relationMetadataProvider = analyzerBridgeState match {
       case Some(analyzerBridgeState) =>
         new BridgedRelationMetadataProvider(
@@ -134,6 +137,8 @@ class Resolver(
    * producing a fully resolved plan or a descriptive error message.
    */
   override def resolve(unresolvedPlan: LogicalPlan): LogicalPlan = {
+    planLogger.logPlanResolutionEvent(unresolvedPlan, "Unresolved plan")
+
     throwIfNodeWasResolvedEarlier(unresolvedPlan)
 
     val resolvedPlan =
@@ -167,7 +172,9 @@ class Resolver(
     }
 
     markNodeAsResolved(resolvedPlan)
-    planLogger.log(unresolvedPlan, resolvedPlan)
+
+    planLogger.logPlanResolution(unresolvedPlan, resolvedPlan)
+
     resolvedPlan
   }
 
@@ -260,7 +267,10 @@ class Resolver(
   private def resolveRelation(unresolvedRelation: UnresolvedRelation): LogicalPlan = {
     relationMetadataProvider.getRelationWithResolvedMetadata(unresolvedRelation) match {
       case Some(relationWithResolvedMetadata) =>
-        planLogger.log(unresolvedRelation, relationWithResolvedMetadata)
+        planLogger.logPlanResolutionEvent(
+          relationWithResolvedMetadata,
+          "Relation metadata retrieved"
+        )
 
         withPosition(unresolvedRelation) {
           resolve(relationWithResolvedMetadata)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d5f18231a6c1..08f77d58979f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -400,6 +400,19 @@ object SQLConf {
     .booleanConf
     .createWithDefault(Utils.isTesting)
 
+  val EXPRESSION_TREE_CHANGE_LOG_LEVEL = buildConf("spark.sql.expressionTreeChangeLog.level")
+    .internal()
+    .doc("Configures the log level for logging the change from the unresolved expression tree to " +
+      "the resolved expression tree in the single-pass bottom-up Resolver. The value can be " +
+      "'trace', 'debug', 'info', 'warn', or 'error'. The default log level is 'trace'.")
+    .version("4.0.0")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
+      "Invalid value for 'spark.sql.expressionTreeChangeLog.level'. Valid values are " +
+        "'trace', 'debug', 'info', 'warn' and 'error'.")
+    .createWithDefault("trace")
+
   val LIGHTWEIGHT_PLAN_CHANGE_VALIDATION = buildConf("spark.sql.lightweightPlanChangeValidation")
     .internal()
     .doc(s"Similar to ${PLAN_CHANGE_VALIDATION.key}, this validates plan changes and runs after " +
@@ -5578,6 +5591,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def planChangeBatches: Option[String] = getConf(PLAN_CHANGE_LOG_BATCHES)
 
+  def expressionTreeChangeLogLevel: String = getConf(EXPRESSION_TREE_CHANGE_LOG_LEVEL)
+
   def dynamicPartitionPruningEnabled: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_ENABLED)
 
   def dynamicPartitionPruningUseStats: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_USE_STATS)
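A hedged usage sketch for the new conf (assumes an active `spark` session with
the single-pass resolver enabled; the key and accepted values come from the
definition above):

  // Surface expression tree transitions at DEBUG instead of the default TRACE.
  // Values are case-insensitive (the conf upper-cases them); anything outside
  // trace/debug/info/warn/error is rejected by checkValue.
  spark.conf.set("spark.sql.expressionTreeChangeLog.level", "debug")
  spark.sql("SELECT 1").collect()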
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala
index 28ccebc89bc5..b7bf73f326fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/TracksResolvedNodesSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionResolution
 import org.apache.spark.sql.catalyst.analysis.resolver.{
   ExpressionResolver,
   NameScopeStack,
+  PlanLogger,
   Resolver
 }
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, ExprId}
@@ -127,7 +128,8 @@ class TracksResolvedNodesSuite extends QueryTest with SharedSparkSession {
       new FunctionResolution(
         spark.sessionState.catalogManager,
         Resolver.createRelationResolution(spark.sessionState.catalogManager)
-      )
+      ),
+      new PlanLogger
     )
   }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]