This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new fdacd2396 [KYUUBI #5580] Support generate and window operators for
InferRebalanceAndSortOrders
fdacd2396 is described below
commit fdacd239647a37c3ce993c517575b3c254a33681
Author: wforget <[email protected]>
AuthorDate: Mon Nov 6 10:10:36 2023 +0800
[KYUUBI #5580] Support generate and window operators for
InferRebalanceAndSortOrders
### _Why are the changes needed?_
Support `generate` and `window` operators for `InferRebalanceAndSortOrders`.
### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No
Closes #5580 from wForget/dev.
Closes #5580
1a3bfadd2 [wforget] fix
be74aacd3 [wforget] fix
587c493df [wforget] Support generate and window operators for
InferRebalanceAndSortOrders
Authored-by: wforget <[email protected]>
Signed-off-by: ulyssesyou <[email protected]>
---
.../kyuubi/sql/InferRebalanceAndSortOrders.scala | 8 ++++++-
.../spark/sql/RebalanceBeforeWritingSuite.scala | 27 +++++++++++++++++++++-
2 files changed, 33 insertions(+), 2 deletions(-)
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/InferRebalanceAndSortOrders.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/InferRebalanceAndSortOrders.scala
index fcbf5c0a1..3b840f2a0 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/InferRebalanceAndSortOrders.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/InferRebalanceAndSortOrders.scala
@@ -22,7 +22,7 @@ import scala.annotation.tailrec
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Expression, NamedExpression, UnaryExpression}
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project, Sort, SubqueryAlias, View}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Generate, LogicalPlan, Project, Sort, SubqueryAlias, View, Window}
/**
* Infer the columns for Rebalance and Sort to improve the compression ratio.
@@ -96,6 +96,12 @@ object InferRebalanceAndSortOrders {
case f: Filter => candidateKeys(f.child, output)
case s: SubqueryAlias => candidateKeys(s.child, output)
case v: View => candidateKeys(v.child, output)
+ case g: Generate => candidateKeys(g.child, AttributeSet(g.requiredChildOutput))
+ case w: Window =>
+ val aliasMap = getAliasMap(w.windowExpressions)
+ Some((
+ w.partitionSpec.map(p => aliasMap.getOrElse(p.canonicalized, p)),
+ w.orderSpec.map(_.child).map(o => aliasMap.getOrElse(o.canonicalized, o))))
case _ => None
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index 1d9630f49..64e44abc0 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -199,9 +199,10 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
}
withView("v") {
- withTable("t", "input1", "input2") {
+ withTable("t", "t2", "input1", "input2") {
withSQLConf(KyuubiSQLConf.INFER_REBALANCE_AND_SORT_ORDERS.key -> "true") {
sql(s"CREATE TABLE t (c1 int, c2 long) USING PARQUET PARTITIONED BY (p string)")
+ sql(s"CREATE TABLE t2 (c1 int, c2 long, c3 long) USING PARQUET PARTITIONED BY (p string)")
sql(s"CREATE TABLE input1 USING PARQUET AS SELECT * FROM VALUES(1,2),(1,3)")
sql(s"CREATE TABLE input2 USING PARQUET AS SELECT * FROM VALUES(1,3),(1,3)")
sql(s"CREATE VIEW v as SELECT col1, count(*) as col2 FROM input1 GROUP BY col1")
@@ -264,6 +265,30 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
|SELECT * FROM v
|""".stripMargin)
checkShuffleAndSort(df5.queryExecution.analyzed, 1, 1)
+
+ // generate
+ val df6 = sql(
+ s"""
+ |INSERT INTO TABLE t2 PARTITION(p='a')
+ |SELECT /*+ broadcast(input2) */ input1.col1, input2.col1, cast(cc.action1 as bigint)
+ |FROM input1
+ |JOIN input2
+ |ON input1.col1 = input2.col1
+ | lateral view explode(ARRAY(input1.col1, input1.col2)) cc as action1
+ |""".stripMargin)
+ checkShuffleAndSort(df6.queryExecution.analyzed, 1, 1)
+
+ // window
+ val df7 = sql(
+ s"""
+ |INSERT INTO TABLE t2 PARTITION(p='a')
+ |SELECT /*+ broadcast(input2) */ input1.col1, input2.col2,
+ | RANK() OVER (PARTITION BY input2.col2 ORDER BY input1.col1) AS rank
+ |FROM input1
+ |JOIN input2
+ |ON input1.col1 = input2.col1
+ |""".stripMargin)
+ checkShuffleAndSort(df7.queryExecution.analyzed, 1, 1)
}
}
}