This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new e54dc43e750 [SPARK-38565][SQL] Support Left Semi join in row level runtime filters
e54dc43e750 is described below
commit e54dc43e750be23062422ca096d1e8439178a1d1
Author: Yuming Wang <[email protected]>
AuthorDate: Mon Apr 11 15:43:41 2022 +0800
[SPARK-38565][SQL] Support Left Semi join in row level runtime filters
### What changes were proposed in this pull request?
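1. Support Left Semi join in row-level runtime filters: `InjectRuntimeFilter` now uses the `canPruneLeft`/`canPruneRight` helpers, which move from `PartitionPruning` into the shared `JoinSelectionHelper` trait.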
2. Rename
`spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizethreshold` to
`spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold`.
### Why are the changes needed?
Improve query performance and make the code easier to maintain.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
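Existing UTs, plus a new test in `InjectRuntimeFilterSuite` ("Support Left Semi join in row level runtime filters").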
Closes #36131 from wangyum/SPARK-38565.
Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
(cherry picked from commit 073fd2ad5c16d193725954e76ce357e4a9d97449)
Signed-off-by: Yuming Wang <[email protected]>
---
.../catalyst/optimizer/InjectRuntimeFilter.scala | 29 +++++++---------------
.../spark/sql/catalyst/optimizer/joins.scala | 10 ++++++++
.../org/apache/spark/sql/internal/SQLConf.scala | 2 +-
.../dynamicpruning/PartitionPruning.scala | 14 ++---------
.../spark/sql/InjectRuntimeFilterSuite.scala | 13 ++++++++++
5 files changed, 35 insertions(+), 33 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index a69cda25ef4..134292ae30d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.expressions._
import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
BloomFilterAggregate, Complete}
import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys,
PhysicalOperation}
-import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{INVOKE,
JSON_TO_STRUCT, LIKE_FAMLIY, PYTHON_UDF, REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE,
SCALA_UDF}
@@ -132,16 +131,6 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE)
}
- private def canFilterLeft(joinType: JoinType): Boolean = joinType match {
- case Inner | RightOuter => true
- case _ => false
- }
-
- private def canFilterRight(joinType: JoinType): Boolean = joinType match {
- case Inner | LeftOuter => true
- case _ => false
- }
-
private def isProbablyShuffleJoin(left: LogicalPlan,
right: LogicalPlan, hint: JoinHint): Boolean = {
!hintToBroadcastLeft(hint) && !hintToBroadcastRight(hint) &&
@@ -149,11 +138,11 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
}
private def probablyHasShuffle(plan: LogicalPlan): Boolean = {
- plan.collectFirst {
- case j@Join(left, right, _, _, hint)
- if isProbablyShuffleJoin(left, right, hint) => j
- case a: Aggregate => a
- }.nonEmpty
+ plan.exists {
+ case Join(left, right, _, _, hint) => isProbablyShuffleJoin(left, right,
hint)
+ case _: Aggregate => true
+ case _ => false
+ }
}
// Returns the max scan byte size in the subtree rooted at
`filterApplicationSide`.
@@ -235,7 +224,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
}
private def findBloomFilterWithExp(plan: LogicalPlan, key: Expression):
Boolean = {
- plan.find {
+ plan.exists {
case Filter(condition, _) =>
splitConjunctivePredicates(condition).exists {
case BloomFilterMightContain(_, XxHash64(Seq(valueExpression), _))
@@ -243,7 +232,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
case _ => false
}
case _ => false
- }.isDefined
+ }
}
def hasInSubquery(left: LogicalPlan, right: LogicalPlan, leftKey: Expression,
@@ -277,11 +266,11 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
isSimpleExpression(l) && isSimpleExpression(r)) {
val oldLeft = newLeft
val oldRight = newRight
- if (canFilterLeft(joinType) && filteringHasBenefit(left, right, l,
hint)) {
+ if (canPruneLeft(joinType) && filteringHasBenefit(left, right, l,
hint)) {
newLeft = injectFilter(l, newLeft, r, right)
}
// Did we actually inject on the left? If not, try on the right
- if (newLeft.fastEquals(oldLeft) && canFilterRight(joinType) &&
+ if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
filteringHasBenefit(right, left, r, hint)) {
newRight = injectFilter(r, newRight, l, left)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 6d683a7a113..45d8c54ea19 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -347,6 +347,16 @@ trait JoinSelectionHelper {
join.hint, hintOnly = false, conf).isDefined
}
+ def canPruneLeft(joinType: JoinType): Boolean = joinType match {
+ case Inner | LeftSemi | RightOuter => true
+ case _ => false
+ }
+
+ def canPruneRight(joinType: JoinType): Boolean = joinType match {
+ case Inner | LeftSemi | LeftOuter => true
+ case _ => false
+ }
+
def hintToBroadcastLeft(hint: JoinHint): Boolean = {
hint.leftHint.exists(_.strategy.contains(BROADCAST))
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c4ffc844135..365a9a378cb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -375,7 +375,7 @@ object SQLConf {
.createWithDefaultString("10MB")
val RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD =
-
buildConf("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizethreshold")
+
buildConf("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold")
.doc("Byte size threshold of the Bloom filter application side plan's
aggregated scan " +
"size. Aggregated scan byte size of the Bloom filter application side
needs to be over " +
"this value to inject a bloom filter.")
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
index 114d58c739e..402c59bc3de 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.dynamicpruning
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer.JoinSelectionHelper
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
-import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering
@@ -49,7 +49,7 @@ import
org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
* subquery query twice, we keep the duplicated subquery
* (3) otherwise, we drop the subquery.
*/
-object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
+object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with
JoinSelectionHelper {
/**
* Searches for a table scan that can be filtered for a given column in a
logical plan.
@@ -215,16 +215,6 @@ object PartitionPruning extends Rule[LogicalPlan] with
PredicateHelper {
!plan.isStreaming && hasSelectivePredicate(plan)
}
- private def canPruneLeft(joinType: JoinType): Boolean = joinType match {
- case Inner | LeftSemi | RightOuter => true
- case _ => false
- }
-
- private def canPruneRight(joinType: JoinType): Boolean = joinType match {
- case Inner | LeftSemi | LeftOuter => true
- case _ => false
- }
-
private def prune(plan: LogicalPlan): LogicalPlan = {
plan transformUp {
// skip this rule if there's already a DPP subquery on the LHS of a join
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index 097a18cabd5..726fa341b5c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -526,4 +526,17 @@ class InjectRuntimeFilterSuite extends QueryTest with
SQLTestUtils with SharedSp
"bf1.c1 = square(bf2.c2) where bf2.a2 = 62" )
}
}
+
+ test("Support Left Semi join in row level runtime filters") {
+
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key
-> "3000",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "32") {
+ assertRewroteWithBloomFilter(
+ """
+ |SELECT *
+ |FROM bf1 LEFT SEMI
+ |JOIN (SELECT * FROM bf2 WHERE bf2.a2 = 62) tmp
+ |ON bf1.c1 = tmp.c2
+ """.stripMargin)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]