This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 09cb059c8dde [SPARK-50798][SQL][FOLLOWUP] Further improvements to
`NormalizePlan`
09cb059c8dde is described below
commit 09cb059c8dde9765d7ac6e4f1b44e2f47b3c1338
Author: Mihailo Timotic <[email protected]>
AuthorDate: Wed Jan 22 10:22:11 2025 +0800
[SPARK-50798][SQL][FOLLOWUP] Further improvements to `NormalizePlan`
### What changes were proposed in this pull request?
Improve `NormalizePlan` by fixing normalization of `InheritAnalysisRules`
and adding normalization for `CommonExpressionId` and the expressions that use it.
### Why are the changes needed?
PR #49460 mishandled normalization of `RuntimeReplaceable` because it
copied the original expression's tags to the replacement. This PR fixes that.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a test case to `NormalizePlanSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49585 from mihailotim-db/mihailotim-db/normalize_plan_followup.
Authored-by: Mihailo Timotic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/plans/NormalizePlan.scala | 34 +++++++++++++++-------
.../sql/catalyst/plans/NormalizePlanSuite.scala | 32 +++++++++++++++++++-
2 files changed, 54 insertions(+), 12 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
index 13df749c6d58..b98cef04d911 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
@@ -22,17 +22,34 @@ import java.util.HashMap
import org.apache.spark.sql.catalyst.analysis.GetViewColumnByNameAndOrdinal
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions
import org.apache.spark.sql.catalyst.plans.logical._
object NormalizePlan extends PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = {
- val withNormalizedInheritAnalysis = normalizeInheritAnalysisRules(plan)
- val withNormalizedExprIds = normalizeExprIds(withNormalizedInheritAnalysis)
+ val withNormalizedExpressions = normalizeExpressions(plan)
+ val withNormalizedExprIds = normalizeExprIds(withNormalizedExpressions)
normalizePlan(withNormalizedExprIds)
}
/**
- * Normalize [[InheritAnalysisRules]] nodes by replacing them with their
replacement expressions.
+ * Normalizes expressions in a plan that either produce non-deterministic
results or
+ * will be different between fixed-point and single-pass analyzer, due to
the nature
+ * of bottom-up resolution. Before normalization, pre-process the plan by
replacing all
+ * [[RuntimeReplaceable]] nodes with their replacements.
+ */
+ def normalizeExpressions(plan: LogicalPlan): LogicalPlan = {
+ val withNormalizedRuntimeReplaceable = normalizeRuntimeReplaceable(plan)
+ withNormalizedRuntimeReplaceable transformAllExpressions {
+ case c: CommonExpressionDef =>
+ c.copy(id = new CommonExpressionId(id = 0))
+ case c: CommonExpressionRef =>
+ c.copy(id = new CommonExpressionId(id = 0))
+ }
+ }
+
+ /**
+ * Normalize [[RuntimeReplaceable]] nodes by replacing them with their
replacement expressions.
* This is necessary because fixed-point analyzer may produce
non-deterministic results when
* resolving original expressions. For example, in a query like:
*
@@ -44,15 +61,10 @@ object NormalizePlan extends PredicateHelper {
* child of initially unresolved function is resolved, the function can be
converted to
* [[AssertTrue]], which is of type [[InheritAnalysisRules]]. However,
because the only child of
* [[InheritAnalysisRules]] is the replacement expression, the original
expression will be lost
- * timezone will never be applied. This causes inconsistencies, because
fixed-point semantic is
- * to ALWAYS apply timezone, regardless of whether or not the Cast actually
needs it.
+ * and timezone will never be applied. This causes inconsistencies, because
fixed-point semantic
+ * is to ALWAYS apply timezone, regardless of whether the Cast actually
needs it.
*/
- def normalizeInheritAnalysisRules(plan: LogicalPlan): LogicalPlan = {
- plan transformAllExpressions {
- case inheritAnalysisRules: InheritAnalysisRules =>
- inheritAnalysisRules.child
- }
- }
+ def normalizeRuntimeReplaceable(plan: LogicalPlan): LogicalPlan =
ReplaceExpressions(plan)
/**
* Since attribute references are given globally unique ids during analysis,
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
index 5ff66098107c..aa2a6408faf0 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
@@ -20,7 +20,16 @@ package org.apache.spark.sql.catalyst.plans
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{AssertTrue, Cast, If,
Literal, TimeZoneAwareExpression}
+import org.apache.spark.sql.catalyst.expressions.{
+ AssertTrue,
+ Cast,
+ CommonExpressionDef,
+ CommonExpressionId,
+ CommonExpressionRef,
+ If,
+ Literal,
+ TimeZoneAwareExpression
+}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.types.BooleanType
@@ -70,6 +79,27 @@ class NormalizePlanSuite extends SparkFunSuite with
SQLConfHelper {
assert(NormalizePlan(resolvedBaselinePlan) ==
NormalizePlan(resolvedTestPlan))
}
+ test("Normalize CommonExpressionId") {
+ val baselineCommonExpressionRef =
+ CommonExpressionRef(id = new CommonExpressionId, dataType = BooleanType,
nullable = false)
+ val baselineCommonExpressionDef = CommonExpressionDef(child = Literal(0))
+ val testCommonExpressionRef =
+ CommonExpressionRef(id = new CommonExpressionId, dataType = BooleanType,
nullable = false)
+ val testCommonExpressionDef = CommonExpressionDef(child = Literal(0))
+
+ val baselinePlanRef = LocalRelation().select(baselineCommonExpressionRef)
+ val testPlanRef = LocalRelation().select(testCommonExpressionRef)
+
+ assert(baselinePlanRef != testPlanRef)
+ assert(NormalizePlan(baselinePlanRef) == NormalizePlan(testPlanRef))
+
+ val baselinePlanDef = LocalRelation().select(baselineCommonExpressionDef)
+ val testPlanDef = LocalRelation().select(testCommonExpressionDef)
+
+ assert(baselinePlanDef != testPlanDef)
+ assert(NormalizePlan(baselinePlanDef) == NormalizePlan(testPlanDef))
+ }
+
private def setTimezoneForAllExpression(plan: LogicalPlan): LogicalPlan = {
plan.transformAllExpressions {
case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]