This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new a3d9ca3 [KYUUBI #1974] Support merge small files in multi insert
statement
a3d9ca3 is described below
commit a3d9ca31d3cbc9ff34484a8ff618e25dbb5b8d3d
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Feb 24 21:10:46 2022 +0800
[KYUUBI #1974] Support merge small files in multi insert statement
### _Why are the changes needed?_
This PR aims to support automatically merging small files in multi-insert
statements, for example
`FROM VALUES(1) INSERT INTO tmp1 SELECT * INSERT INTO tmp2 SELECT *;`
will generate the following plan, `Union` is the root node instead of
`InsertIntoHiveTable`
```
Union
:- InsertIntoHiveTable
: +- Project
: +- LocalRelation
+- InsertIntoHiveTable
+- Project
+- LocalRelation
```
This PR also fixed `canInsertRepartitionByExpression`; previously it did
not consider the `SubqueryAlias` node, which could cause an erroneous
`Repartition`/`Rebalance` node to be inserted and corrupt the data distribution, e.g.
`FROM (SELECT * FROM VALUES(1) DISTRIBUTE BY col1) INSERT INTO tmp1 SELECT
* INSERT INTO tmp2 SELECT *;`
```
Union
:- InsertIntoHiveTable
: +- Project
: +- SubqueryAlias
: +- RepartitionByExpression
: +- Project
: +- LocalRelation
+- InsertIntoHiveTable
+- Project
+- SubqueryAlias
+- RepartitionByExpression
+- Project
+- LocalRelation
```
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1974 from pan3793/ext.
Closes #1974
56cd7734 [Cheng Pan] nit
e0155c27 [Cheng Pan] Support merge small files in multi table insertion
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../spark/sql/RepartitionBeforeWritingSuite.scala | 35 ++++++++++++++++++++--
.../spark/sql/RebalanceBeforeWritingSuite.scala | 28 +++++++++++++++--
.../kyuubi/sql/RepartitionBeforeWritingBase.scala | 7 +++++
3 files changed, 66 insertions(+), 4 deletions(-)
diff --git
a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala
index 8b07f54..f978623 100644
---
a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala
+++
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala
@@ -26,14 +26,14 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
test("check repartition exists") {
- def check(df: DataFrame): Unit = {
+ def check(df: DataFrame, expectedRepartitionNum: Int = 1): Unit = {
assert(
df.queryExecution.analyzed.collect {
case r: RepartitionByExpression =>
assert(r.optNumPartitions ===
spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
r
- }.size == 1)
+ }.size == expectedRepartitionNum)
}
// It's better to set config explicitly in case of we change the default
value.
@@ -45,6 +45,18 @@ class RepartitionBeforeWritingSuite extends
KyuubiSparkSQLExtensionTest {
"SELECT * FROM VALUES(1),(2) AS t(c1)"))
}
+ withTable("tmp1", "tmp2") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2
string)")
+ sql(s"CREATE TABLE tmp2 (c1 int) $storage PARTITIONED BY (c2
string)")
+ check(
+ sql(
+ """FROM VALUES(1),(2) AS t(c1)
+ |INSERT INTO TABLE tmp1 PARTITION(c2='a') SELECT *
+ |INSERT INTO TABLE tmp2 PARTITION(c2='a') SELECT *
+ |""".stripMargin),
+ 2)
+ }
+
withTable("tmp1") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS
t(c1)"))
@@ -52,6 +64,25 @@ class RepartitionBeforeWritingSuite extends
KyuubiSparkSQLExtensionTest {
"SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1"))
}
+ withTable("tmp1", "tmp2") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+ sql(s"CREATE TABLE tmp2 (c1 int) $storage")
+ check(
+ sql(
+ """FROM VALUES(1),(2),(3)
+ |INSERT INTO TABLE tmp1 SELECT *
+ |INSERT INTO TABLE tmp2 SELECT *
+ |""".stripMargin),
+ 2)
+ check(
+ sql(
+ """FROM (SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY
c1)
+ |INSERT INTO TABLE tmp1 SELECT *
+ |INSERT INTO TABLE tmp2 SELECT *
+ |""".stripMargin),
+ 2)
+ }
+
withTable("tmp1") {
sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3)
AS t(c1)")
}
diff --git
a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
b/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index f40b432..f1a27cd 100644
---
a/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++
b/dev/kyuubi-extension-spark-3-2/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -26,11 +26,11 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
test("check rebalance exists") {
- def check(df: DataFrame): Unit = {
+ def check(df: DataFrame, expectedRebalanceNum: Int = 1): Unit = {
assert(
df.queryExecution.analyzed.collect {
case r: RebalancePartitions => r
- }.size == 1)
+ }.size == expectedRebalanceNum)
}
// It's better to set config explicitly in case of we change the default
value.
@@ -42,11 +42,35 @@ class RebalanceBeforeWritingSuite extends
KyuubiSparkSQLExtensionTest {
"SELECT * FROM VALUES(1),(2) AS t(c1)"))
}
+ withTable("tmp1", "tmp2") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2
string)")
+ sql(s"CREATE TABLE tmp2 (c1 int) $storage PARTITIONED BY (c2
string)")
+ check(
+ sql(
+ """FROM VALUES(1),(2)
+ |INSERT INTO TABLE tmp1 PARTITION(c2='a') SELECT *
+ |INSERT INTO TABLE tmp2 PARTITION(c2='a') SELECT *
+ |""".stripMargin),
+ 2)
+ }
+
withTable("tmp1") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS
t(c1)"))
}
+ withTable("tmp1", "tmp2") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+ sql(s"CREATE TABLE tmp2 (c1 int) $storage")
+ check(
+ sql(
+ """FROM VALUES(1),(2),(3)
+ |INSERT INTO TABLE tmp1 SELECT *
+ |INSERT INTO TABLE tmp2 SELECT *
+ |""".stripMargin),
+ 2)
+ }
+
withTable("tmp1") {
sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3)
AS t(c1)")
}
diff --git
a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index b987a72..33aff09 100644
---
a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++
b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -59,6 +59,9 @@ abstract class RepartitionBeforeWritingDatasourceBase extends
RepartitionBuilder
query.output.filter(attr =>
table.partitionColumnNames.contains(attr.name))
c.copy(query = buildRepartition(dynamicPartitionColumns, query))
+ case u @ Union(children, _, _) =>
+ u.copy(children = children.map(addRepartition))
+
case _ => plan
}
}
@@ -98,6 +101,9 @@ abstract class RepartitionBeforeWritingHiveBase extends
RepartitionBuilder {
query.output.filter(attr =>
table.partitionColumnNames.contains(attr.name))
c.copy(query = buildRepartition(dynamicPartitionColumns, query))
+ case u @ Union(children, _, _) =>
+ u.copy(children = children.map(addRepartition))
+
case _ => plan
}
}
@@ -105,6 +111,7 @@ abstract class RepartitionBeforeWritingHiveBase extends
RepartitionBuilder {
trait RepartitionBeforeWriteHelper {
def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = plan
match {
case Project(_, child) => canInsertRepartitionByExpression(child)
+ case SubqueryAlias(_, child) => canInsertRepartitionByExpression(child)
case Limit(_, _) => false
case _: Sort => false
case _: RepartitionByExpression => false