This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 78700d939c4 [SPARK-38797][SQL] Runtime Filter supports pruning side
has window
78700d939c4 is described below
commit 78700d939c42404ce6bd420094e13a258875949b
Author: Yuming Wang <[email protected]>
AuthorDate: Thu Apr 14 08:39:15 2022 +0800
[SPARK-38797][SQL] Runtime Filter supports pruning side has window
### What changes were proposed in this pull request?
1. Makes row-level runtime filtering support the case where the pruning side has a window. For
example:
```sql
SELECT *
FROM (SELECT *,
Row_number() OVER (PARTITION BY c1 ORDER BY f1) rn
FROM bf1) bf1
JOIN bf2
ON bf1.c1 = bf2.c2
WHERE bf2.a2 = 62
```
After this PR:
```
== Optimized Logical Plan ==
Join Inner, (c1#45922 = c2#45928), Statistics(sizeInBytes=12.3 MiB)
:- Window [row_number() windowspecdefinition(c1#45922, f1#45925 ASC
NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(),
currentrow$())) AS rn#45976], [c1#45922], [f1#45925 ASC NULLS FIRST],
Statistics(sizeInBytes=3.7 KiB)
: +- Filter (isnotnull(c1#45922) AND
might_contain(scalar-subquery#45993 [], xxhash64(c1#45922, 42))),
Statistics(sizeInBytes=3.3 KiB)
: : +- Aggregate [bloom_filter_agg(xxhash64(c2#45928, 42),
1000000, 8388608, 0, 0) AS bloomFilter#45992], Statistics(sizeInBytes=108.0 B,
rowCount=1)
: : +- Project [c2#45928], Statistics(sizeInBytes=1278.0 B)
: : +- Filter ((isnotnull(a2#45926) AND (a2#45926 = 62)) AND
isnotnull(c2#45928)), Statistics(sizeInBytes=3.3 KiB)
: : +- Relation
default.bf2[a2#45926,b2#45927,c2#45928,d2#45929,e2#45930,f2#45931] parquet,
Statistics(sizeInBytes=3.3 KiB)
: +- Relation
default.bf1[a1#45920,b1#45921,c1#45922,d1#45923,e1#45924,f1#45925] parquet,
Statistics(sizeInBytes=3.3 KiB)
+- Filter ((isnotnull(a2#45926) AND (a2#45926 = 62)) AND
isnotnull(c2#45928)), Statistics(sizeInBytes=3.3 KiB)
+- Relation
default.bf2[a2#45926,b2#45927,c2#45928,d2#45929,e2#45930,f2#45931] parquet,
Statistics(sizeInBytes=3.3 KiB)
```
2. Make sure injected filters can be pushed through a Shuffle if the current join is
a broadcast join.
### Why are the changes needed?
Improve query performance.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes #36080 from wangyum/SPARK-38797.
Lead-authored-by: Yuming Wang <[email protected]>
Co-authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
---
.../catalyst/optimizer/InjectRuntimeFilter.scala | 5 +++--
.../spark/sql/InjectRuntimeFilterSuite.scala | 26 ++++++++++++++++++++++
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 134292ae30d..01c1786e05a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -141,6 +141,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
plan.exists {
case Join(left, right, _, _, hint) => isProbablyShuffleJoin(left, right,
hint)
case _: Aggregate => true
+ case _: Window => true
case _ => false
}
}
@@ -172,8 +173,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
/**
* Check that:
- * - The filterApplicationSideJoinExp can be pushed down through joins and
aggregates (ie the
- * expression references originate from a single leaf node)
+ * - The filterApplicationSideJoinExp can be pushed down through joins,
aggregates and windows
+ * (ie the expression references originate from a single leaf node)
* - The filter creation side has a selective predicate
* - The current join is a shuffle join or a broadcast join that has a
shuffle below it
* - The max filterApplicationSide scan size is greater than a configurable
threshold
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index 726fa341b5c..6065f232109 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -539,4 +539,30 @@ class InjectRuntimeFilterSuite extends QueryTest with
SQLTestUtils with SharedSp
""".stripMargin)
}
}
+
+ test("Runtime Filter supports pruning side has Aggregate") {
+
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key
-> "3000") {
+ assertRewroteWithBloomFilter(
+ """
+ |SELECT *
+ |FROM (SELECT c1 AS aliased_c1, d1 FROM bf1 GROUP BY c1, d1) bf1
+ | JOIN bf2 ON bf1.aliased_c1 = bf2.c2
+ |WHERE bf2.a2 = 62
+ """.stripMargin)
+ }
+ }
+
+ test("Runtime Filter supports pruning side has Window") {
+
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key
-> "3000") {
+ assertRewroteWithBloomFilter(
+ """
+ |SELECT *
+ |FROM (SELECT *,
+ | Row_number() OVER (PARTITION BY c1 ORDER BY f1) rn
+ | FROM bf1) bf1
+ | JOIN bf2 ON bf1.c1 = bf2.c2
+ |WHERE bf2.a2 = 62
+ """.stripMargin)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]