This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 609bd4839e5d [SPARK-47572][SQL] Enforce Window partitionSpec is 
orderable
609bd4839e5d is described below

commit 609bd4839e5d504917de74ed1cb9c23645fba51f
Author: Chenhao Li <[email protected]>
AuthorDate: Fri Mar 29 16:18:46 2024 +0800

    [SPARK-47572][SQL] Enforce Window partitionSpec is orderable
    
    ### What changes were proposed in this pull request?
    
    In the `Window` node, both `partitionSpec` and `orderSpec` must be 
orderable, but the current type check only verifies that `orderSpec` is orderable. 
This can cause an error in later optimization phases.
    
    Given a query:
    
    ```
    with t as (select id, map(id, id) as m from range(0, 10))
    select rank() over (partition by m order by id) from t
    ```
    
    Before the PR, it fails with an `INTERNAL_ERROR`:
    
    ```
    org.apache.spark.SparkException: [INTERNAL_ERROR] grouping/join/window 
partition keys cannot be map type. SQLSTATE: XX000
    at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
    at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
    at 
org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers$.needNormalize(NormalizeFloatingNumbers.scala:103)
    at 
org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers$.org$apache$spark$sql$catalyst$optimizer$NormalizeFloatingNumbers$$needNormalize(NormalizeFloatingNumbers.scala:94)
    ...
    ```
    
    After the PR, it fails with an `EXPRESSION_TYPE_IS_NOT_ORDERABLE` error, which 
is expected:
    
    ```
      org.apache.spark.sql.catalyst.ExtendedAnalysisException: 
[EXPRESSION_TYPE_IS_NOT_ORDERABLE] Column expression "m" cannot be sorted 
because its type "MAP<BIGINT, BIGINT>" is not orderable. SQLSTATE: 42822; line 
2 pos 53;
    Project [RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4]
    +- Project [id#1L, m#0, RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS 
FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4, RANK() OVER 
(PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW)#4]
       +- Window [rank(id#1L) windowspecdefinition(m#0, id#1L ASC NULLS FIRST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() 
OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW)#4], [m#0], [id#1L ASC NULLS FIRST]
          +- Project [id#1L, m#0]
             +- SubqueryAlias t
                +- SubqueryAlias t
                   +- Project [id#1L, map(id#1L, id#1L) AS m#0]
                      +- Range (0, 10, step=1, splits=None)
      at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:52)
    ...
    ```
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #45730 from chenhao-db/SPARK-47572.
    
    Authored-by: Chenhao Li <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 13 +++++++++++++
 .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala   | 14 ++++++++++++++
 2 files changed, 27 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 4a979fd214ab..10bff5e6e59a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -537,6 +537,19 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
               }
             }
 
+          case Window(_, partitionSpec, _, _) =>
+            // Both `partitionSpec` and `orderSpec` must be orderable. We only 
need an extra check
+            // for `partitionSpec` here because `orderSpec` has the type check 
itself.
+            partitionSpec.foreach { p =>
+              if (!RowOrdering.isOrderable(p.dataType)) {
+                p.failAnalysis(
+                  errorClass = "EXPRESSION_TYPE_IS_NOT_ORDERABLE",
+                  messageParameters = Map(
+                    "expr" -> toSQLExpr(p),
+                    "exprType" -> toSQLType(p.dataType)))
+              }
+            }
+
           case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", 
limitExpr)
 
           case LocalLimit(limitExpr, child) =>
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 5a3008187271..f12d22409691 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -1316,4 +1316,18 @@ class AnalysisErrorSuite extends AnalysisTest with 
DataTypeErrorsBase {
       )
     }
   }
+
+  errorClassTest(
+    "SPARK-47572: Enforce Window partitionSpec is orderable",
+    testRelation2.select(
+      WindowExpression(
+        new Rank(),
+        WindowSpecDefinition(
+          CreateMap(Literal("key") :: UnresolvedAttribute("a") :: Nil) :: Nil,
+          SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil,
+          UnspecifiedFrame)).as("window")),
+    errorClass = "EXPRESSION_TYPE_IS_NOT_ORDERABLE",
+    messageParameters = Map(
+      "expr" -> "\"_w0\"",
+      "exprType" -> "\"MAP<STRING, STRING>\""))
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to