This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 609bd4839e5d [SPARK-47572][SQL] Enforce Window partitionSpec is orderable
609bd4839e5d is described below
commit 609bd4839e5d504917de74ed1cb9c23645fba51f
Author: Chenhao Li <[email protected]>
AuthorDate: Fri Mar 29 16:18:46 2024 +0800
[SPARK-47572][SQL] Enforce Window partitionSpec is orderable
### What changes were proposed in this pull request?
In the `Window` node, both `partitionSpec` and `orderSpec` must be orderable, but the current type check only verifies that `orderSpec` is orderable. This can cause an internal error in later optimizer phases.
Given a query:
```
with t as (select id, map(id, id) as m from range(0, 10))
select rank() over (partition by m order by id) from t
```
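Orderability here is what `RowOrdering.isOrderable` reports for an expression's data type, which is the predicate the added check relies on. As a rough illustration (a sketch for context, not part of this patch), the two column types in the query above behave as follows:
```
import org.apache.spark.sql.catalyst.expressions.RowOrdering
import org.apache.spark.sql.types.{LongType, MapType}

// `id` has an orderable type, so it is a valid window partition key.
assert(RowOrdering.isOrderable(LongType))
// `m` is a map, which is not orderable, so analysis must reject it as a partition key.
assert(!RowOrdering.isOrderable(MapType(LongType, LongType)))
```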
Before this PR, the query fails with an `INTERNAL_ERROR`:
```
org.apache.spark.SparkException: [INTERNAL_ERROR] grouping/join/window partition keys cannot be map type. SQLSTATE: XX000
  at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
  at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
  at org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers$.needNormalize(NormalizeFloatingNumbers.scala:103)
  at org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers$.org$apache$spark$sql$catalyst$optimizer$NormalizeFloatingNumbers$$needNormalize(NormalizeFloatingNumbers.scala:94)
  ...
```
After this PR, it fails with the expected `EXPRESSION_TYPE_IS_NOT_ORDERABLE` error:
```
org.apache.spark.sql.catalyst.ExtendedAnalysisException: [EXPRESSION_TYPE_IS_NOT_ORDERABLE] Column expression "m" cannot be sorted because its type "MAP<BIGINT, BIGINT>" is not orderable. SQLSTATE: 42822; line 2 pos 53;
Project [RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4]
+- Project [id#1L, m#0, RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4, RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4]
   +- Window [rank(id#1L) windowspecdefinition(m#0, id#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4], [m#0], [id#1L ASC NULLS FIRST]
      +- Project [id#1L, m#0]
         +- SubqueryAlias t
            +- SubqueryAlias t
               +- Project [id#1L, map(id#1L, id#1L) AS m#0]
                  +- Range (0, 10, step=1, splits=None)
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:52)
  ...
```
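For reference, the same behavior can also be hit through the DataFrame API. The snippet below is an illustrative sketch (the session setup and column names are mine, not part of this patch):
```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, map, rank}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// Same shape as the SQL example above: a map-typed column used as the window partition key.
val t = spark.range(0, 10).select(col("id"), map(col("id"), col("id")).as("m"))
val w = Window.partitionBy(col("m")).orderBy(col("id"))
// With this change, the select below fails during analysis with EXPRESSION_TYPE_IS_NOT_ORDERABLE.
t.select(rank().over(w)).show()
```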
### How was this patch tested?
A new unit test in `AnalysisErrorSuite`.
Closes #45730 from chenhao-db/SPARK-47572.
Authored-by: Chenhao Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 13 +++++++++++++
.../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 14 ++++++++++++++
2 files changed, 27 insertions(+)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 4a979fd214ab..10bff5e6e59a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -537,6 +537,19 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
}
}
+      case Window(_, partitionSpec, _, _) =>
+        // Both `partitionSpec` and `orderSpec` must be orderable. We only need an extra check
+        // for `partitionSpec` here because `orderSpec` has the type check itself.
+        partitionSpec.foreach { p =>
+          if (!RowOrdering.isOrderable(p.dataType)) {
+            p.failAnalysis(
+              errorClass = "EXPRESSION_TYPE_IS_NOT_ORDERABLE",
+              messageParameters = Map(
+                "expr" -> toSQLExpr(p),
+                "exprType" -> toSQLType(p.dataType)))
+          }
+        }
+
       case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)
       case LocalLimit(limitExpr, child) =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 5a3008187271..f12d22409691 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -1316,4 +1316,18 @@ class AnalysisErrorSuite extends AnalysisTest with DataTypeErrorsBase {
)
}
}
+
+  errorClassTest(
+    "SPARK-47572: Enforce Window partitionSpec is orderable",
+    testRelation2.select(
+      WindowExpression(
+        new Rank(),
+        WindowSpecDefinition(
+          CreateMap(Literal("key") :: UnresolvedAttribute("a") :: Nil) :: Nil,
+          SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil,
+          UnspecifiedFrame)).as("window")),
+    errorClass = "EXPRESSION_TYPE_IS_NOT_ORDERABLE",
+    messageParameters = Map(
+      "expr" -> "\"_w0\"",
+      "exprType" -> "\"MAP<STRING, STRING>\""))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]