This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d9d4b593533 [SPARK-42783][SQL] Infer window group limit should run as late as possible
d9d4b593533 is described below
commit d9d4b59353349e30135cdee718e18a129547ff41
Author: Jiaan Geng <[email protected]>
AuthorDate: Wed Mar 15 20:01:21 2023 +0800
[SPARK-42783][SQL] Infer window group limit should run as late as possible
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/40142 included an unrelated change that is actually a regression: it made the infer-window-group-limit rule run early. `InferWindowGroupLimit` should run as late as possible, which is safer.
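To make the rewrite concrete, here is a hedged standalone sketch (simplified names, not Spark's actual classes) of the guard this patch tightens: pick the rank-like function with the smallest limit, and only rewrite when the limit is within the threshold AND the child may produce more rows than the limit.

```scala
// Simplified model of InferWindowGroupLimit's selection logic.
// GroupLimitGuard, RankLike and the threshold value are assumptions
// for illustration only; they are not Spark's real API.
object GroupLimitGuard {
  sealed trait RankLike
  case object RowNumber extends RankLike
  case object Rank extends RankLike
  case object DenseRank extends RankLike

  // Stand-in for conf.windowGroupLimitThreshold (value assumed).
  val windowGroupLimitThreshold: Int = 1000

  // Returns the (limit, function) pair to rewrite with, or None when
  // the rewrite would be useless (child already produces <= limit rows)
  // or the limit exceeds the threshold.
  def select(
      candidates: Seq[(Int, RankLike)],
      childMaxRows: Option[Long]): Option[(Int, RankLike)] =
    if (candidates.isEmpty) None
    else candidates.minBy(_._1) match {
      case (limit, f) if limit <= windowGroupLimitThreshold &&
          childMaxRows.forall(_ > limit) => Some((limit, f))
      case _ => None
    }
}
```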
### Why are the changes needed?
Running `InferWindowGroupLimit` as late as possible is safer, since it avoids interfering with the earlier operator-optimization batches.
### Does this PR introduce _any_ user-facing change?
'No'.
### How was this patch tested?
Existing test cases.
Manually generated the micro-benchmark below.
```
Benchmark Top-K:                                                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------------------------------------
ROW_NUMBER (PARTITION: , WindowGroupLimit: false)                       10972          11739         765          1.9         523.2       1.0X
ROW_NUMBER (PARTITION: , WindowGroupLimit: true)                         1700           1738          29         12.3          81.0       6.5X
ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: false)         24317          24452         113          0.9        1159.5       0.5X
ROW_NUMBER (PARTITION: PARTITION BY b, WindowGroupLimit: true)           6608           6965         348          3.2         315.1       1.7X
RANK (PARTITION: , WindowGroupLimit: false)                             11549          11850         160          1.8         550.7       1.0X
RANK (PARTITION: , WindowGroupLimit: true)                               2916           3211         267          7.2         139.1       3.8X
RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)               24736          25951         565          0.8        1179.5       0.4X
RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)                 6825           7256         497          3.1         325.5       1.6X
DENSE_RANK (PARTITION: , WindowGroupLimit: false)                       11857          12513         652          1.8         565.4       0.9X
DENSE_RANK (PARTITION: , WindowGroupLimit: true)                         2721           2937         113          7.7         129.8       4.0X
DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: false)         24976          25686         760          0.8        1191.0       0.4X
DENSE_RANK (PARTITION: PARTITION BY b, WindowGroupLimit: true)           6568           6884         364          3.2         313.2       1.7X
```
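The benchmark's fastest case (ROW_NUMBER with an empty partition spec) comes from the fallback the patch preserves: such plans can be rewritten to a plain Top-N (Limit + Sort) instead of WindowGroupLimit. A hedged sketch of that decision, using assumed names rather than Spark's real classes:

```scala
// Illustrative model only: TopNChoice and its members are hypothetical
// names standing in for the rewrite choice made inside InferWindowGroupLimit.
object TopNChoice {
  sealed trait Rewrite
  case object TopNSort extends Rewrite   // plain Limit + Sort (Top-N)
  case object GroupLimit extends Rewrite // per-partition WindowGroupLimit

  // ROW_NUMBER with no partitioning and a small enough limit can use the
  // cheaper Top-N path; every other rank-like case keeps WindowGroupLimit.
  def choose(
      isRowNumber: Boolean,
      partitionSpecEmpty: Boolean,
      limit: Int,
      topKSortFallbackThreshold: Int): Rewrite =
    if (isRowNumber && partitionSpecEmpty && limit < topKSortFallbackThreshold) {
      TopNSort
    } else {
      GroupLimit
    }
}
```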
Closes #40410 from beliefer/SPARK-42783.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala | 6 +++---
.../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 1 -
.../main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala | 5 +++++
3 files changed, 8 insertions(+), 4 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
index 8e3dc662205..261be291463 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
@@ -84,11 +84,11 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper {
         }
         // Pick a rank-like function with the smallest limit
         selectedLimits.minBy(_._1) match {
-          case (limit, rankLikeFunction) if limit <= conf.windowGroupLimitThreshold =>
+          case (limit, rankLikeFunction) if limit <= conf.windowGroupLimitThreshold &&
+            child.maxRows.forall(_ > limit) =>
             if (limit > 0) {
               val newFilterChild = if (rankLikeFunction.isInstanceOf[RowNumber] &&
-                partitionSpec.isEmpty && child.maxRows.forall(_ > limit) &&
-                limit < conf.topKSortFallbackThreshold) {
+                partitionSpec.isEmpty && limit < conf.topKSortFallbackThreshold) {
                 // Top n (Limit + Sort) have better performance than WindowGroupLimit if the
                 // window function is RowNumber and Window partitionSpec is empty.
                 Limit(Literal(limit), window)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ca55a281605..a0d49c29470 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -130,7 +130,6 @@ abstract class Optimizer(catalogManager: CatalogManager)
   val operatorOptimizationBatch: Seq[Batch] = {
     Batch("Operator Optimization before Inferring Filters", fixedPoint,
       operatorOptimizationRuleSet: _*) ::
-    Batch("Infer window group limit", Once, InferWindowGroupLimit) ::
     Batch("Infer Filters", Once,
       InferFiltersFromGenerate,
       InferFiltersFromConstraints) ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 8c420838ca2..f05fe9d60fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -84,6 +84,11 @@ class SparkOptimizer(
       PushPredicateThroughNonJoin,
       PushProjectionThroughLimit,
       RemoveNoopOperators) :+
+    Batch("Infer window group limit", Once,
+      InferWindowGroupLimit,
+      LimitPushDown,
+      LimitPushDownThroughWindow,
+      EliminateLimits) :+
     Batch("User Provided Optimizers", fixedPoint,
       experimentalMethods.extraOptimizations: _*) :+
     Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]