This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2080c2186c [KYUUBI #6990] Add rebalance before 
InsertIntoHiveDirCommand and InsertIntoDataSourceDirCommand to align with 
behaviors of hive
2080c2186c is described below

commit 2080c2186cb12a8e6474e8ea9544c9d7fe943b9c
Author: wuziyi <[email protected]>
AuthorDate: Tue Mar 25 00:52:55 2025 +0800

    [KYUUBI #6990] Add rebalance before InsertIntoHiveDirCommand and 
InsertIntoDataSourceDirCommand to align with behaviors of hive
    
    ### Why are the changes needed?
    
    When users switch from Hive to Spark, for sql like INSERT OVERWRITE 
DIRECTORY AS SELECT, it would be great if small files could be automatically 
merged through simple configuration, just like in Hive.
    
    ### How was this patch tested?
    
    UnitTest
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #6991 from Z1Wu/feat/add_insert_dir_rebalance_support.
    
    Closes #6990
    
    2820bb2d2 [wuziyi] [fix] nit
    a69c04191 [wuziyi] [fix] nit
    951a7738f [wuziyi] [fix] nit
    f75dfcb3a [wuziyi] [Feat] add rebalance before InsertIntoHiveDirCommand and 
InsertIntoDataSourceDirCommand to align with behaviors of hive
    
    Authored-by: wuziyi <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/sql/RepartitionBeforeWritingBase.scala  | 12 +++++--
 .../spark/sql/RebalanceBeforeWritingSuite.scala    | 42 ++++++++++++++++++++--
 .../kyuubi/sql/RepartitionBeforeWritingBase.scala  | 11 +++++-
 .../spark/sql/RebalanceBeforeWritingSuite.scala    | 42 ++++++++++++++++++++--
 .../apache/kyuubi/sql/RebalanceBeforeWriting.scala | 11 +++++-
 .../spark/sql/RebalanceBeforeWritingSuite.scala    | 41 ++++++++++++++++++++-
 6 files changed, 150 insertions(+), 9 deletions(-)

diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index 95f3529e29..ace98bb260 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -20,9 +20,9 @@ package org.apache.kyuubi.sql
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import 
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
+import 
org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, 
InsertIntoDataSourceDirCommand}
 import 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, 
InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, 
InsertIntoHiveDirCommand, InsertIntoHiveTable, 
OptimizedCreateHiveTableAsSelectCommand}
 import org.apache.spark.sql.internal.StaticSQLConf
 
 trait RepartitionBuilder extends Rule[LogicalPlan] with 
RepartitionBeforeWriteHelper {
@@ -59,6 +59,10 @@ abstract class RepartitionBeforeWritingDatasourceBase 
extends RepartitionBuilder
         query.output.filter(attr => 
table.partitionColumnNames.contains(attr.name))
       c.copy(query = buildRepartition(dynamicPartitionColumns, query))
 
+    case i @ InsertIntoDataSourceDirCommand(_, _, query, _)
+        if query.resolved && canInsertRepartitionByExpression(query) =>
+      i.copy(query = buildRepartition(Seq.empty, query))
+
     case u @ Union(children, _, _) =>
       u.copy(children = children.map(addRepartition))
 
@@ -101,6 +105,10 @@ abstract class RepartitionBeforeWritingHiveBase extends 
RepartitionBuilder {
         query.output.filter(attr => 
table.partitionColumnNames.contains(attr.name))
       c.copy(query = buildRepartition(dynamicPartitionColumns, query))
 
+    case c @ InsertIntoHiveDirCommand(_, _, query, _, _)
+        if query.resolved && canInsertRepartitionByExpression(query) =>
+      c.copy(query = buildRepartition(Seq.empty, query))
+
     case u @ Union(children, _, _) =>
       u.copy(children = children.map(addRepartition))
 
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index c1295ca04a..2c75e476b1 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
RebalancePartitions, Sort}
-import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.execution.command.{DataWritingCommand, 
InsertIntoDataSourceDirCommand}
 import org.apache.spark.sql.hive.HiveUtils
-import 
org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, 
OptimizedCreateHiveTableAsSelectCommand}
 
 import org.apache.kyuubi.sql.KyuubiSQLConf
 
@@ -272,4 +272,42 @@ class RebalanceBeforeWritingSuite extends 
KyuubiSparkSQLExtensionTest {
       }
     }
   }
+
+  test("Test rebalance in InsertIntoHiveDirCommand") {
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+      HiveUtils.CONVERT_METASTORE_CTAS.key -> "false",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> 
"true") {
+      withTempDir(tmpDir => {
+        spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
+        val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
+          s"STORED AS PARQUET SELECT * FROM tmp_table")
+        val insertHiveDirCommand = df.queryExecution.analyzed.collect {
+          case _: InsertIntoHiveDirCommand => true
+        }
+        assert(insertHiveDirCommand.size == 1)
+        val repartition = df.queryExecution.analyzed.collect {
+          case _: RebalancePartitions => true
+        }
+        assert(repartition.size == 1)
+      })
+    }
+  }
+
+  test("Test rebalance in InsertIntoDataSourceDirCommand") {
+    withSQLConf(
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> 
"true") {
+      withTempDir(tmpDir => {
+        spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
+        val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
+          s"USING PARQUET SELECT * FROM tmp_table")
+        
assert(df.queryExecution.analyzed.isInstanceOf[InsertIntoDataSourceDirCommand])
+        val repartition =
+          
df.queryExecution.analyzed.asInstanceOf[InsertIntoDataSourceDirCommand].query.collect
 {
+            case _: RebalancePartitions => true
+          }
+        assert(repartition.size == 1)
+      })
+    }
+  }
 }
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index 3ebb9740f5..e549d457af 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -20,8 +20,9 @@ package org.apache.kyuubi.sql
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand
 import 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, 
InsertIntoHiveTable}
 import org.apache.spark.sql.internal.StaticSQLConf
 
 trait RepartitionBuilder extends Rule[LogicalPlan] with 
RepartitionBeforeWriteHelper {
@@ -52,6 +53,10 @@ abstract class RepartitionBeforeWritingDatasourceBase 
extends RepartitionBuilder
       val dynamicPartitionColumns = pc.filterNot(attr => 
sp.contains(attr.name))
       i.copy(query = buildRepartition(dynamicPartitionColumns, query))
 
+    case i @ InsertIntoDataSourceDirCommand(_, _, query, _)
+        if query.resolved && canInsertRepartitionByExpression(query) =>
+      i.copy(query = buildRepartition(Seq.empty, query))
+
     case u @ Union(children, _, _) =>
       u.copy(children = children.map(addRepartition))
 
@@ -82,6 +87,10 @@ abstract class RepartitionBeforeWritingHiveBase extends 
RepartitionBuilder {
         .flatMap(name => query.output.find(_.name == name)).toSeq
       i.copy(query = buildRepartition(dynamicPartitionColumns, query))
 
+    case i @ InsertIntoHiveDirCommand(_, _, query, _, _)
+        if query.resolved && canInsertRepartitionByExpression(query) =>
+      i.copy(query = buildRepartition(Seq.empty, query))
+
     case u @ Union(children, _, _) =>
       u.copy(children = children.map(addRepartition))
 
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index f739634958..9e9328fd96 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
RebalancePartitions, Sort}
-import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.execution.command.{DataWritingCommand, 
InsertIntoDataSourceDirCommand}
 import 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, 
InsertIntoHiveTable}
 
 import org.apache.kyuubi.sql.KyuubiSQLConf
 
@@ -267,4 +267,42 @@ class RebalanceBeforeWritingSuite extends 
KyuubiSparkSQLExtensionTest {
       }
     }
   }
+
+  test("Test rebalance in InsertIntoHiveDirCommand") {
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+      HiveUtils.CONVERT_METASTORE_CTAS.key -> "false",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> 
"true") {
+      withTempDir(tmpDir => {
+        spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
+        val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
+          s"STORED AS PARQUET SELECT * FROM tmp_table")
+        val insertHiveDirCommand = df.queryExecution.analyzed.collect {
+          case _: InsertIntoHiveDirCommand => true
+        }
+        assert(insertHiveDirCommand.size == 1)
+        val repartition = df.queryExecution.analyzed.collect {
+          case _: RebalancePartitions => true
+        }
+        assert(repartition.size == 1)
+      })
+    }
+  }
+
+  test("Test rebalance in InsertIntoDataSourceDirCommand") {
+    withSQLConf(
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> 
"true") {
+      withTempDir(tmpDir => {
+        spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
+        val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
+          s"USING PARQUET SELECT * FROM tmp_table")
+        
assert(df.queryExecution.analyzed.isInstanceOf[InsertIntoDataSourceDirCommand])
+        val repartition =
+          
df.queryExecution.analyzed.asInstanceOf[InsertIntoDataSourceDirCommand].query.collect
 {
+            case _: RebalancePartitions => true
+          }
+        assert(repartition.size == 1)
+      })
+    }
+  }
 }
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 38e9bbb12f..25ec7240d1 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -23,8 +23,9 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand
 import 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, 
InsertIntoHiveTable}
 import org.apache.spark.sql.internal.StaticSQLConf
 
 trait RebalanceBeforeWritingBase extends Rule[LogicalPlan] {
@@ -112,6 +113,10 @@ case class RebalanceBeforeWritingDatasource(session: 
SparkSession)
       val dynamicPartitionColumns = pc.filterNot(attr => 
sp.contains(attr.name))
       i.copy(query = buildRebalance(dynamicPartitionColumns, query))
 
+    case i @ InsertIntoDataSourceDirCommand(_, _, query, _)
+        if query.resolved && canInsertRebalance(query) =>
+      i.copy(query = buildRebalance(Seq.empty, query))
+
     case u @ Union(children, _, _) =>
       u.copy(children = children.map(addRebalance))
 
@@ -144,6 +149,10 @@ case class RebalanceBeforeWritingHive(session: 
SparkSession)
         .flatMap(name => query.output.find(_.name == name)).toSeq
       i.copy(query = buildRebalance(dynamicPartitionColumns, query))
 
+    case i @ InsertIntoHiveDirCommand(_, _, query, _, _)
+        if query.resolved && canInsertRebalance(query) =>
+      i.copy(query = buildRebalance(Seq.empty, query))
+
     case u @ Union(children, _, _) =>
       u.copy(children = children.map(addRebalance))
 
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
 
b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index 46ba272011..6fb63b0f1e 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
RebalancePartitions, Sort}
 import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand
 import 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, 
InsertIntoHiveTable}
 
 import org.apache.kyuubi.sql.KyuubiSQLConf
 
@@ -292,4 +293,42 @@ class RebalanceBeforeWritingSuite extends 
KyuubiSparkSQLExtensionTest {
       }
     }
   }
+
+  test("Test rebalance in InsertIntoHiveDirCommand") {
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+      HiveUtils.CONVERT_METASTORE_CTAS.key -> "false",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> 
"true") {
+      withTempDir(tmpDir => {
+        spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
+        val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
+          s"STORED AS PARQUET SELECT * FROM tmp_table")
+        val insertHiveDirCommand = df.queryExecution.analyzed.collect {
+          case _: InsertIntoHiveDirCommand => true
+        }
+        assert(insertHiveDirCommand.size == 1)
+        val repartition = df.queryExecution.analyzed.collect {
+          case _: RebalancePartitions => true
+        }
+        assert(repartition.size == 1)
+      })
+    }
+  }
+
+  test("Test rebalance in InsertIntoDataSourceDirCommand") {
+    withSQLConf(
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> 
"true") {
+      withTempDir(tmpDir => {
+        spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
+        val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
+          s"USING PARQUET SELECT * FROM tmp_table")
+        
assert(df.queryExecution.analyzed.isInstanceOf[InsertIntoDataSourceDirCommand])
+        val repartition =
+          
df.queryExecution.analyzed.asInstanceOf[InsertIntoDataSourceDirCommand].query.collect
 {
+            case _: RebalancePartitions => true
+          }
+        assert(repartition.size == 1)
+      })
+    }
+  }
 }

Reply via email to