This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f9c105e1b69 [SPARK-42832][SQL] Remove repartition if it is the child
of LocalLimit
f9c105e1b69 is described below
commit f9c105e1b693760d8c904066fdb65630aa4aeb91
Author: Yuming Wang <[email protected]>
AuthorDate: Wed Mar 22 10:31:45 2023 -0700
[SPARK-42832][SQL] Remove repartition if it is the child of LocalLimit
### What changes were proposed in this pull request?
This PR enhances `CollapseRepartition` to remove a repartition when it is the
child of a `LocalLimit`. This is feasible because the output of the Repartition
is determined by its number of partitions and its partition expressions. The
Repartition is kept when it has no partition expressions or when any of its
partition expressions is nondeterministic, because in those cases users may
expect to take data randomly.
For example:
```sql
SELECT /*+ REBALANCE */ * FROM t WHERE id > 1 LIMIT 5;
```
Before this PR:
```
== Optimized Logical Plan ==
GlobalLimit 5
+- LocalLimit 5
+- RebalancePartitions
+- Filter (isnotnull(id#0L) AND (id#0L > 1))
+- Relation spark_catalog.default.t[id#0L] parquet
```
After this PR:
```
== Optimized Logical Plan ==
GlobalLimit 5
+- LocalLimit 5
+- Filter (isnotnull(id#0L) AND (id#0L > 1))
+- Relation spark_catalog.default.t[id#0L] parquet
```
Note that we don't remove repartition if it looks like the user might want
to take data randomly. For example:
```sql
SELECT /*+ REPARTITION(3) */ * FROM t WHERE id > 1 LIMIT 5;
SELECT * FROM t WHERE id > 1 DISTRIBUTE BY random() LIMIT 5;
```
### Why are the changes needed?
Reduce shuffle to improve query performance. The use case is that we add a
repartition to improve the parallelism on a JDBC table:
<img
src="https://user-images.githubusercontent.com/5399861/225855582-c3c81c7d-4617-4104-b669-76749a7468a0.png"
width="400" height="700">
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes #40462 from wangyum/SPARK-42832.
Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 9 +++++++++
.../catalyst/optimizer/CollapseRepartitionSuite.scala | 18 ++++++++++++++++++
2 files changed, 27 insertions(+)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 13d1ee31a22..3e8571f3eb6 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1212,6 +1212,15 @@ object CollapseRepartition extends Rule[LogicalPlan] {
// child.
case r @ RebalancePartitions(_, child: RebalancePartitions, _, _) =>
r.withNewChildren(child.children)
+ // Case 5: When a LocalLimit has a child of Repartition we can remove the Repartition.
+ // Because its output is determined by the number of partitions and the expressions of the
+ // Repartition. Therefore, it is feasible to remove Repartition except for repartition by
+ // nondeterministic expressions, because users may expect to randomly take data.
+ case l @ LocalLimit(_, r: RepartitionByExpression)
+ if r.partitionExpressions.nonEmpty &&
r.partitionExpressions.forall(_.deterministic) =>
+ l.copy(child = r.child)
+ case l @ LocalLimit(_, r: RebalancePartitions) =>
+ l.copy(child = r.child)
}
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala
index f9eb6d2e760..7c8a90fe23e 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Uuid
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -222,4 +223,21 @@ class CollapseRepartitionSuite extends PlanTest {
comparePlans(optimized, expected)
}
}
+
+ test("SPARK-42832: Remove repartition if it is the child of LocalLimit") {
+ Seq(testRelation.distribute($"a")(2),
+ testRelation.rebalance(),
+ testRelation.rebalance($"a")).foreach { repartition =>
+ comparePlans(
+ Optimize.execute(repartition.limit(3).analyze),
+ testRelation.limit(3).analyze)
+ }
+
+ // In this case, do not remove repartition, the user may want to randomly take data.
+ Seq(testRelation.distribute()(2),
+ testRelation.distribute(Uuid())(2)).foreach { repartition =>
+ val plan = repartition.limit(3).analyze
+ comparePlans( Optimize.execute(plan), plan)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]