This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.10 by this push:
     new 7af96688f7 [KYUUBI #7139] Fix Spark extension rules to support RebalancePartitions
7af96688f7 is described below

commit 7af96688f705060be47658dcfafad50168a4c8a9
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Fri Jul 18 11:46:36 2025 +0800

    [KYUUBI #7139] Fix Spark extension rules to support RebalancePartitions

    As title.

    UT are modified.

    No.

    Closes #7139 from pan3793/rebalance.

    Closes #7139

    edb070afd [Cheng Pan] fix
    4d3984a92 [Cheng Pan] Fix Spark extension rules to support RebalancePartitions

    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Cheng Pan <cheng...@apache.org>
    (cherry picked from commit 5f4b1f0de513ad1085dcea8ab3d4911a82733b9e)
    Signed-off-by: Cheng Pan <cheng...@apache.org>
---
 .../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala  |  9 ---------
 .../kyuubi/sql/RepartitionBeforeWritingBase.scala       |  5 +++--
 .../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala   |  6 +++---
 .../apache/spark/sql/RebalanceBeforeWritingSuite.scala  | 17 ++++++++++++++---
 .../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala  |  9 ---------
 .../kyuubi/sql/RepartitionBeforeWritingBase.scala       |  5 +++--
 .../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala   |  6 +++---
 .../apache/spark/sql/RebalanceBeforeWritingSuite.scala  | 17 ++++++++++++++---
 .../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala  |  9 ---------
 .../kyuubi/sql/RepartitionBeforeWritingBase.scala       |  5 +++--
 .../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala   |  6 +++---
 .../apache/spark/sql/RebalanceBeforeWritingSuite.scala  | 17 ++++++++++++++---
 12 files changed, 60 insertions(+), 51 deletions(-)
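In short: before this change, the extension's write-side rules only recognized Repartition and RepartitionByExpression as user-specified shuffles, so a query carrying a /*+ REBALANCE */ hint (which Spark resolves into a RebalancePartitions node) was not treated as an existing shuffle by the shared helpers; only a dedicated canInsertRepartitionByExpression override special-cased it. The patch instead matches the common parent type RepartitionOperation (which covers both Repartition and RepartitionByExpression) plus RebalancePartitions in the shared helpers, which makes that override redundant. The condensed sketch below is illustrative only, not the exact Kyuubi code; alreadyShuffled is an invented name.

// Illustrative sketch, simplified from the rules touched by this commit.
// Requires the Spark Catalyst classes on the classpath; alreadyShuffled is
// an invented helper name, not part of Kyuubi.
import org.apache.spark.sql.catalyst.plans.logical._

object RebalanceSketch {
  // True if the user's query already ends in an explicit redistribution,
  // in which case the extension should not inject its own rebalance.
  def alreadyShuffled(plan: LogicalPlan): Boolean = plan match {
    case Project(_, child) => alreadyShuffled(child)
    case SubqueryAlias(_, child) => alreadyShuffled(child)
    // RepartitionOperation is the parent of Repartition and RepartitionByExpression.
    case _: RepartitionOperation => true
    // RebalancePartitions (e.g. from a /*+ REBALANCE */ hint) is now handled as well.
    case _: RebalancePartitions => true
    case _ => false
  }
}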
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 3cbacdd2f0..2c6980fdb6 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
       }
     }
   }
-
-  override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
-    super.canInsertRepartitionByExpression(plan) && {
-      plan match {
-        case _: RebalancePartitions => false
-        case _ => true
-      }
-    }
-  }
 }
 
 /**
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index 95f3529e29..5b825cc6ce 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -118,6 +118,7 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
       case _: Window => true
       case s: Sort if s.global => true
       case _: RepartitionOperation => true
+      case _: RebalancePartitions => true
       case _: GlobalLimit => true
       case _ => false
     }.isDefined
@@ -131,8 +132,8 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
     case SubqueryAlias(_, child) => canInsert(child)
     case Limit(_, _) => false
     case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
+    case _: RepartitionOperation => false
+    case _: RebalancePartitions => false
     case _ => true
   }
 
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 4b2494bc84..e03e49b7d8 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -50,12 +50,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
 
   def canInsertZorder(query: LogicalPlan): Boolean = query match {
     case Project(_, child) => canInsertZorder(child)
-    case _: RepartitionByExpression | _: Repartition
+    case _: RepartitionOperation | _: RebalancePartitions
         if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
     // TODO: actually, we can force zorder even if existed some shuffle
     case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
+    case _: RepartitionOperation => false
+    case _: RebalancePartitions => false
     case _ => true
   }
 
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index c1295ca04a..0826d385d3 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -28,18 +28,21 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
 
 class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
   test("check rebalance exists") {
-    def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+    def check(
+        df: => DataFrame,
+        expectedRebalanceNumEnabled: Int = 1,
+        expectedRebalanceNumDisabled: Int = 0): Unit = {
       withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
         assert(
           df.queryExecution.analyzed.collect {
             case r: RebalancePartitions => r
-          }.size == expectedRebalanceNum)
+          }.size == expectedRebalanceNumEnabled)
       }
       withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "false") {
         assert(
           df.queryExecution.analyzed.collect {
             case r: RebalancePartitions => r
-          }.isEmpty)
+          }.size == expectedRebalanceNumDisabled)
       }
     }
 
@@ -69,6 +72,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
       check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
     }
 
+    withTable("tmp1") {
+      sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+      check(
+        sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM VALUES(1),(2),(3) AS t(c1)"),
+        1,
+        1)
+    }
+
     withTable("tmp1", "tmp2") {
       sql(s"CREATE TABLE tmp1 (c1 int) $storage")
       sql(s"CREATE TABLE tmp2 (c1 int) $storage")
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 3cbacdd2f0..2c6980fdb6 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
       }
     }
   }
-
-  override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
-    super.canInsertRepartitionByExpression(plan) && {
-      plan match {
-        case _: RebalancePartitions => false
-        case _ => true
-      }
-    }
-  }
 }
 
 /**
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index 3ebb9740f5..e15c5a4580 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -99,6 +99,7 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
       case _: Window => true
       case s: Sort if s.global => true
       case _: RepartitionOperation => true
+      case _: RebalancePartitions => true
       case _: GlobalLimit => true
       case _ => false
     }.isDefined
@@ -112,8 +113,8 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
     case SubqueryAlias(_, child) => canInsert(child)
     case Limit(_, _) => false
     case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
+    case _: RepartitionOperation => false
+    case _: RebalancePartitions => false
     case _ => true
   }
 
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 003ba6b68a..d96b546b94 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -49,12 +49,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
 
   def canInsertZorder(query: LogicalPlan): Boolean = query match {
     case Project(_, child) => canInsertZorder(child)
-    case _: RepartitionByExpression | _: Repartition
+    case _: RepartitionOperation | _: RebalancePartitions
         if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
     // TODO: actually, we can force zorder even if existed some shuffle
     case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
+    case _: RepartitionOperation => false
+    case _: RebalancePartitions => false
     case _ => true
   }
 
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index f739634958..761fec37d9 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -29,19 +29,22 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
 
 class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
   test("check rebalance exists") {
-    def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+    def check(
+        df: => DataFrame,
+        expectedRebalanceNumEnabled: Int = 1,
+        expectedRebalanceNumDisabled: Int = 0): Unit = {
       withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
         withListener(df) { write =>
           assert(write.collect {
             case r: RebalancePartitions => r
-          }.size == expectedRebalanceNum)
+          }.size == expectedRebalanceNumEnabled)
         }
       }
       withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "false") {
         withListener(df) { write =>
           assert(write.collect {
             case r: RebalancePartitions => r
-          }.isEmpty)
+          }.size == expectedRebalanceNumDisabled)
         }
       }
     }
@@ -72,6 +75,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
       check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
     }
 
+    withTable("tmp1") {
+      sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+      check(
+        sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM VALUES(1),(2),(3) AS t(c1)"),
+        1,
+        1)
+    }
+
     withTable("tmp1", "tmp2") {
       sql(s"CREATE TABLE tmp1 (c1 int) $storage")
       sql(s"CREATE TABLE tmp2 (c1 int) $storage")
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 3cbacdd2f0..2c6980fdb6 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
       }
     }
   }
-
-  override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
-    super.canInsertRepartitionByExpression(plan) && {
-      plan match {
-        case _: RebalancePartitions => false
-        case _ => true
-      }
-    }
-  }
 }
 
 /**
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index 3ebb9740f5..e15c5a4580 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -99,6 +99,7 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
       case _: Window => true
       case s: Sort if s.global => true
       case _: RepartitionOperation => true
+      case _: RebalancePartitions => true
       case _: GlobalLimit => true
       case _ => false
     }.isDefined
@@ -112,8 +113,8 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
     case SubqueryAlias(_, child) => canInsert(child)
     case Limit(_, _) => false
     case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
+    case _: RepartitionOperation => false
+    case _: RebalancePartitions => false
     case _ => true
   }
 
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 003ba6b68a..d96b546b94 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -49,12 +49,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
 
   def canInsertZorder(query: LogicalPlan): Boolean = query match {
     case Project(_, child) => canInsertZorder(child)
-    case _: RepartitionByExpression | _: Repartition
+    case _: RepartitionOperation | _: RebalancePartitions
         if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
     // TODO: actually, we can force zorder even if existed some shuffle
     case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
+    case _: RepartitionOperation => false
+    case _: RebalancePartitions => false
     case _ => true
   }
 
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index 46ba272011..d63e79996b 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -29,19 +29,22 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
 
 class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
   test("check rebalance exists") {
-    def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+    def check(
+        df: => DataFrame,
+        expectedRebalanceNumEnabled: Int = 1,
+        expectedRebalanceNumDisabled: Int = 0): Unit = {
       withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
         withListener(df) { write =>
           assert(write.collect {
             case r: RebalancePartitions => r
-          }.size == expectedRebalanceNum)
+          }.size == expectedRebalanceNumEnabled)
         }
       }
       withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "false") {
         withListener(df) { write =>
           assert(write.collect {
             case r: RebalancePartitions => r
-          }.isEmpty)
+          }.size == expectedRebalanceNumDisabled)
         }
       }
     }
@@ -72,6 +75,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
       check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
     }
 
+    withTable("tmp1") {
+      sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+      check(
+        sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM VALUES(1),(2),(3) AS t(c1)"),
+        1,
+        1)
+    }
+
     withTable("tmp1", "tmp2") {
       sql(s"CREATE TABLE tmp1 (c1 int) $storage")
      sql(s"CREATE TABLE tmp2 (c1 int) $storage")
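For a manual check that mirrors the new test case (the spark-3-3 suite variant, which inspects the analyzed plan): the sketch below is a hypothetical spark-shell session, not part of the commit. It assumes the Kyuubi Spark SQL extension jar is on the classpath; the extension class name, the local master, and the parquet table format are illustrative assumptions. Because the query already carries a /*+ REBALANCE */ hint, exactly one RebalancePartitions node is expected in the analyzed plan even with INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE disabled, which is what the new expectedRebalanceNumDisabled argument encodes.

// Hypothetical repro sketch, not part of the commit. Assumes the Kyuubi
// Spark SQL extension jar is on the classpath; the extension class name,
// local master, and table format are illustrative.
import org.apache.kyuubi.sql.KyuubiSQLConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.RebalancePartitions

val spark = SparkSession.builder()
  .master("local[2]")
  .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
  // Turn the automatic rebalance-before-write injection off on purpose.
  .config(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key, "false")
  .getOrCreate()

spark.sql("CREATE TABLE tmp1 (c1 int) USING parquet")

// The query asks for a rebalance itself, so its RebalancePartitions node
// must survive analysis even though the Kyuubi conf above is disabled.
val df = spark.sql(
  "INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM VALUES(1),(2),(3) AS t(c1)")
val rebalances = df.queryExecution.analyzed.collect { case r: RebalancePartitions => r }
assert(rebalances.size == 1)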