This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new a3d9ca3  [KYUUBI #1974] Support merge small files in multi insert 
statement
a3d9ca3 is described below

commit a3d9ca31d3cbc9ff34484a8ff618e25dbb5b8d3d
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Feb 24 21:10:46 2022 +0800

    [KYUUBI #1974] Support merge small files in multi insert statement
    
    ### _Why are the changes needed?_
    
    This PR aims to support auto merge small files in multi insert statement, 
for example
    
    `FROM VALUES(1) INSERT INTO tmp1 SELECT * INSERT INTO tmp2 SELECT *;`
    
    will generate the following plan, `Union` is the root node instead of 
`InsertIntoHiveTable`
    
    ```
    Union
    :- InsertIntoHiveTable
    :  +- Project
    :    +- LocalRelation
    +- InsertIntoHiveTable
       +- Project
         +- LocalRelation
    ```
    
    This PR also fixed the `canInsertRepartitionByExpression`, previous it did 
not consider the `SubqueryAlias` which may cause inserting error 
`Repartition`/`Reblance` node and currupt the data distribution, e.g.
    
    `FROM (SELECT * FROM VALUES(1) DOSTRIBUTE BY col1) INSERT INTO tmp1 SELECT 
* INSERT INTO tmp2 SELECT *;`
    
    ```
    Union
    :- InsertIntoHiveTable
    :  +- Project
    :     +- SubqueryAlias
    :        +- RepartitionByExpression
    :           +- Project
    :              +- LocalRelation
    +- InsertIntoHiveTable
       +- Project
          +- SubqueryAlias
             +- RepartitionByExpression
                +- Project
                   +- LocalRelation
    ```
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #1974 from pan3793/ext.
    
    Closes #1974
    
    56cd7734 [Cheng Pan] nit
    e0155c27 [Cheng Pan] Support merge small files in multi table insertion
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../spark/sql/RepartitionBeforeWritingSuite.scala  | 35 ++++++++++++++++++++--
 .../spark/sql/RebalanceBeforeWritingSuite.scala    | 28 +++++++++++++++--
 .../kyuubi/sql/RepartitionBeforeWritingBase.scala  |  7 +++++
 3 files changed, 66 insertions(+), 4 deletions(-)

diff --git 
a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala
 
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala
index 8b07f54..f978623 100644
--- 
a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala
+++ 
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala
@@ -26,14 +26,14 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
 
 class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
   test("check repartition exists") {
-    def check(df: DataFrame): Unit = {
+    def check(df: DataFrame, expectedRepartitionNum: Int = 1): Unit = {
       assert(
         df.queryExecution.analyzed.collect {
           case r: RepartitionByExpression =>
             assert(r.optNumPartitions ===
               
spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
             r
-        }.size == 1)
+        }.size == expectedRepartitionNum)
     }
 
     // It's better to set config explicitly in case of we change the default 
value.
@@ -45,6 +45,18 @@ class RepartitionBeforeWritingSuite extends 
KyuubiSparkSQLExtensionTest {
             "SELECT * FROM VALUES(1),(2) AS t(c1)"))
         }
 
+        withTable("tmp1", "tmp2") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 
string)")
+          sql(s"CREATE TABLE tmp2 (c1 int) $storage PARTITIONED BY (c2 
string)")
+          check(
+            sql(
+              """FROM VALUES(1),(2) AS t(c1)
+                |INSERT INTO TABLE tmp1 PARTITION(c2='a') SELECT *
+                |INSERT INTO TABLE tmp2 PARTITION(c2='a') SELECT *
+                |""".stripMargin),
+            2)
+        }
+
         withTable("tmp1") {
           sql(s"CREATE TABLE tmp1 (c1 int) $storage")
           check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS 
t(c1)"))
@@ -52,6 +64,25 @@ class RepartitionBeforeWritingSuite extends 
KyuubiSparkSQLExtensionTest {
             "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1"))
         }
 
+        withTable("tmp1", "tmp2") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+          sql(s"CREATE TABLE tmp2 (c1 int) $storage")
+          check(
+            sql(
+              """FROM VALUES(1),(2),(3)
+                |INSERT INTO TABLE tmp1 SELECT *
+                |INSERT INTO TABLE tmp2 SELECT *
+                |""".stripMargin),
+            2)
+          check(
+            sql(
+              """FROM (SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY 
c1)
+                |INSERT INTO TABLE tmp1 SELECT *
+                |INSERT INTO TABLE tmp2 SELECT *
+                |""".stripMargin),
+            2)
+        }
+
         withTable("tmp1") {
           sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) 
AS t(c1)")
         }
diff --git 
a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
 
b/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index f40b432..f1a27cd 100644
--- 
a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ 
b/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -26,11 +26,11 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
 
 class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
   test("check rebalance exists") {
-    def check(df: DataFrame): Unit = {
+    def check(df: DataFrame, expectedRebalanceNum: Int = 1): Unit = {
       assert(
         df.queryExecution.analyzed.collect {
           case r: RebalancePartitions => r
-        }.size == 1)
+        }.size == expectedRebalanceNum)
     }
 
     // It's better to set config explicitly in case of we change the default 
value.
@@ -42,11 +42,35 @@ class RebalanceBeforeWritingSuite extends 
KyuubiSparkSQLExtensionTest {
             "SELECT * FROM VALUES(1),(2) AS t(c1)"))
         }
 
+        withTable("tmp1", "tmp2") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 
string)")
+          sql(s"CREATE TABLE tmp2 (c1 int) $storage PARTITIONED BY (c2 
string)")
+          check(
+            sql(
+              """FROM VALUES(1),(2)
+                |INSERT INTO TABLE tmp1 PARTITION(c2='a') SELECT *
+                |INSERT INTO TABLE tmp2 PARTITION(c2='a') SELECT *
+                |""".stripMargin),
+            2)
+        }
+
         withTable("tmp1") {
           sql(s"CREATE TABLE tmp1 (c1 int) $storage")
           check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS 
t(c1)"))
         }
 
+        withTable("tmp1", "tmp2") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+          sql(s"CREATE TABLE tmp2 (c1 int) $storage")
+          check(
+            sql(
+              """FROM VALUES(1),(2),(3)
+                |INSERT INTO TABLE tmp1 SELECT *
+                |INSERT INTO TABLE tmp2 SELECT *
+                |""".stripMargin),
+            2)
+        }
+
         withTable("tmp1") {
           sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) 
AS t(c1)")
         }
diff --git 
a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
 
b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index b987a72..33aff09 100644
--- 
a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ 
b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -59,6 +59,9 @@ abstract class RepartitionBeforeWritingDatasourceBase extends 
RepartitionBuilder
         query.output.filter(attr => 
table.partitionColumnNames.contains(attr.name))
       c.copy(query = buildRepartition(dynamicPartitionColumns, query))
 
+    case u @ Union(children, _, _) =>
+      u.copy(children = children.map(addRepartition))
+
     case _ => plan
   }
 }
@@ -98,6 +101,9 @@ abstract class RepartitionBeforeWritingHiveBase extends 
RepartitionBuilder {
         query.output.filter(attr => 
table.partitionColumnNames.contains(attr.name))
       c.copy(query = buildRepartition(dynamicPartitionColumns, query))
 
+    case u @ Union(children, _, _) =>
+      u.copy(children = children.map(addRepartition))
+
     case _ => plan
   }
 }
@@ -105,6 +111,7 @@ abstract class RepartitionBeforeWritingHiveBase extends 
RepartitionBuilder {
 trait RepartitionBeforeWriteHelper {
   def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = plan 
match {
     case Project(_, child) => canInsertRepartitionByExpression(child)
+    case SubqueryAlias(_, child) => canInsertRepartitionByExpression(child)
     case Limit(_, _) => false
     case _: Sort => false
     case _: RepartitionByExpression => false

Reply via email to