This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new fdacd2396 [KYUUBI #5580] Support generate and window operators for InferRebalanceAndSortOrders
fdacd2396 is described below

commit fdacd239647a37c3ce993c517575b3c254a33681
Author: wforget <[email protected]>
AuthorDate: Mon Nov 6 10:10:36 2023 +0800

    [KYUUBI #5580] Support generate and window operators for InferRebalanceAndSortOrders
    
    ### _Why are the changes needed?_
    
    Support `generate` and `window` operators for `InferRebalanceAndSortOrders`.
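    
    For illustration, a sketch with hypothetical table and column names (not
    taken from the patch): given a partitioned write such as
    
        INSERT INTO TABLE t PARTITION(p='a')
        SELECT c1, RANK() OVER (PARTITION BY c2 ORDER BY c1) AS rnk FROM s
    
    the rule can now infer `c2` as a rebalance key from the window's
    PARTITION BY clause and `c1` as a sort key from its ORDER BY clause.
    Likewise, a LATERAL VIEW explode(...) no longer blocks inference, which
    instead recurses into the generate's required child output.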
    
    ### _How was this patch tested?_
    - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No
    
    Closes #5580 from wForget/dev.
    
    Closes #5580
    
    1a3bfadd2 [wforget] fix
    be74aacd3 [wforget] fix
    587c493df [wforget] Support generate and window operators for InferRebalanceAndSortOrders
    
    Authored-by: wforget <[email protected]>
    Signed-off-by: ulyssesyou <[email protected]>
---
 .../kyuubi/sql/InferRebalanceAndSortOrders.scala   |  8 ++++++-
 .../spark/sql/RebalanceBeforeWritingSuite.scala    | 27 +++++++++++++++++++++-
 2 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/InferRebalanceAndSortOrders.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/InferRebalanceAndSortOrders.scala
index fcbf5c0a1..3b840f2a0 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/InferRebalanceAndSortOrders.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/InferRebalanceAndSortOrders.scala
@@ -22,7 +22,7 @@ import scala.annotation.tailrec
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Expression, NamedExpression, UnaryExpression}
 import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project, Sort, SubqueryAlias, View}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Generate, LogicalPlan, Project, Sort, SubqueryAlias, View, Window}
 
 /**
  * Infer the columns for Rebalance and Sort to improve the compression ratio.
@@ -96,6 +96,12 @@ object InferRebalanceAndSortOrders {
         case f: Filter => candidateKeys(f.child, output)
         case s: SubqueryAlias => candidateKeys(s.child, output)
         case v: View => candidateKeys(v.child, output)
+        case g: Generate => candidateKeys(g.child, AttributeSet(g.requiredChildOutput))
+        case w: Window =>
+          val aliasMap = getAliasMap(w.windowExpressions)
+          Some((
+            w.partitionSpec.map(p => aliasMap.getOrElse(p.canonicalized, p)),
+            w.orderSpec.map(_.child).map(o => aliasMap.getOrElse(o.canonicalized, o))))
 
         case _ => None
       }
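
The Window branch above resolves each partition and order expression through
the alias map built from the window's named expressions, so that the inferred
keys refer to the plan's output attributes. A minimal standalone sketch of
that resolution idea, with plain strings standing in for Catalyst expressions
(all names and types below are illustrative, not the rule's actual API):

    // Strings stand in for canonicalized Catalyst expressions.
    object WindowKeyInferenceSketch {
      final case class SortOrder(child: String)
      final case class Window(
          aliasMap: Map[String, String], // canonical expr -> output alias
          partitionSpec: Seq[String],
          orderSpec: Seq[SortOrder])

      // Mirrors the patched branch: prefer the output alias when one
      // exists, otherwise fall back to the raw expression.
      def candidateKeys(w: Window): (Seq[String], Seq[String]) =
        (w.partitionSpec.map(p => w.aliasMap.getOrElse(p, p)),
          w.orderSpec.map(_.child).map(o => w.aliasMap.getOrElse(o, o)))

      def main(args: Array[String]): Unit = {
        val w = Window(Map.empty, Seq("c2"), Seq(SortOrder("c1")))
        println(candidateKeys(w)) // (List(c2),List(c1)): rebalance by c2, sort by c1
      }
    }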
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index 1d9630f49..64e44abc0 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -199,9 +199,10 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
     }
 
     withView("v") {
-      withTable("t", "input1", "input2") {
+      withTable("t", "t2", "input1", "input2") {
        withSQLConf(KyuubiSQLConf.INFER_REBALANCE_AND_SORT_ORDERS.key -> "true") {
          sql(s"CREATE TABLE t (c1 int, c2 long) USING PARQUET PARTITIONED BY (p string)")
+          sql(s"CREATE TABLE t2 (c1 int, c2 long, c3 long) USING PARQUET PARTITIONED BY (p string)")
          sql(s"CREATE TABLE input1 USING PARQUET AS SELECT * FROM VALUES(1,2),(1,3)")
          sql(s"CREATE TABLE input2 USING PARQUET AS SELECT * FROM VALUES(1,3),(1,3)")
          sql(s"CREATE VIEW v as SELECT col1, count(*) as col2 FROM input1 GROUP BY col1")
@@ -264,6 +265,30 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
                |SELECT * FROM v
                |""".stripMargin)
           checkShuffleAndSort(df5.queryExecution.analyzed, 1, 1)
+
+          // generate
+          val df6 = sql(
+            s"""
+               |INSERT INTO TABLE t2 PARTITION(p='a')
+               |SELECT /*+ broadcast(input2) */ input1.col1, input2.col1, cast(cc.action1 as bigint)
+               |FROM input1
+               |JOIN input2
+               |ON input1.col1 = input2.col1
+               |  lateral view explode(ARRAY(input1.col1, input1.col2)) cc as action1
+               |""".stripMargin)
+          checkShuffleAndSort(df6.queryExecution.analyzed, 1, 1)
+
+          // window
+          val df7 = sql(
+            s"""
+               |INSERT INTO TABLE t2 PARTITION(p='a')
+               |SELECT /*+ broadcast(input2) */ input1.col1, input2.col2,
+               | RANK() OVER (PARTITION BY input2.col2 ORDER BY input1.col1) AS rank
+               |FROM input1
+               |JOIN input2
+               |ON input1.col1 = input2.col1
+               |""".stripMargin)
+          checkShuffleAndSort(df7.queryExecution.analyzed, 1, 1)
         }
       }
     }
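
Judging by the existing df5 assertion above, checkShuffleAndSort(plan, 1, 1)
verifies that exactly one inferred rebalance and one inferred sort are
injected into the analyzed plan, so both new cases assert that generate and
window plans now yield inferred keys rather than being skipped.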
