This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c02aef01bbc [SPARK-40501][SQL] Add PushProjectionThroughLimit for Optimizer
c02aef01bbc is described below
commit c02aef01bbc1530884c23850224293336a5430a2
Author: panbingkun <[email protected]>
AuthorDate: Mon Sep 26 15:50:07 2022 +0800
[SPARK-40501][SQL] Add PushProjectionThroughLimit for Optimizer
### What changes were proposed in this pull request?
This PR aims to add `PushProjectionThroughLimit` to the `Optimizer` to improve query performance.
### Why are the changes needed?
It "normalizes" the order of the project and limit operators **{project(..., limit(...)) => limit(..., project(...))}**, so that we have more chances to merge adjacent projects or limits **{e.g. by SpecialLimits}**, as sketched below.
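For illustration, here is a minimal sketch of the rewrite using the catalyst DSL (the same helpers the new test suite uses); the relation and column names are made up for the example:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughLimit
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// An illustrative two-column relation.
val relation = LocalRelation("a".attr.int, "b".attr.int)

// Before: Project [a] sits on top of GlobalLimit 1 / LocalLimit 1.
val before = relation.limit(1).select($"a").analyze

// After the rule: GlobalLimit 1 / LocalLimit 1 sit on top of Project [a].
val after = PushProjectionThroughLimit(before)
println(after.treeString)
```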
---
When I query a big table (size per day: 10 TB, number of columns: 1219) with `limit 1`:
#### A. Scenario 1 (run SQL in spark-sql) - the results are fetched quickly
- The optimization of CollectLimitExec has taken effect
1. SQL: `select * from xxx where ..._day = '20220919' limit 1`
<img width="628" alt="image"
src="https://user-images.githubusercontent.com/15246973/191184857-fa3d7f08-f0ea-4d70-a406-48828a3bb761.png">
2. Spark UI:
<img width="1416" alt="image"
src="https://user-images.githubusercontent.com/15246973/191204519-86de05d3-40d4-4acd-9833-86bf318ecb72.png">
#### B. Scenario 2 (run SQL in spark-shell) - it takes a long time to fetch results (still running after 20 minutes...)
1. Code: `spark.sql("select * from xxx where ..._day = '20220919' limit 1").show()`
<img width="557" alt="image"
src="https://user-images.githubusercontent.com/15246973/191211875-c29c3bae-1339-414b-84bc-2195545b8c35.png">
2. Spark UI:
<img width="1419" alt="image"
src="https://user-images.githubusercontent.com/15246973/191212244-22108810-dd66-46bd-bea7-a7dab70a1a06.png">
#### C. Scenario 3 (run SQL in spark-shell) - the results are fetched quickly
1. Code: `spark.sql("select * from xxx where ..._day = '20220919'").show(1)`
<img width="544" alt="image"
src="https://user-images.githubusercontent.com/15246973/191215182-bf278f71-c3ee-4028-8372-c9ed69431b6f.png">
2. Spark UI:
<img width="1417" alt="image"
src="https://user-images.githubusercontent.com/15246973/191215437-3a291b34-5257-485a-bc18-6c0e0865d7ce.png">
## The difference between Scenario 2 and Scenario 3 lies in the "Optimized Logical Plan"
<img width="543" alt="image"
src="https://user-images.githubusercontent.com/15246973/191216863-367c764d-2aa0-4c79-b240-0ebb6735937f.png">
<img width="544" alt="image"
src="https://user-images.githubusercontent.com/15246973/191217175-02213b0c-5d09-4154-85f7-18751734afd0.png">
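In Scenario 2, `show()` roughly builds a cast-to-string `Project` plus its own limit on top of the query, so the optimized plan keeps a `Project` sandwiched between two limits; `EliminateLimits` cannot merge them and `SpecialLimits` cannot plan the fast `CollectLimitExec`. A hedged way to reproduce the plan shape (table and column names elided here exactly as above):

```scala
import org.apache.spark.sql.functions.col

val q = spark.sql("select * from xxx where ..._day = '20220919' limit 1")

// Roughly what show() builds internally: a cast-to-string Project and its own limit.
val showLike = q.select(q.columns.map(c => col(c).cast("string")): _*).limit(21)

// Before this PR, the optimized logical plan keeps the Project between the two
// limits; after this PR, the Project is pushed below, the limits merge, and the
// physical plan starts with CollectLimit.
showLike.explain(true)
```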
# After this PR:
#### Scenario 2 (run SQL in spark-shell) - the results are fetched quickly
- The optimization of CollectLimitExec has taken effect
1. Code: `spark.sql("select * from xxx where ..._day = '20220919' limit 1").show()`
<img width="553" alt="image"
src="https://user-images.githubusercontent.com/15246973/191880203-2f951644-b59b-4dc8-9e80-c02c458fc28b.png">
2. Spark UI:
<img width="1417" alt="image"
src="https://user-images.githubusercontent.com/15246973/191219627-82655d01-1d5e-44ee-af49-e8da51c9ca72.png">
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add new UT & Pass GA.
Closes #37941 from panbingkun/improve_shell_limit.
Authored-by: panbingkun <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 1 +
.../optimizer/PushProjectionThroughLimit.scala | 39 ++++++++++
.../PushProjectionThroughLimitSuite.scala | 90 ++++++++++++++++++++++
3 files changed, 130 insertions(+)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 3ac75554a2b..2664fd63806 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -81,6 +81,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
       Seq(
         // Operator push down
         PushProjectionThroughUnion,
+        PushProjectionThroughLimit,
         ReorderJoin,
         EliminateOuterJoin,
         PushDownPredicates,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimit.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimit.scala
new file mode 100644
index 00000000000..6280cc5e42c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimit.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LocalLimit, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, PROJECT}
+
+/**
+ * Pushes Project operator through Limit operator.
+ */
+object PushProjectionThroughLimit extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsAllPatterns(PROJECT, LIMIT)) {
+
+    case p @ Project(projectList, limit @ LocalLimit(_, child))
+        if projectList.forall(_.deterministic) =>
+      limit.copy(child = p.copy(projectList, child))
+
+    case p @ Project(projectList, g @ GlobalLimit(_, limit @ LocalLimit(_, child)))
+        if projectList.forall(_.deterministic) =>
+      g.copy(child = limit.copy(child = p.copy(projectList, child)))
+  }
+}
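Note the `projectList.forall(_.deterministic)` guard above: reordering a non-deterministic projection with a limit changes how many rows its expressions are evaluated over, which can change their results. A quick sketch of the guard's effect, again using the catalyst DSL with illustrative names:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Rand
import org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughLimit
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val relation = LocalRelation("a".attr.int)

// Deterministic project list: the rule pushes it below the limit pair.
val pushed = PushProjectionThroughLimit(relation.limit(5).select($"a").analyze)

// Non-deterministic project list (rand()): the rule leaves the plan unchanged.
val kept = PushProjectionThroughLimit(
  relation.limit(5).select(Rand(0).as("r")).analyze)
```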
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimitSuite.scala
new file mode 100644
index 00000000000..7e45fc5aeb3
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimitSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class PushProjectionThroughLimitSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("Optimizer Batch",
+      FixedPoint(100),
+      PushProjectionThroughLimit,
+      EliminateLimits) :: Nil
+  }
+
+  test("SPARK-40501: push projection through limit") {
+    val testRelation = LocalRelation.fromExternalRows(
+      Seq("a".attr.int, "b".attr.int, "c".attr.int),
+      1.to(20).map(_ => Row(1, 2, 3)))
+
+    val query1 = testRelation
+      .limit(10)
+      .select($"a", $"b", $"c")
+      .limit(15).analyze
+    val optimized1 = Optimize.execute(query1)
+    val expected1 = testRelation
+      .select($"a", $"b", $"c")
+      .limit(10).analyze
+    comparePlans(optimized1, expected1)
+
+    val query2 = testRelation
+      .sortBy($"a".asc)
+      .limit(10)
+      .select($"a", $"b", $"c")
+      .limit(15).analyze
+    val optimized2 = Optimize.execute(query2)
+    val expected2 = testRelation
+      .sortBy($"a".asc)
+      .select($"a", $"b", $"c")
+      .limit(10).analyze
+    comparePlans(optimized2, expected2)
+
+    val query3 = testRelation
+      .limit(10)
+      .select($"a", $"b", $"c")
+      .limit(20)
+      .select($"a")
+      .limit(15).analyze
+    val optimized3 = Optimize.execute(query3)
+    val expected3 = testRelation
+      .select($"a", $"b", $"c")
+      .select($"a")
+      .limit(10).analyze
+    comparePlans(optimized3, expected3)
+
+    val query4 = testRelation
+      .sortBy($"a".asc)
+      .limit(10)
+      .select($"a", $"b", $"c")
+      .limit(20)
+      .select($"a")
+      .limit(15).analyze
+    val optimized4 = Optimize.execute(query4)
+    val expected4 = testRelation
+      .sortBy($"a".asc)
+      .select($"a", $"b", $"c")
+      .select($"a")
+      .limit(10).analyze
+    comparePlans(optimized4, expected4)
+  }
+}
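For reference, the new suite can be run on its own via the usual Spark developer workflow, e.g. `build/sbt "catalyst/testOnly *PushProjectionThroughLimitSuite"` (command shape per the standard developer docs; adjust to your environment).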
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]