This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fe70ca4cf9e0 [SPARK-50228][SQL] Move the `RewriteCollationJoin` rule 
to `FinishAnalysis`
fe70ca4cf9e0 is described below

commit fe70ca4cf9e0111e5f0616cc8eeffc2008c4b1bc
Author: Max Gekk <[email protected]>
AuthorDate: Wed Nov 6 08:56:59 2024 +0100

    [SPARK-50228][SQL] Move the `RewriteCollationJoin` rule to `FinishAnalysis`
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to move the `RewriteCollationJoin` rule added by 
https://github.com/apache/spark/pull/46599 to `FinishAnalysis`.
    
    ### Why are the changes needed?
    The conversions of join keys that the rule does **should not be** 
considered an optimization; they are needed for correctness. So, we should 
not run it as an optimization rule; it should be applied before any of them. Currently, 
optimization rules can produce plans (for instance, sub-queries) that become 
semantically incorrect after `RewriteCollationJoin`.
    
    We could modify `RewriteCollationJoin` and take into account DPP and maybe 
other optimizations, but we might miss something else. Fixing **all** results of 
optimizations in `RewriteCollationJoin` introduces some unnecessary 
dependencies.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By existing GAs. It is impossible to test wrong results of DPP because 
non-binary collated strings as partitions have not been supported by any 
built-in datasources yet.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #48759 from MaxGekk/dpp-collation-bug.
    
    Authored-by: Max Gekk <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
---
 .../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala   | 3 ++-
 .../main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala  | 4 +---
 sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala   | 6 +++---
 3 files changed, 6 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 7074f8c4f089..76a0e90f5eb2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -305,7 +305,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
       SpecialDatetimeValues,
       RewriteAsOfJoin,
       EvalInlineTables,
-      ReplaceTranspose
+      ReplaceTranspose,
+      RewriteCollationJoin
     )
 
     override def apply(plan: LogicalPlan): LogicalPlan = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 3382a1161ddb..6173703ef3cd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.ExperimentalMethods
-import org.apache.spark.sql.catalyst.analysis.RewriteCollationJoin
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -93,8 +92,7 @@ class SparkOptimizer(
       EliminateLimits,
       ConstantFolding) :+
     Batch("User Provided Optimizers", fixedPoint, 
experimentalMethods.extraOptimizations: _*) :+
-    Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition) 
:+
-    Batch("RewriteCollationJoin", Once, RewriteCollationJoin)
+    Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition)
 
   override def nonExcludableRules: Seq[String] = super.nonExcludableRules :+
     ExtractPythonUDFFromJoinCondition.ruleName :+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index d69ba77a1475..7d7c95a24ca6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -1834,7 +1834,7 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
     })
   }
 
-  test("rewrite with collationkey should be an excludable rule") {
+  test("rewrite with collationkey should be a non-excludable rule") {
     val t1 = "T_1"
     val t2 = "T_2"
     val collation = "UTF8_LCASE"
@@ -1856,12 +1856,12 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
         assert(
           collectFirst(queryPlan) {
             case _: HashJoin => ()
-          }.isEmpty
+          }.nonEmpty
         )
         assert(
           collectFirst(queryPlan) {
             case _: SortMergeJoinExec => ()
-          }.nonEmpty
+          }.isEmpty
         )
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to