This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0aca1a6 [SPARK-32276][SQL] Remove redundant sorts before repartition
nodes
0aca1a6 is described below
commit 0aca1a6ed4cd7d0e5d231a40e7f3cc0331a3776a
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Sun Jul 19 12:11:26 2020 -0700
[SPARK-32276][SQL] Remove redundant sorts before repartition nodes
### What changes were proposed in this pull request?
This PR proposes to remove redundant sorts before repartition nodes
whenever the data is ordered after the repartitioning.
### Why are the changes needed?
It looks like our `EliminateSorts` rule can be extended further to remove
sorts before repartition nodes that don't affect the final output ordering. It
seems safe to perform the following rewrites:
- `Sort -> Repartition -> Sort -> Scan` as `Sort -> Repartition -> Scan`
- `Sort -> Repartition -> Project -> Sort -> Scan` as `Sort -> Repartition
-> Project -> Scan`
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
More test cases.
Closes #29089 from aokolnychyi/spark-32276.
Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 19 +-
.../EliminateSortsBeforeRepartitionSuite.scala | 193 +++++++++++++++++++++
2 files changed, 206 insertions(+), 6 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 33da482..a0e21ed 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -966,14 +966,19 @@ object CombineFilters extends Rule[LogicalPlan] with
PredicateHelper {
}
/**
- * Removes Sort operation. This can happen:
+ * Removes Sort operations if they don't affect the final output ordering.
+ * Note that changes in the final output ordering may affect the file size
(SPARK-32318).
+ * This rule handles the following cases:
* 1) if the sort order is empty or the sort order does not have any reference
* 2) if the child is already sorted
- * 3) if there is another Sort operator separated by 0...n Project/Filter
operators
- * 4) if the Sort operator is within Join separated by 0...n Project/Filter
operators only,
- * and the Join conditions is deterministic
- * 5) if the Sort operator is within GroupBy separated by 0...n Project/Filter
operators only,
- * and the aggregate function is order irrelevant
+ * 3) if there is another Sort operator separated by 0...n Project, Filter,
Repartition or
+ * RepartitionByExpression (with deterministic expressions) operators
+ * 4) if the Sort operator is within Join separated by 0...n Project, Filter,
Repartition or
+ * RepartitionByExpression (with deterministic expressions) operators only
and the Join condition
+ * is deterministic
+ * 5) if the Sort operator is within GroupBy separated by 0...n Project,
Filter, Repartition or
+ * RepartitionByExpression (with deterministic expressions) operators only
and the aggregate
+ * function is order irrelevant
*/
object EliminateSorts extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -999,6 +1004,8 @@ object EliminateSorts extends Rule[LogicalPlan] {
private def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
case p: Project => p.projectList.forall(_.deterministic)
case f: Filter => f.condition.deterministic
+ case r: RepartitionByExpression =>
r.partitionExpressions.forall(_.deterministic)
+ case _: Repartition => true
case _ => false
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsBeforeRepartitionSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsBeforeRepartitionSuite.scala
new file mode 100644
index 0000000..9f03135
--- /dev/null
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsBeforeRepartitionSuite.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class EliminateSortsBeforeRepartitionSuite extends PlanTest {
+
+ val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry,
conf)
+ val analyzer = new Analyzer(catalog, conf)
+
+ val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+ val anotherTestRelation = LocalRelation('d.int, 'e.int)
+
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("Default", FixedPoint(10),
+ FoldablePropagation,
+ LimitPushDown) ::
+ Batch("Eliminate Sorts", Once,
+ EliminateSorts) ::
+ Batch("Collapse Project", Once,
+ CollapseProject) :: Nil
+ }
+
+ def repartition(plan: LogicalPlan): LogicalPlan = plan.repartition(10)
+
+ test("sortBy") {
+ val plan = testRelation.select('a, 'b).sortBy('a.asc, 'b.desc)
+ val optimizedPlan = testRelation.select('a, 'b)
+ checkRepartitionCases(plan, optimizedPlan)
+ }
+
+ test("sortBy with projection") {
+ val plan = testRelation.sortBy('a.asc, 'b.asc).select('a + 1 as "a", 'b +
2 as "b")
+ val optimizedPlan = testRelation.select('a + 1 as "a", 'b + 2 as "b")
+ checkRepartitionCases(plan, optimizedPlan)
+ }
+
+ test("sortBy with projection and filter") {
+ val plan = testRelation.sortBy('a.asc, 'b.asc).select('a, 'b).where('a ===
10)
+ val optimizedPlan = testRelation.select('a, 'b).where('a === 10)
+ checkRepartitionCases(plan, optimizedPlan)
+ }
+
+ test("sortBy with limit") {
+ val plan = testRelation.sortBy('a.asc, 'b.asc).limit(10)
+ val optimizedPlan = testRelation.sortBy('a.asc, 'b.asc).limit(10)
+ checkRepartitionCases(plan, optimizedPlan)
+ }
+
+ test("sortBy with non-deterministic projection") {
+ val plan = testRelation.sortBy('a.asc, 'b.asc).select(rand(1), 'a, 'b)
+ val optimizedPlan = testRelation.sortBy('a.asc, 'b.asc).select(rand(1),
'a, 'b)
+ checkRepartitionCases(plan, optimizedPlan)
+ }
+
+ test("orderBy") {
+ val plan = testRelation.select('a, 'b).orderBy('a.asc, 'b.asc)
+ val optimizedPlan = testRelation.select('a, 'b)
+ checkRepartitionCases(plan, optimizedPlan)
+ }
+
+ test("orderBy with projection") {
+ val plan = testRelation.orderBy('a.asc, 'b.asc).select('a + 1 as "a", 'b +
2 as "b")
+ val optimizedPlan = testRelation.select('a + 1 as "a", 'b + 2 as "b")
+ checkRepartitionCases(plan, optimizedPlan)
+ }
+
+ test("orderBy with projection and filter") {
+ val plan = testRelation.orderBy('a.asc, 'b.asc).select('a, 'b).where('a
=== 10)
+ val optimizedPlan = testRelation.select('a, 'b).where('a === 10)
+ checkRepartitionCases(plan, optimizedPlan)
+ }
+
+ test("orderBy with limit") {
+ val plan = testRelation.orderBy('a.asc, 'b.asc).limit(10)
+ val optimizedPlan = testRelation.orderBy('a.asc, 'b.asc).limit(10)
+ checkRepartitionCases(plan, optimizedPlan)
+ }
+
+ test("orderBy with non-deterministic projection") {
+ val plan = testRelation.orderBy('a.asc, 'b.asc).select(rand(1), 'a, 'b)
+ val optimizedPlan = testRelation.orderBy('a.asc, 'b.asc).select(rand(1),
'a, 'b)
+ checkRepartitionCases(plan, optimizedPlan)
+ }
+
+ test("additional coalesce and sortBy") {
+ val plan = testRelation.sortBy('a.asc, 'b.asc).coalesce(1)
+ val optimizedPlan = testRelation.coalesce(1)
+ checkRepartitionCases(plan, optimizedPlan)
+ }
+
+ test("additional projection, repartition and sortBy") {
+ val plan = testRelation.sortBy('a.asc, 'b.asc).repartition(100).select('a
+ 1 as "a")
+ val optimizedPlan = testRelation.repartition(100).select('a + 1 as "a")
+ checkRepartitionCases(plan, optimizedPlan)
+ }
+
+ test("additional filter, distribute and sortBy") {
+ val plan = testRelation.sortBy('a.asc, 'b.asc).distribute('a)(2).where('a
=== 10)
+ val optimizedPlan = testRelation.distribute('a)(2).where('a === 10)
+ checkRepartitionCases(plan, optimizedPlan)
+ }
+
+ test("join") {
+ val plan = testRelation.sortBy('a.asc, 'b.asc).distribute('a)(2).where('a
=== 10)
+ val optimizedPlan = testRelation.distribute('a)(2).where('a === 10)
+ val anotherPlan = anotherTestRelation.select('d)
+ val joinPlan = plan.join(anotherPlan)
+ val optimizedJoinPlan = optimize(joinPlan)
+ val correctJoinPlan = analyze(optimizedPlan.join(anotherPlan))
+ comparePlans(optimizedJoinPlan, correctJoinPlan)
+ }
+
+ test("aggregate") {
+ val plan = testRelation.sortBy('a.asc, 'b.asc).distribute('a)(2).where('a
=== 10)
+ val optimizedPlan = testRelation.distribute('a)(2).where('a === 10)
+ val aggPlan = plan.groupBy('a)(sum('b))
+ val optimizedAggPlan = optimize(aggPlan)
+ val correctAggPlan = analyze(optimizedPlan.groupBy('a)(sum('b)))
+ comparePlans(optimizedAggPlan, correctAggPlan)
+ }
+
+ protected def checkRepartitionCases(plan: LogicalPlan, optimizedPlan:
LogicalPlan): Unit = {
+ // cannot remove sortBy before repartition without sortBy/orderBy
+ val planWithRepartition = repartition(plan)
+ val optimizedPlanWithRepartition = optimize(planWithRepartition)
+ val correctPlanWithRepartition = analyze(planWithRepartition)
+ comparePlans(optimizedPlanWithRepartition, correctPlanWithRepartition)
+
+ // can remove sortBy before repartition with sortBy
+ val planWithRepartitionAndSortBy = planWithRepartition.sortBy('a.asc)
+ val optimizedPlanWithRepartitionAndSortBy =
optimize(planWithRepartitionAndSortBy)
+ val correctPlanWithRepartitionAndSortBy =
analyze(repartition(optimizedPlan).sortBy('a.asc))
+ comparePlans(optimizedPlanWithRepartitionAndSortBy,
correctPlanWithRepartitionAndSortBy)
+
+ // can remove sortBy before repartition with orderBy
+ val planWithRepartitionAndOrderBy = planWithRepartition.orderBy('a.asc)
+ val optimizedPlanWithRepartitionAndOrderBy =
optimize(planWithRepartitionAndOrderBy)
+ val correctPlanWithRepartitionAndOrderBy =
analyze(repartition(optimizedPlan).orderBy('a.asc))
+ comparePlans(optimizedPlanWithRepartitionAndOrderBy,
correctPlanWithRepartitionAndOrderBy)
+ }
+
+ private def analyze(plan: LogicalPlan): LogicalPlan = {
+ analyzer.execute(plan)
+ }
+
+ private def optimize(plan: LogicalPlan): LogicalPlan = {
+ Optimize.execute(analyzer.execute(plan))
+ }
+}
+
+class EliminateSortsBeforeRepartitionByExprsSuite extends
EliminateSortsBeforeRepartitionSuite {
+ override def repartition(plan: LogicalPlan): LogicalPlan =
plan.distribute('a)(10)
+
+ test("sortBy before repartition with non-deterministic expressions") {
+ val plan = testRelation.sortBy('a.asc, 'b.asc).limit(10)
+ val planWithRepartition = plan.distribute(rand(1).asc, 'a.asc)(20)
+ checkRepartitionCases(plan = planWithRepartition, optimizedPlan =
planWithRepartition)
+ }
+
+ test("orderBy before repartition with non-deterministic expressions") {
+ val plan = testRelation.orderBy('a.asc, 'b.asc).limit(10)
+ val planWithRepartition = plan.distribute(rand(1).asc, 'a.asc)(20)
+ checkRepartitionCases(plan = planWithRepartition, optimizedPlan =
planWithRepartition)
+ }
+}
+
+class EliminateSortsBeforeCoalesceSuite extends
EliminateSortsBeforeRepartitionSuite {
+ override def repartition(plan: LogicalPlan): LogicalPlan = plan.coalesce(1)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]