This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5ba708f8fff [SPARK-45725][SQL] Remove the non-default IN subquery
runtime filter
5ba708f8fff is described below
commit 5ba708f8fffd21b675d819b01e53d11d8166dc9f
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Oct 31 08:49:14 2023 -0700
[SPARK-45725][SQL] Remove the non-default IN subquery runtime filter
### What changes were proposed in this pull request?
The IN subquery runtime filter is redundant:
1. For small data (the most common case due to the heuristic we use), a bloom
filter is as selective as the IN subquery but more performant (hash + mod vs.
value comparison).
2. For big data, the IN subquery will likely OOM, while the bloom filter is much
more efficient.
This PR removes the IN subquery runtime filter (the default is the bloom
filter) to simplify the code.
### Why are the changes needed?
To simplify the code and tests, and to make Spark simpler by removing one configuration knob.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
existing tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43585 from cloud-fan/filter.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../catalyst/optimizer/InjectRuntimeFilter.scala | 97 +++------------
.../org/apache/spark/sql/internal/SQLConf.scala | 15 +--
.../spark/sql/InjectRuntimeFilterSuite.scala | 132 ++++-----------------
3 files changed, 39 insertions(+), 205 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 30526bd8106..5f5508d6b22 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -26,47 +26,27 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{INVOKE,
JSON_TO_STRUCT, LIKE_FAMLIY, PYTHON_UDF, REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE,
SCALA_UDF}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types._
/**
* Insert a runtime filter on one side of the join (we call this side the
application side) if
* we can extract a runtime filter from the other side (creation side). A
simple case is that
* the creation side is a table scan with a selective filter.
- * The runtime filter is logically an IN subquery with the join keys
(converted to a semi join),
- * but can be something different physically, such as a bloom filter.
+ * The runtime filter is logically an IN subquery with the join keys.
Currently it's always
+ * a bloom filter, but we may add other physical implementations in the future.
*/
object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with
JoinSelectionHelper {
- // Wraps `joinKey` with a hash function if its byte size is larger than an
integer.
- private def mayWrapWithHash(joinKey: Expression): Expression = {
- if (joinKey.dataType.defaultSize > IntegerType.defaultSize) {
- new Murmur3Hash(Seq(joinKey))
- } else {
- joinKey
- }
- }
-
private def injectFilter(
filterApplicationSideKey: Expression,
filterApplicationSidePlan: LogicalPlan,
filterCreationSideKey: Expression,
filterCreationSidePlan: LogicalPlan): LogicalPlan = {
- require(conf.runtimeFilterBloomFilterEnabled ||
conf.runtimeFilterSemiJoinReductionEnabled)
- if (conf.runtimeFilterBloomFilterEnabled) {
- injectBloomFilter(
- filterApplicationSideKey,
- filterApplicationSidePlan,
- filterCreationSideKey,
- filterCreationSidePlan
- )
- } else {
- injectInSubqueryFilter(
- filterApplicationSideKey,
- filterApplicationSidePlan,
- filterCreationSideKey,
- filterCreationSidePlan
- )
- }
+ injectBloomFilter(
+ filterApplicationSideKey,
+ filterApplicationSidePlan,
+ filterCreationSideKey,
+ filterCreationSidePlan
+ )
}
private def injectBloomFilter(
@@ -95,26 +75,6 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
Filter(filter, filterApplicationSidePlan)
}
- private def injectInSubqueryFilter(
- filterApplicationSideKey: Expression,
- filterApplicationSidePlan: LogicalPlan,
- filterCreationSideKey: Expression,
- filterCreationSidePlan: LogicalPlan): LogicalPlan = {
- require(filterApplicationSideKey.dataType ==
filterCreationSideKey.dataType)
- val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideKey)
- val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
- val aggregate =
- ColumnPruning(Aggregate(Seq(filterCreationSideKey), Seq(alias),
filterCreationSidePlan))
- if (!canBroadcastBySize(aggregate, conf)) {
- // Skip the InSubquery filter if the size of `aggregate` is beyond
broadcast join threshold,
- // i.e., the semi-join will be a shuffled join, which is not worthwhile.
- return filterApplicationSidePlan
- }
- val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideKey)),
- ListQuery(aggregate, numCols = aggregate.output.length))
- Filter(filter, filterApplicationSidePlan)
- }
-
/**
* Extracts a sub-plan which is a simple filter over scan from the input
plan. The simple
* filter should be selective and the filter condition (including
expressions in the child
@@ -270,18 +230,9 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
}
}
- def hasRuntimeFilter(left: LogicalPlan, right: LogicalPlan, leftKey:
Expression,
- rightKey: Expression): Boolean = {
- if (conf.runtimeFilterBloomFilterEnabled) {
- hasBloomFilter(left, right, leftKey, rightKey)
- } else {
- hasInSubquery(left, right, leftKey, rightKey)
- }
- }
-
// This checks if there is already a DPP filter, as this rule is called just
after DPP.
@tailrec
- def hasDynamicPruningSubquery(
+ private def hasDynamicPruningSubquery(
left: LogicalPlan,
right: LogicalPlan,
leftKey: Expression,
@@ -296,7 +247,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
}
}
- def hasBloomFilter(
+ private def hasBloomFilter(
left: LogicalPlan,
right: LogicalPlan,
leftKey: Expression,
@@ -316,19 +267,6 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
}
}
- def hasInSubquery(left: LogicalPlan, right: LogicalPlan, leftKey: Expression,
- rightKey: Expression): Boolean = {
- (left, right) match {
- case (Filter(InSubquery(Seq(key),
- ListQuery(Aggregate(Seq(Alias(_, _)), Seq(Alias(_, _)), _), _, _, _, _,
_)), _), _) =>
- key.fastEquals(leftKey) || key.fastEquals(new
Murmur3Hash(Seq(leftKey)))
- case (_, Filter(InSubquery(Seq(key),
- ListQuery(Aggregate(Seq(Alias(_, _)), Seq(Alias(_, _)), _), _, _, _, _,
_)), _)) =>
- key.fastEquals(rightKey) || key.fastEquals(new
Murmur3Hash(Seq(rightKey)))
- case _ => false
- }
- }
-
private def tryInjectRuntimeFilter(plan: LogicalPlan): LogicalPlan = {
var filterCounter = 0
val numFilterThreshold =
conf.getConf(SQLConf.RUNTIME_FILTER_NUMBER_THRESHOLD)
@@ -339,11 +277,11 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
leftKeys.lazyZip(rightKeys).foreach((l, r) => {
// Check if:
// 1. There is already a DPP filter on the key
- // 2. There is already a runtime filter (Bloom filter or IN
subquery) on the key
+ // 2. There is already a bloom filter on the key
// 3. The keys are simple cheap expressions
if (filterCounter < numFilterThreshold &&
!hasDynamicPruningSubquery(left, right, l, r) &&
- !hasRuntimeFilter(newLeft, newRight, l, r) &&
+ !hasBloomFilter(newLeft, newRight, l, r) &&
isSimpleExpression(l) && isSimpleExpression(r)) {
val oldLeft = newLeft
val oldRight = newRight
@@ -378,15 +316,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with
PredicateHelper with J
override def apply(plan: LogicalPlan): LogicalPlan = plan match {
case s: Subquery if s.correlated => plan
- case _ if !conf.runtimeFilterSemiJoinReductionEnabled &&
- !conf.runtimeFilterBloomFilterEnabled => plan
- case _ =>
- val newPlan = tryInjectRuntimeFilter(plan)
- if (conf.runtimeFilterSemiJoinReductionEnabled &&
!plan.fastEquals(newPlan)) {
- RewritePredicateSubquery(newPlan)
- } else {
- newPlan
- }
+ case _ if !conf.runtimeFilterBloomFilterEnabled => plan
+ case _ => tryInjectRuntimeFilter(plan)
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 43dc541fbb9..1f37363648a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -369,14 +369,6 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
- val RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED =
- buildConf("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled")
- .doc("When true and if one side of a shuffle join has a selective
predicate, we attempt " +
- "to insert a semi join in the other side to reduce the amount of
shuffle data.")
- .version("3.3.0")
- .booleanConf
- .createWithDefault(false)
-
val RUNTIME_FILTER_NUMBER_THRESHOLD =
buildConf("spark.sql.optimizer.runtimeFilter.number.threshold")
.doc("The total number of injected runtime filters (non-DPP) for a
single " +
@@ -4639,7 +4631,9 @@ object SQLConf {
"returns null when getting a map value with a non-existing key. See
SPARK-40066 " +
"for more details."),
RemovedConfig("spark.sql.hive.verifyPartitionPath", "4.0.0", "false",
- s"This config was replaced by '${IGNORE_MISSING_FILES.key}'.")
+ s"This config was replaced by '${IGNORE_MISSING_FILES.key}'."),
+
RemovedConfig("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled",
"4.0.0", "false",
+ "This optimizer config is useless as runtime filter cannot be an IN
subquery now.")
)
Map(configs.map { cfg => cfg.key -> cfg } : _*)
@@ -4692,9 +4686,6 @@ class SQLConf extends Serializable with Logging with
SqlApiConf {
def dynamicPartitionPruningReuseBroadcastOnly: Boolean =
getConf(DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY)
- def runtimeFilterSemiJoinReductionEnabled: Boolean =
- getConf(RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED)
-
def runtimeFilterBloomFilterEnabled: Boolean =
getConf(RUNTIME_BLOOM_FILTER_ENABLED)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index c46e0bfcecb..2e57975ee6d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -19,9 +19,8 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.expressions.{Alias,
BloomFilterMightContain, Literal}
import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
BloomFilterAggregate}
-import org.apache.spark.sql.catalyst.optimizer.{ColumnPruning,
MergeScalarSubqueries}
-import org.apache.spark.sql.catalyst.plans.LeftSemi
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join,
LogicalPlan}
+import org.apache.spark.sql.catalyst.optimizer.MergeScalarSubqueries
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter,
LogicalPlan}
import org.apache.spark.sql.execution.{ReusedSubqueryExec, SubqueryExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
AQEPropagateEmptyRelation}
import org.apache.spark.sql.internal.SQLConf
@@ -225,58 +224,28 @@ class InjectRuntimeFilterSuite extends QueryTest with
SQLTestUtils with SharedSp
super.afterAll()
}
- private def ensureLeftSemiJoinExists(plan: LogicalPlan): Unit = {
- assert(
- plan.find {
- case j: Join if j.joinType == LeftSemi => true
- case _ => false
- }.isDefined
- )
- }
-
- def checkWithAndWithoutFeatureEnabled(query: String, testSemiJoin: Boolean,
- shouldReplace: Boolean, runtimeFilterNum: Int = 1): Unit = {
+ def checkWithAndWithoutFeatureEnabled(
+ query: String,
+ shouldReplace: Boolean,
+ runtimeFilterNum: Int = 1): Unit = {
var planDisabled: LogicalPlan = null
var planEnabled: LogicalPlan = null
var expectedAnswer: Array[Row] = null
- withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key ->
"false",
- SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
+ withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
planDisabled = sql(query).queryExecution.optimizedPlan
expectedAnswer = sql(query).collect()
}
- if (testSemiJoin) {
- withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key ->
"true",
- SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
- planEnabled = sql(query).queryExecution.optimizedPlan
- checkAnswer(sql(query), expectedAnswer)
- }
+ withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") {
+ planEnabled = sql(query).queryExecution.optimizedPlan
+ checkAnswer(sql(query), expectedAnswer)
+ assert(getNumBloomFilters(planDisabled) == 0)
if (shouldReplace) {
- val normalizedEnabled = normalizePlan(normalizeExprIds(planEnabled))
- val normalizedDisabled = normalizePlan(normalizeExprIds(planDisabled))
- ensureLeftSemiJoinExists(planEnabled)
- assert(normalizedEnabled != normalizedDisabled)
- val agg = planEnabled.collect {
- case Join(_, agg: Aggregate, LeftSemi, _, _) => agg
- }
- assert(agg.size == runtimeFilterNum)
- assert(agg.head.fastEquals(ColumnPruning(agg.head)))
+ assert(!columnPruningTakesEffect(planEnabled))
+ assert(getNumBloomFilters(planEnabled) == runtimeFilterNum)
} else {
- comparePlans(planDisabled, planEnabled)
- }
- } else {
- withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key ->
"false",
- SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") {
- planEnabled = sql(query).queryExecution.optimizedPlan
- checkAnswer(sql(query), expectedAnswer)
- assert(getNumBloomFilters(planDisabled) == 0)
- if (shouldReplace) {
- assert(!columnPruningTakesEffect(planEnabled))
- assert(getNumBloomFilters(planEnabled) == runtimeFilterNum)
- } else {
- assert(getNumBloomFilters(planEnabled) ==
getNumBloomFilters(planDisabled))
- }
+ assert(getNumBloomFilters(planEnabled) ==
getNumBloomFilters(planDisabled))
}
}
}
@@ -320,62 +289,12 @@ class InjectRuntimeFilterSuite extends QueryTest with
SQLTestUtils with SharedSp
}.nonEmpty
}
- def assertRewroteSemiJoin(query: String, runtimeFilterNum: Int = 1): Unit = {
- checkWithAndWithoutFeatureEnabled(
- query, testSemiJoin = true, shouldReplace = true, runtimeFilterNum)
- }
-
- def assertDidNotRewriteSemiJoin(query: String): Unit = {
- checkWithAndWithoutFeatureEnabled(query, testSemiJoin = true,
shouldReplace = false)
- }
-
def assertRewroteWithBloomFilter(query: String, runtimeFilterNum: Int = 1):
Unit = {
- checkWithAndWithoutFeatureEnabled(
- query, testSemiJoin = false, shouldReplace = true, runtimeFilterNum)
+ checkWithAndWithoutFeatureEnabled(query, shouldReplace = true,
runtimeFilterNum)
}
def assertDidNotRewriteWithBloomFilter(query: String): Unit = {
- checkWithAndWithoutFeatureEnabled(query, testSemiJoin = false,
shouldReplace = false)
- }
-
- test("Runtime semi join reduction: simple") {
- // Filter creation side is 3409 bytes
- // Filter application side scan is 3362 bytes
-
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key
-> "3000",
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
- assertRewroteSemiJoin("select * from bf1 join bf2 on bf1.c1 = bf2.c2
where bf2.a2 = 62")
- assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on bf1.c1 =
bf2.c2")
- }
- }
-
- test("Runtime semi join reduction: two joins") {
-
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key
-> "3000",
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
- assertRewroteSemiJoin("select * from bf1 join bf2 join bf3 on bf1.c1 =
bf2.c2 " +
- "and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
- }
- }
-
- test("Runtime semi join reduction: three joins") {
-
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key
-> "3000",
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
- assertRewroteSemiJoin("select * from bf1 join bf2 join bf3 join bf4 on "
+
- "bf1.c1 = bf2.c2 and bf2.c2 = bf3.c3 and bf3.c3 = bf4.c4 where bf1.a1
= 5", 3)
- }
- }
-
- test("Runtime semi join reduction: simple expressions only") {
-
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key
-> "3000",
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
- val squared = (s: Long) => {
- s * s
- }
- spark.udf.register("square", squared)
- assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on " +
- "bf1.c1 = bf2.c2 where square(bf2.a2) = 62")
- assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on " +
- "bf1.c1 = square(bf2.c2) where bf2.a2= 62")
- }
+ checkWithAndWithoutFeatureEnabled(query, shouldReplace = false)
}
test("Runtime bloom filter join: simple") {
@@ -457,14 +376,12 @@ class InjectRuntimeFilterSuite extends QueryTest with
SQLTestUtils with SharedSp
val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
"bf1.b1 = bf2.b2 where bf2.a2 = 62"
- withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key ->
"false",
- SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
+ withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
planDisabled = sql(query).queryExecution.optimizedPlan
expectedAnswer = sql(query).collect()
}
- withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key ->
"false",
- SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") {
+ withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") {
planEnabled = sql(query).queryExecution.optimizedPlan
checkAnswer(sql(query), expectedAnswer)
}
@@ -482,15 +399,13 @@ class InjectRuntimeFilterSuite extends QueryTest with
SQLTestUtils with SharedSp
val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
"bf1.b1 = bf2.b2 where bf2.a2 = 62"
- withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key ->
"false",
- SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
+ withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
planDisabled = sql(query).queryExecution.optimizedPlan
expectedAnswer = sql(query).collect()
}
for (numFilterThreshold <- 0 to 3) {
- withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key ->
"false",
- SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true",
+ withSQLConf( SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true",
SQLConf.RUNTIME_FILTER_NUMBER_THRESHOLD.key ->
numFilterThreshold.toString) {
planEnabled = sql(query).queryExecution.optimizedPlan
checkAnswer(sql(query), expectedAnswer)
@@ -515,14 +430,12 @@ class InjectRuntimeFilterSuite extends QueryTest with
SQLTestUtils with SharedSp
val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
"bf1.c1 = bf2.b2 where bf2.a2 = 62"
- withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key ->
"false",
- SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
+ withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
planDisabled = sql(query).queryExecution.optimizedPlan
expectedAnswer = sql(query).collect()
}
- withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key ->
"false",
- SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") {
+ withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") {
planEnabled = sql(query).queryExecution.optimizedPlan
checkAnswer(sql(query), expectedAnswer)
}
@@ -648,7 +561,6 @@ class InjectRuntimeFilterSuite extends QueryTest with
SQLTestUtils with SharedSp
test("Merge runtime bloom filters") {
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key
-> "3000",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000",
- SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true",
// Re-enable `MergeScalarSubqueries`
SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]