This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7a7ea6036eec [SPARK-54564][SQL] Make QueryPlanningTracker as
HybridAnalyzer field
7a7ea6036eec is described below
commit 7a7ea6036eecd39b4656e3416a1c452eac0fe145
Author: mihailoale-db <[email protected]>
AuthorDate: Wed Dec 3 08:09:19 2025 -0800
[SPARK-54564][SQL] Make QueryPlanningTracker as HybridAnalyzer field
### What changes were proposed in this pull request?
In this PR I propose to make `QueryPlanningTracker` a `HybridAnalyzer`
field.
### Why are the changes needed?
In order to simplify the code and further single-pass analyzer development.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53277 from mihailoale-db/analyzertracker.
Authored-by: mihailoale-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 4 +-
.../analysis/resolver/HybridAnalyzer.scala | 7 ++-
.../analysis/resolver/HybridAnalyzerSuite.scala | 71 +++++++++++++---------
3 files changed, 50 insertions(+), 32 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 08c31939f161..67d25296a1e2 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -319,7 +319,7 @@ class Analyzer(
AnalysisContext.reset()
try {
AnalysisHelper.markInAnalyzer {
- HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this).apply(plan, tracker)
+ HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this, tracker = tracker).apply(plan)
}
} finally {
AnalysisContext.reset()
@@ -327,7 +327,7 @@ class Analyzer(
} else {
AnalysisContext.withNewAnalysisContext {
AnalysisHelper.markInAnalyzer {
- HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this).apply(plan, tracker)
+ HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this, tracker = tracker).apply(plan)
}
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala
index d346969be8ef..ecdbef86a297 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala
@@ -61,6 +61,7 @@ class HybridAnalyzer(
legacyAnalyzer: Analyzer,
resolverGuard: ResolverGuard,
resolver: Resolver,
+ tracker: QueryPlanningTracker,
extendedResolutionChecks: Seq[LogicalPlan => Unit] = Seq.empty,
extendedRewriteRules: Seq[Rule[LogicalPlan]] = Seq.empty,
exposeExplicitlyUnsupportedResolverFeature: Boolean = false)
@@ -74,7 +75,7 @@ class HybridAnalyzer(
)
private val sampleRateGenerator = new Random()
- def apply(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
+ def apply(plan: LogicalPlan): LogicalPlan = {
val dualRun =
conf.getConf(SQLConf.ANALYZER_DUAL_RUN_LEGACY_AND_SINGLE_PASS_RESOLVER)
&&
checkDualRunSampleRate() &&
@@ -296,7 +297,8 @@ object HybridAnalyzer {
*/
def fromLegacyAnalyzer(
legacyAnalyzer: Analyzer,
- exposeExplicitlyUnsupportedResolverFeature: Boolean = false): HybridAnalyzer = {
+ exposeExplicitlyUnsupportedResolverFeature: Boolean = false,
+ tracker: QueryPlanningTracker): HybridAnalyzer = {
new HybridAnalyzer(
legacyAnalyzer = legacyAnalyzer,
resolverGuard = new ResolverGuard(legacyAnalyzer.catalogManager),
@@ -307,6 +309,7 @@ object HybridAnalyzer {
metadataResolverExtensions =
legacyAnalyzer.singlePassMetadataResolverExtensions,
externalRelationResolution = Some(legacyAnalyzer.getRelationResolution)
),
+ tracker = tracker,
extendedResolutionChecks =
legacyAnalyzer.singlePassExtendedResolutionChecks,
extendedRewriteRules = legacyAnalyzer.singlePassPostHocResolutionRules,
exposeExplicitlyUnsupportedResolverFeature =
exposeExplicitlyUnsupportedResolverFeature
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/HybridAnalyzerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/HybridAnalyzerSuite.scala
index 21d9a72fc2a9..1dacec13f8ac 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/HybridAnalyzerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/HybridAnalyzerSuite.scala
@@ -148,7 +148,8 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
extends HybridAnalyzer(
legacyAnalyzer = legacyAnalyzer,
resolverGuard = resolverGuard,
- resolver = resolver
+ resolver = resolver,
+ tracker = new QueryPlanningTracker
) {
override protected[sql] def normalizePlan(plan: LogicalPlan): LogicalPlan = {
throw new Exception("Broken plan normalization")
@@ -177,8 +178,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
new HybridAnalyzer(
new ValidatingAnalyzer(bridgeRelations = true),
new ResolverGuard(spark.sessionState.catalogManager),
- new ValidatingResolver(bridgeRelations = true)
- ).apply(unresolvedPlan, new QueryPlanningTracker),
+ new ValidatingResolver(bridgeRelations = true),
+ new QueryPlanningTracker
+ ).apply(unresolvedPlan),
resolvedPlan
)
}
@@ -192,8 +194,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
new BrokenResolver(
QueryCompilationErrors.unsupportedSinglePassAnalyzerFeature("test"),
bridgeRelations = true
- )
- ).apply(unresolvedPlan, new QueryPlanningTracker)
+ ),
+ new QueryPlanningTracker
+ ).apply(unresolvedPlan)
),
condition = "UNSUPPORTED_SINGLE_PASS_ANALYZER_FEATURE",
parameters = Map("feature" -> "test")
@@ -208,8 +211,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
new BrokenResolver(
new StackOverflowError("Stack Overflow"),
bridgeRelations = true
- )
- ).apply(unresolvedPlan, new QueryPlanningTracker)
+ ),
+ new QueryPlanningTracker
+ ).apply(unresolvedPlan)
)
}
@@ -219,8 +223,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
new HybridAnalyzer(
new ValidatingAnalyzer(bridgeRelations = true),
new ResolverGuard(spark.sessionState.catalogManager),
- new HardCodedResolver(resolvedPlan, bridgeRelations = true)
- ).apply(malformedUnresolvedPlan, new QueryPlanningTracker)
+ new HardCodedResolver(resolvedPlan, bridgeRelations = true),
+ new QueryPlanningTracker
+ ).apply(malformedUnresolvedPlan)
),
condition =
"HYBRID_ANALYZER_EXCEPTION.FIXED_POINT_FAILED_SINGLE_PASS_SUCCEEDED",
parameters = Map("singlePassOutput" -> resolvedPlan.toString)
@@ -233,8 +238,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
new HybridAnalyzer(
new ValidatingAnalyzer(bridgeRelations = true),
new ResolverGuard(spark.sessionState.catalogManager),
- new ValidatingResolver(bridgeRelations = true)
- ).apply(malformedUnresolvedPlan, new QueryPlanningTracker)
+ new ValidatingResolver(bridgeRelations = true),
+ new QueryPlanningTracker
+ ).apply(malformedUnresolvedPlan)
),
condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
parameters = Map(
@@ -250,8 +256,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
new HybridAnalyzer(
new ValidatingAnalyzer(bridgeRelations = true),
new ResolverGuard(spark.sessionState.catalogManager),
- new HardCodedResolver(malformedResolvedPlan, bridgeRelations = true)
- ).apply(unresolvedPlan, new QueryPlanningTracker)
+ new HardCodedResolver(malformedResolvedPlan, bridgeRelations = true),
+ new QueryPlanningTracker
+ ).apply(unresolvedPlan)
),
condition = "HYBRID_ANALYZER_EXCEPTION.LOGICAL_PLAN_COMPARISON_MISMATCH",
parameters = Map(
@@ -277,8 +284,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
new HybridAnalyzer(
new ValidatingAnalyzer(bridgeRelations = true),
new ResolverGuard(spark.sessionState.catalogManager),
- new HardCodedResolver(resolvedPlan, bridgeRelations = true)
- ).apply(plan, new QueryPlanningTracker)
+ new HardCodedResolver(resolvedPlan, bridgeRelations = true),
+ new QueryPlanningTracker
+ ).apply(plan)
),
condition =
"HYBRID_ANALYZER_EXCEPTION.OUTPUT_SCHEMA_COMPARISON_MISMATCH",
parameters = Map(
@@ -294,7 +302,7 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
new ValidatingAnalyzer(bridgeRelations = true),
new ResolverGuard(spark.sessionState.catalogManager),
new HardCodedResolver(resolvedPlan, bridgeRelations = true)
- ).apply(unresolvedPlan, new QueryPlanningTracker)
+ ).apply(unresolvedPlan)
}
}
@@ -306,8 +314,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
new BrokenResolver(
new ExplicitlyUnsupportedResolverFeature("FAILURE"),
bridgeRelations = true
- )
- ).apply(unresolvedPlan, new QueryPlanningTracker),
+ ),
+ new QueryPlanningTracker
+ ).apply(unresolvedPlan),
resolvedPlan
)
}
@@ -331,8 +340,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
new BrokenResolver(
new Exception("Single-pass resolver should not be invoked"),
bridgeRelations = false
- )
- ).apply(plan, new QueryPlanningTracker)
+ ),
+ new QueryPlanningTracker
+ ).apply(plan)
},
resolvedPlan
)
@@ -358,8 +368,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
bridgeRelations = false
),
new ResolverGuard(spark.sessionState.catalogManager),
- new ValidatingResolver(bridgeRelations = false)
- ).apply(plan, new QueryPlanningTracker)
+ new ValidatingResolver(bridgeRelations = false),
+ new QueryPlanningTracker
+ ).apply(plan)
},
resolvedPlan
)
@@ -387,8 +398,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
bridgeRelations = false
),
new ResolverGuard(spark.sessionState.catalogManager),
- new ValidatingResolver(bridgeRelations = false)
- ).apply(plan, new QueryPlanningTracker)
+ new ValidatingResolver(bridgeRelations = false),
+ new QueryPlanningTracker
+ ).apply(plan)
},
resolvedPlan
)
@@ -401,8 +413,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
bridgeRelations = true
),
new ResolverGuard(spark.sessionState.catalogManager),
- new ValidatingResolver(bridgeRelations = true)
- ).apply(plan, new QueryPlanningTracker),
+ new ValidatingResolver(bridgeRelations = true),
+ new QueryPlanningTracker
+ ).apply(plan),
resolvedPlan
)
}
@@ -425,8 +438,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
legacyAnalyzer = new ValidatingAnalyzer(bridgeRelations = true),
resolverGuard = new ResolverGuard(spark.sessionState.catalogManager),
resolver = new ValidatingResolver(bridgeRelations = true),
+ tracker = new QueryPlanningTracker,
extendedResolutionChecks = Seq(new BrokenCheckRule)
- ).apply(plan, new QueryPlanningTracker)
+ ).apply(plan)
}
withSQLConf(
@@ -437,8 +451,9 @@ class HybridAnalyzerSuite extends QueryTest with
SharedSparkSession {
legacyAnalyzer = new ValidatingAnalyzer(bridgeRelations = true),
resolverGuard = new ResolverGuard(spark.sessionState.catalogManager),
resolver = new ValidatingResolver(bridgeRelations = true),
+ tracker = new QueryPlanningTracker,
extendedResolutionChecks = Seq(new BrokenCheckRule)
- ).apply(plan, new QueryPlanningTracker),
+ ).apply(plan),
resolvedPlan
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]