This is an automated email from the ASF dual-hosted git repository.
beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1d8df4f6b99b [SPARK-45606][SQL] Release restrictions on multi-layer
runtime filter
1d8df4f6b99b is described below
commit 1d8df4f6b99b836f4267b888e81d67c75b4dfdcd
Author: Jiaan Geng <[email protected]>
AuthorDate: Wed Nov 8 19:43:33 2023 +0800
[SPARK-45606][SQL] Release restrictions on multi-layer runtime filter
### What changes were proposed in this pull request?
Before https://github.com/apache/spark/pull/39170, Spark only supported
inserting a runtime filter for the application side of a shuffle join at a single layer.
Considering it is not worthwhile to insert more runtime filters if the column
already has a runtime filter, Spark restricts this at
https://github.com/apache/spark/blob/7057952f6bc2c5cf97dd408effd1b18bee1cb8f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala#L346
For example
`select * from bf1 join bf2 on bf1.c1 = bf2.c2 and bf1.c1 = bf2.b2 where
bf2.a2 = 62`
This SQL has two join conditions. Without the restriction mentioned above,
two runtime filters would be inserted on `bf1.c1`.
At that time, it was reasonable.
After https://github.com/apache/spark/pull/39170, Spark supports inserting a
runtime filter for one side of any shuffle join across multiple layers. So the
restriction on multi-layer runtime filters mentioned above looks outdated.
For example
`select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1
where bf2.a2 = 5`
Assume bf2 is the build side and a runtime filter is inserted for bf1. We can't
insert the same runtime filter for bf3 because there is already a runtime
filter on `bf1.c1`.
This behavior is different from the original one and is unexpected.
The change in this PR doesn't affect the restriction mentioned above.
### Why are the changes needed?
Release restrictions on multi-layer runtime filter.
Expand optimization surface.
### Does this PR introduce _any_ user-facing change?
'No'.
New feature.
### How was this patch tested?
Test cases updated.
Micro benchmark for q9 in TPC-H.
**TPC-H 100**
Query | Master(ms) | PR(ms) | Difference(ms) | Percent
-- | -- | -- | -- | --
q9 | 26491 | 20725 | 5766| 27.82%
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #43449 from beliefer/SPARK-45606.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Jiaan Geng <[email protected]>
---
.../catalyst/optimizer/InjectRuntimeFilter.scala | 33 ++++++++++------------
.../spark/sql/InjectRuntimeFilterSuite.scala | 8 ++----
2 files changed, 18 insertions(+), 23 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 5f5508d6b22c..9c150f1f3308 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -247,15 +247,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
}
}
- private def hasBloomFilter(
- left: LogicalPlan,
- right: LogicalPlan,
- leftKey: Expression,
- rightKey: Expression): Boolean = {
- findBloomFilterWithKey(left, leftKey) || findBloomFilterWithKey(right,
rightKey)
- }
-
- private def findBloomFilterWithKey(plan: LogicalPlan, key: Expression):
Boolean = {
+ private def hasBloomFilter(plan: LogicalPlan, key: Expression): Boolean = {
plan.exists {
case Filter(condition, _) =>
splitConjunctivePredicates(condition).exists {
@@ -277,28 +269,33 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
leftKeys.lazyZip(rightKeys).foreach((l, r) => {
// Check if:
// 1. There is already a DPP filter on the key
- // 2. There is already a bloom filter on the key
- // 3. The keys are simple cheap expressions
+ // 2. The keys are simple cheap expressions
if (filterCounter < numFilterThreshold &&
!hasDynamicPruningSubquery(left, right, l, r) &&
- !hasBloomFilter(newLeft, newRight, l, r) &&
isSimpleExpression(l) && isSimpleExpression(r)) {
val oldLeft = newLeft
val oldRight = newRight
- // Check if the current join is a shuffle join or a broadcast join
that
- // has a shuffle below it
+ // Check if:
+ // 1. The current join type supports prune the left side with
runtime filter
+ // 2. The current join is a shuffle join or a broadcast join that
+ // has a shuffle below it
+ // 3. There is no bloom filter on the left key yet
val hasShuffle = isProbablyShuffleJoin(left, right, hint)
- if (canPruneLeft(joinType) && (hasShuffle ||
probablyHasShuffle(left))) {
+ if (canPruneLeft(joinType) && (hasShuffle ||
probablyHasShuffle(left)) &&
+ !hasBloomFilter(newLeft, l)) {
extractBeneficialFilterCreatePlan(left, right, l, r).foreach {
case (filterCreationSideKey, filterCreationSidePlan) =>
newLeft = injectFilter(l, newLeft, filterCreationSideKey,
filterCreationSidePlan)
}
}
// Did we actually inject on the left? If not, try on the right
- // Check if the current join is a shuffle join or a broadcast join
that
- // has a shuffle below it
+ // Check if:
+ // 1. The current join type supports prune the right side with
runtime filter
+ // 2. The current join is a shuffle join or a broadcast join that
+ // has a shuffle below it
+ // 3. There is no bloom filter on the right key yet
if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
- (hasShuffle || probablyHasShuffle(right))) {
+ (hasShuffle || probablyHasShuffle(right)) &&
!hasBloomFilter(newRight, r)) {
extractBeneficialFilterCreatePlan(right, left, r, l).foreach {
case (filterCreationSideKey, filterCreationSidePlan) =>
newRight = injectFilter(
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index 2e57975ee6d1..fc1524be1317 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -335,14 +335,12 @@ class InjectRuntimeFilterSuite extends QueryTest with
SQLTestUtils with SharedSp
"bf1.c1 = bf2.c2 where bf2.a2 = 5) as a join bf3 on bf3.c3 = a.c1", 2)
assertRewroteWithBloomFilter("select * from (select * from bf1 right
join bf2 on " +
"bf1.c1 = bf2.c2 where bf2.a2 = 5) as a join bf3 on bf3.c3 = a.c1", 2)
- // Can't leverage the transitivity of join keys due to runtime filters
already exists.
- // bf2 as creation side and inject runtime filter for bf1.
assertRewroteWithBloomFilter("select * from bf1 join bf2 join bf3 on
bf1.c1 = bf2.c2 " +
- "and bf3.c3 = bf1.c1 where bf2.a2 = 5")
+ "and bf3.c3 = bf1.c1 where bf2.a2 = 5", 2)
assertRewroteWithBloomFilter("select * from bf1 left outer join bf2 join
bf3 on " +
- "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5")
+ "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5", 2)
assertRewroteWithBloomFilter("select * from bf1 right outer join bf2
join bf3 on " +
- "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5")
+ "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5", 2)
}
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key
-> "3000",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]