This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.10 by this push:
     new 7af96688f7 [KYUUBI #7139] Fix Spark extension rules to support RebalancePartitions
7af96688f7 is described below

commit 7af96688f705060be47658dcfafad50168a4c8a9
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Fri Jul 18 11:46:36 2025 +0800

    [KYUUBI #7139] Fix Spark extension rules to support RebalancePartitions
    
    As title.
    
    UTs are modified.
    
    No.
    
    Closes #7139 from pan3793/rebalance.
    
    Closes #7139
    
    edb070afd [Cheng Pan] fix
    4d3984a92 [Cheng Pan] Fix Spark extension rules to support RebalancePartitions
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Cheng Pan <cheng...@apache.org>
    (cherry picked from commit 5f4b1f0de513ad1085dcea8ab3d4911a82733b9e)
    Signed-off-by: Cheng Pan <cheng...@apache.org>
---
 .../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala  |  9 ---------
 .../kyuubi/sql/RepartitionBeforeWritingBase.scala       |  5 +++--
 .../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala   |  6 +++---
 .../apache/spark/sql/RebalanceBeforeWritingSuite.scala  | 17 ++++++++++++++---
 .../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala  |  9 ---------
 .../kyuubi/sql/RepartitionBeforeWritingBase.scala       |  5 +++--
 .../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala   |  6 +++---
 .../apache/spark/sql/RebalanceBeforeWritingSuite.scala  | 17 ++++++++++++++---
 .../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala  |  9 ---------
 .../kyuubi/sql/RepartitionBeforeWritingBase.scala       |  5 +++--
 .../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala   |  6 +++---
 .../apache/spark/sql/RebalanceBeforeWritingSuite.scala  | 17 ++++++++++++++---
 12 files changed, 60 insertions(+), 51 deletions(-)
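
 The change is identical across the spark-3-3, spark-3-4, and spark-3-5 extension modules: the write-side rules previously matched only RepartitionByExpression and Repartition, so a plan that already contained a RebalancePartitions node (for example, from an explicit REBALANCE hint) was not treated as already shuffled. The rules now match the common parent RepartitionOperation and handle RebalancePartitions explicitly, and the now-redundant canInsertRepartitionByExpression override is dropped. Below is a condensed, standalone sketch of the matching logic in the hunks that follow, assuming only Spark's catalyst classes; the object name CanInsertSketch is illustrative, not part of Kyuubi.

    import org.apache.spark.sql.catalyst.plans.logical.{
      Limit, LogicalPlan, RebalancePartitions, RepartitionOperation, Sort, SubqueryAlias}

    // Illustrative condensation only; Kyuubi's RepartitionBeforeWriteHelper and
    // InsertZorderHelper traits carry additional cases and configuration checks.
    object CanInsertSketch {
      // True when a rebalance may still be injected before the write: any existing
      // shuffle-like node (Repartition, RepartitionByExpression, RebalancePartitions)
      // or a Sort/Limit means nothing extra should be added.
      def canInsert(plan: LogicalPlan): Boolean = plan match {
        case SubqueryAlias(_, child) => canInsert(child)
        case Limit(_, _) => false
        case _: Sort => false
        case _: RepartitionOperation => false // covers Repartition and RepartitionByExpression
        case _: RebalancePartitions => false  // newly handled: respect an explicit REBALANCE
        case _ => true
      }
    }

 Matching on RepartitionOperation rather than the two concrete subclasses also keeps the check aligned with any future repartition variants Spark routes through that parent class.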

diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 3cbacdd2f0..2c6980fdb6 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
       }
     }
   }
-
-  override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
-    super.canInsertRepartitionByExpression(plan) && {
-      plan match {
-        case _: RebalancePartitions => false
-        case _ => true
-      }
-    }
-  }
 }
 
 /**
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index 95f3529e29..5b825cc6ce 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -118,6 +118,7 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
       case _: Window => true
       case s: Sort if s.global => true
       case _: RepartitionOperation => true
+      case _: RebalancePartitions => true
       case _: GlobalLimit => true
       case _ => false
     }.isDefined
@@ -131,8 +132,8 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
       case SubqueryAlias(_, child) => canInsert(child)
       case Limit(_, _) => false
       case _: Sort => false
-      case _: RepartitionByExpression => false
-      case _: Repartition => false
+      case _: RepartitionOperation => false
+      case _: RebalancePartitions => false
       case _ => true
     }
 
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 4b2494bc84..e03e49b7d8 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -50,12 +50,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
 
   def canInsertZorder(query: LogicalPlan): Boolean = query match {
     case Project(_, child) => canInsertZorder(child)
-    case _: RepartitionByExpression | _: Repartition
+    case _: RepartitionOperation | _: RebalancePartitions
         if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
     // TODO: actually, we can force zorder even if existed some shuffle
     case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
+    case _: RepartitionOperation => false
+    case _: RebalancePartitions => false
     case _ => true
   }
 
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index c1295ca04a..0826d385d3 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -28,18 +28,21 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
 class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
 
   test("check rebalance exists") {
-    def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+    def check(
+        df: => DataFrame,
+        expectedRebalanceNumEnabled: Int = 1,
+        expectedRebalanceNumDisabled: Int = 0): Unit = {
       withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
         assert(
           df.queryExecution.analyzed.collect {
             case r: RebalancePartitions => r
-          }.size == expectedRebalanceNum)
+          }.size == expectedRebalanceNumEnabled)
       }
       withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "false") {
         assert(
           df.queryExecution.analyzed.collect {
             case r: RebalancePartitions => r
-          }.isEmpty)
+          }.size == expectedRebalanceNumDisabled)
       }
     }
 
@@ -69,6 +72,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
           check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
         }
 
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+          check(
+            sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM VALUES(1),(2),(3) AS t(c1)"),
+            1,
+            1)
+        }
+
         withTable("tmp1", "tmp2") {
           sql(s"CREATE TABLE tmp1 (c1 int) $storage")
           sql(s"CREATE TABLE tmp2 (c1 int) $storage")
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 3cbacdd2f0..2c6980fdb6 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
       }
     }
   }
-
-  override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
-    super.canInsertRepartitionByExpression(plan) && {
-      plan match {
-        case _: RebalancePartitions => false
-        case _ => true
-      }
-    }
-  }
 }
 
 /**
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index 3ebb9740f5..e15c5a4580 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -99,6 +99,7 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
       case _: Window => true
       case s: Sort if s.global => true
       case _: RepartitionOperation => true
+      case _: RebalancePartitions => true
       case _: GlobalLimit => true
       case _ => false
     }.isDefined
@@ -112,8 +113,8 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
       case SubqueryAlias(_, child) => canInsert(child)
       case Limit(_, _) => false
       case _: Sort => false
-      case _: RepartitionByExpression => false
-      case _: Repartition => false
+      case _: RepartitionOperation => false
+      case _: RebalancePartitions => false
       case _ => true
     }
 
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 003ba6b68a..d96b546b94 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -49,12 +49,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
 
   def canInsertZorder(query: LogicalPlan): Boolean = query match {
     case Project(_, child) => canInsertZorder(child)
-    case _: RepartitionByExpression | _: Repartition
+    case _: RepartitionOperation | _: RebalancePartitions
         if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
     // TODO: actually, we can force zorder even if existed some shuffle
     case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
+    case _: RepartitionOperation => false
+    case _: RebalancePartitions => false
     case _ => true
   }
 
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index f739634958..761fec37d9 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -29,19 +29,22 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
 class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
 
   test("check rebalance exists") {
-    def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+    def check(
+        df: => DataFrame,
+        expectedRebalanceNumEnabled: Int = 1,
+        expectedRebalanceNumDisabled: Int = 0): Unit = {
       withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
         withListener(df) { write =>
           assert(write.collect {
             case r: RebalancePartitions => r
-          }.size == expectedRebalanceNum)
+          }.size == expectedRebalanceNumEnabled)
         }
       }
       withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "false") {
         withListener(df) { write =>
           assert(write.collect {
             case r: RebalancePartitions => r
-          }.isEmpty)
+          }.size == expectedRebalanceNumDisabled)
         }
       }
     }
@@ -72,6 +75,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
           check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
         }
 
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+          check(
+            sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM VALUES(1),(2),(3) AS t(c1)"),
+            1,
+            1)
+        }
+
         withTable("tmp1", "tmp2") {
           sql(s"CREATE TABLE tmp1 (c1 int) $storage")
           sql(s"CREATE TABLE tmp2 (c1 int) $storage")
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 3cbacdd2f0..2c6980fdb6 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
       }
     }
   }
-
-  override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
-    super.canInsertRepartitionByExpression(plan) && {
-      plan match {
-        case _: RebalancePartitions => false
-        case _ => true
-      }
-    }
-  }
 }
 
 /**
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index 3ebb9740f5..e15c5a4580 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -99,6 +99,7 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
       case _: Window => true
       case s: Sort if s.global => true
       case _: RepartitionOperation => true
+      case _: RebalancePartitions => true
       case _: GlobalLimit => true
       case _ => false
     }.isDefined
@@ -112,8 +113,8 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
       case SubqueryAlias(_, child) => canInsert(child)
       case Limit(_, _) => false
       case _: Sort => false
-      case _: RepartitionByExpression => false
-      case _: Repartition => false
+      case _: RepartitionOperation => false
+      case _: RebalancePartitions => false
       case _ => true
     }
 
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 003ba6b68a..d96b546b94 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -49,12 +49,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
 
   def canInsertZorder(query: LogicalPlan): Boolean = query match {
     case Project(_, child) => canInsertZorder(child)
-    case _: RepartitionByExpression | _: Repartition
+    case _: RepartitionOperation | _: RebalancePartitions
         if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
     // TODO: actually, we can force zorder even if existed some shuffle
     case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
+    case _: RepartitionOperation => false
+    case _: RebalancePartitions => false
     case _ => true
   }
 
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index 46ba272011..d63e79996b 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -29,19 +29,22 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
 class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
 
   test("check rebalance exists") {
-    def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+    def check(
+        df: => DataFrame,
+        expectedRebalanceNumEnabled: Int = 1,
+        expectedRebalanceNumDisabled: Int = 0): Unit = {
       withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
         withListener(df) { write =>
           assert(write.collect {
             case r: RebalancePartitions => r
-          }.size == expectedRebalanceNum)
+          }.size == expectedRebalanceNumEnabled)
         }
       }
       withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "false") {
         withListener(df) { write =>
           assert(write.collect {
             case r: RebalancePartitions => r
-          }.isEmpty)
+          }.size == expectedRebalanceNumDisabled)
         }
       }
     }
@@ -72,6 +75,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
           check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
         }
 
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+          check(
+            sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM VALUES(1),(2),(3) AS t(c1)"),
+            1,
+            1)
+        }
+
         withTable("tmp1", "tmp2") {
           sql(s"CREATE TABLE tmp1 (c1 int) $storage")
           sql(s"CREATE TABLE tmp2 (c1 int) $storage")
