This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5ba708f8fff [SPARK-45725][SQL] Remove the non-default IN subquery runtime filter 5ba708f8fff is described below commit 5ba708f8fffd21b675d819b01e53d11d8166dc9f Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Tue Oct 31 08:49:14 2023 -0700 [SPARK-45725][SQL] Remove the non-default IN subquery runtime filter ### What changes were proposed in this pull request? The IN subquery runtime filter is useless: 1. For small data (the most common case due to the heuristic we use), the bloom filter is as selective as the IN subquery, but more performant (hash + mod vs. value comparison). 2. For big data, the IN subquery will likely OOM, and the bloom filter is much more efficient. This PR removes the IN subquery runtime filter (the default is the bloom filter) to simplify the code. ### Why are the changes needed? Simplify code and tests, and make Spark simpler by removing one knob. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #43585 from cloud-fan/filter. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../catalyst/optimizer/InjectRuntimeFilter.scala | 97 +++------------ .../org/apache/spark/sql/internal/SQLConf.scala | 15 +-- .../spark/sql/InjectRuntimeFilterSuite.scala | 132 ++++----------------- 3 files changed, 39 insertions(+), 205 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala index 30526bd8106..5f5508d6b22 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala @@ -26,47 +26,27 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{INVOKE, JSON_TO_STRUCT, LIKE_FAMLIY, PYTHON_UDF, REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE, SCALA_UDF} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types._ /** * Insert a runtime filter on one side of the join (we call this side the application side) if * we can extract a runtime filter from the other side (creation side). A simple case is that * the creation side is a table scan with a selective filter. - * The runtime filter is logically an IN subquery with the join keys (converted to a semi join), - * but can be something different physically, such as a bloom filter. + * The runtime filter is logically an IN subquery with the join keys. Currently it's always + * bloom filter but we may add other physical implementations in the future. */ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with JoinSelectionHelper { - // Wraps `joinKey` with a hash function if its byte size is larger than an integer. 
- private def mayWrapWithHash(joinKey: Expression): Expression = { - if (joinKey.dataType.defaultSize > IntegerType.defaultSize) { - new Murmur3Hash(Seq(joinKey)) - } else { - joinKey - } - } - private def injectFilter( filterApplicationSideKey: Expression, filterApplicationSidePlan: LogicalPlan, filterCreationSideKey: Expression, filterCreationSidePlan: LogicalPlan): LogicalPlan = { - require(conf.runtimeFilterBloomFilterEnabled || conf.runtimeFilterSemiJoinReductionEnabled) - if (conf.runtimeFilterBloomFilterEnabled) { - injectBloomFilter( - filterApplicationSideKey, - filterApplicationSidePlan, - filterCreationSideKey, - filterCreationSidePlan - ) - } else { - injectInSubqueryFilter( - filterApplicationSideKey, - filterApplicationSidePlan, - filterCreationSideKey, - filterCreationSidePlan - ) - } + injectBloomFilter( + filterApplicationSideKey, + filterApplicationSidePlan, + filterCreationSideKey, + filterCreationSidePlan + ) } private def injectBloomFilter( @@ -95,26 +75,6 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J Filter(filter, filterApplicationSidePlan) } - private def injectInSubqueryFilter( - filterApplicationSideKey: Expression, - filterApplicationSidePlan: LogicalPlan, - filterCreationSideKey: Expression, - filterCreationSidePlan: LogicalPlan): LogicalPlan = { - require(filterApplicationSideKey.dataType == filterCreationSideKey.dataType) - val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideKey) - val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)() - val aggregate = - ColumnPruning(Aggregate(Seq(filterCreationSideKey), Seq(alias), filterCreationSidePlan)) - if (!canBroadcastBySize(aggregate, conf)) { - // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold, - // i.e., the semi-join will be a shuffled join, which is not worthwhile. 
- return filterApplicationSidePlan - } - val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideKey)), - ListQuery(aggregate, numCols = aggregate.output.length)) - Filter(filter, filterApplicationSidePlan) - } - /** * Extracts a sub-plan which is a simple filter over scan from the input plan. The simple * filter should be selective and the filter condition (including expressions in the child @@ -270,18 +230,9 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J } } - def hasRuntimeFilter(left: LogicalPlan, right: LogicalPlan, leftKey: Expression, - rightKey: Expression): Boolean = { - if (conf.runtimeFilterBloomFilterEnabled) { - hasBloomFilter(left, right, leftKey, rightKey) - } else { - hasInSubquery(left, right, leftKey, rightKey) - } - } - // This checks if there is already a DPP filter, as this rule is called just after DPP. @tailrec - def hasDynamicPruningSubquery( + private def hasDynamicPruningSubquery( left: LogicalPlan, right: LogicalPlan, leftKey: Expression, @@ -296,7 +247,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J } } - def hasBloomFilter( + private def hasBloomFilter( left: LogicalPlan, right: LogicalPlan, leftKey: Expression, @@ -316,19 +267,6 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J } } - def hasInSubquery(left: LogicalPlan, right: LogicalPlan, leftKey: Expression, - rightKey: Expression): Boolean = { - (left, right) match { - case (Filter(InSubquery(Seq(key), - ListQuery(Aggregate(Seq(Alias(_, _)), Seq(Alias(_, _)), _), _, _, _, _, _)), _), _) => - key.fastEquals(leftKey) || key.fastEquals(new Murmur3Hash(Seq(leftKey))) - case (_, Filter(InSubquery(Seq(key), - ListQuery(Aggregate(Seq(Alias(_, _)), Seq(Alias(_, _)), _), _, _, _, _, _)), _)) => - key.fastEquals(rightKey) || key.fastEquals(new Murmur3Hash(Seq(rightKey))) - case _ => false - } - } - private def tryInjectRuntimeFilter(plan: LogicalPlan): LogicalPlan = { var 
filterCounter = 0 val numFilterThreshold = conf.getConf(SQLConf.RUNTIME_FILTER_NUMBER_THRESHOLD) @@ -339,11 +277,11 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J leftKeys.lazyZip(rightKeys).foreach((l, r) => { // Check if: // 1. There is already a DPP filter on the key - // 2. There is already a runtime filter (Bloom filter or IN subquery) on the key + // 2. There is already a bloom filter on the key // 3. The keys are simple cheap expressions if (filterCounter < numFilterThreshold && !hasDynamicPruningSubquery(left, right, l, r) && - !hasRuntimeFilter(newLeft, newRight, l, r) && + !hasBloomFilter(newLeft, newRight, l, r) && isSimpleExpression(l) && isSimpleExpression(r)) { val oldLeft = newLeft val oldRight = newRight @@ -378,15 +316,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J override def apply(plan: LogicalPlan): LogicalPlan = plan match { case s: Subquery if s.correlated => plan - case _ if !conf.runtimeFilterSemiJoinReductionEnabled && - !conf.runtimeFilterBloomFilterEnabled => plan - case _ => - val newPlan = tryInjectRuntimeFilter(plan) - if (conf.runtimeFilterSemiJoinReductionEnabled && !plan.fastEquals(newPlan)) { - RewritePredicateSubquery(newPlan) - } else { - newPlan - } + case _ if !conf.runtimeFilterBloomFilterEnabled => plan + case _ => tryInjectRuntimeFilter(plan) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 43dc541fbb9..1f37363648a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -369,14 +369,6 @@ object SQLConf { .booleanConf .createWithDefault(true) - val RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED = - buildConf("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled") - .doc("When true and if one side of a shuffle join has a 
selective predicate, we attempt " + - "to insert a semi join in the other side to reduce the amount of shuffle data.") - .version("3.3.0") - .booleanConf - .createWithDefault(false) - val RUNTIME_FILTER_NUMBER_THRESHOLD = buildConf("spark.sql.optimizer.runtimeFilter.number.threshold") .doc("The total number of injected runtime filters (non-DPP) for a single " + @@ -4639,7 +4631,9 @@ object SQLConf { "returns null when getting a map value with a non-existing key. See SPARK-40066 " + "for more details."), RemovedConfig("spark.sql.hive.verifyPartitionPath", "4.0.0", "false", - s"This config was replaced by '${IGNORE_MISSING_FILES.key}'.") + s"This config was replaced by '${IGNORE_MISSING_FILES.key}'."), + RemovedConfig("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled", "false", "4.0", + "This optimizer config is useless as runtime filter cannot be an IN subquery now.") ) Map(configs.map { cfg => cfg.key -> cfg } : _*) @@ -4692,9 +4686,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def dynamicPartitionPruningReuseBroadcastOnly: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY) - def runtimeFilterSemiJoinReductionEnabled: Boolean = - getConf(RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED) - def runtimeFilterBloomFilterEnabled: Boolean = getConf(RUNTIME_BLOOM_FILTER_ENABLED) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala index c46e0bfcecb..2e57975ee6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala @@ -19,9 +19,8 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.expressions.{Alias, BloomFilterMightContain, Literal} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate} -import 
org.apache.spark.sql.catalyst.optimizer.{ColumnPruning, MergeScalarSubqueries} -import org.apache.spark.sql.catalyst.plans.LeftSemi -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan} +import org.apache.spark.sql.catalyst.optimizer.MergeScalarSubqueries +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan} import org.apache.spark.sql.execution.{ReusedSubqueryExec, SubqueryExec} import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEPropagateEmptyRelation} import org.apache.spark.sql.internal.SQLConf @@ -225,58 +224,28 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp super.afterAll() } - private def ensureLeftSemiJoinExists(plan: LogicalPlan): Unit = { - assert( - plan.find { - case j: Join if j.joinType == LeftSemi => true - case _ => false - }.isDefined - ) - } - - def checkWithAndWithoutFeatureEnabled(query: String, testSemiJoin: Boolean, - shouldReplace: Boolean, runtimeFilterNum: Int = 1): Unit = { + def checkWithAndWithoutFeatureEnabled( + query: String, + shouldReplace: Boolean, + runtimeFilterNum: Int = 1): Unit = { var planDisabled: LogicalPlan = null var planEnabled: LogicalPlan = null var expectedAnswer: Array[Row] = null - withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false", - SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") { + withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") { planDisabled = sql(query).queryExecution.optimizedPlan expectedAnswer = sql(query).collect() } - if (testSemiJoin) { - withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "true", - SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") { - planEnabled = sql(query).queryExecution.optimizedPlan - checkAnswer(sql(query), expectedAnswer) - } + withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") { + planEnabled = sql(query).queryExecution.optimizedPlan + checkAnswer(sql(query), 
expectedAnswer) + assert(getNumBloomFilters(planDisabled) == 0) if (shouldReplace) { - val normalizedEnabled = normalizePlan(normalizeExprIds(planEnabled)) - val normalizedDisabled = normalizePlan(normalizeExprIds(planDisabled)) - ensureLeftSemiJoinExists(planEnabled) - assert(normalizedEnabled != normalizedDisabled) - val agg = planEnabled.collect { - case Join(_, agg: Aggregate, LeftSemi, _, _) => agg - } - assert(agg.size == runtimeFilterNum) - assert(agg.head.fastEquals(ColumnPruning(agg.head))) + assert(!columnPruningTakesEffect(planEnabled)) + assert(getNumBloomFilters(planEnabled) == runtimeFilterNum) } else { - comparePlans(planDisabled, planEnabled) - } - } else { - withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false", - SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") { - planEnabled = sql(query).queryExecution.optimizedPlan - checkAnswer(sql(query), expectedAnswer) - assert(getNumBloomFilters(planDisabled) == 0) - if (shouldReplace) { - assert(!columnPruningTakesEffect(planEnabled)) - assert(getNumBloomFilters(planEnabled) == runtimeFilterNum) - } else { - assert(getNumBloomFilters(planEnabled) == getNumBloomFilters(planDisabled)) - } + assert(getNumBloomFilters(planEnabled) == getNumBloomFilters(planDisabled)) } } } @@ -320,62 +289,12 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp }.nonEmpty } - def assertRewroteSemiJoin(query: String, runtimeFilterNum: Int = 1): Unit = { - checkWithAndWithoutFeatureEnabled( - query, testSemiJoin = true, shouldReplace = true, runtimeFilterNum) - } - - def assertDidNotRewriteSemiJoin(query: String): Unit = { - checkWithAndWithoutFeatureEnabled(query, testSemiJoin = true, shouldReplace = false) - } - def assertRewroteWithBloomFilter(query: String, runtimeFilterNum: Int = 1): Unit = { - checkWithAndWithoutFeatureEnabled( - query, testSemiJoin = false, shouldReplace = true, runtimeFilterNum) + checkWithAndWithoutFeatureEnabled(query, shouldReplace = true, 
runtimeFilterNum) } def assertDidNotRewriteWithBloomFilter(query: String): Unit = { - checkWithAndWithoutFeatureEnabled(query, testSemiJoin = false, shouldReplace = false) - } - - test("Runtime semi join reduction: simple") { - // Filter creation side is 3409 bytes - // Filter application side scan is 3362 bytes - withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") { - assertRewroteSemiJoin("select * from bf1 join bf2 on bf1.c1 = bf2.c2 where bf2.a2 = 62") - assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on bf1.c1 = bf2.c2") - } - } - - test("Runtime semi join reduction: two joins") { - withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") { - assertRewroteSemiJoin("select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 " + - "and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2) - } - } - - test("Runtime semi join reduction: three joins") { - withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") { - assertRewroteSemiJoin("select * from bf1 join bf2 join bf3 join bf4 on " + - "bf1.c1 = bf2.c2 and bf2.c2 = bf3.c3 and bf3.c3 = bf4.c4 where bf1.a1 = 5", 3) - } - } - - test("Runtime semi join reduction: simple expressions only") { - withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") { - val squared = (s: Long) => { - s * s - } - spark.udf.register("square", squared) - assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on " + - "bf1.c1 = bf2.c2 where square(bf2.a2) = 62") - assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on " + - "bf1.c1 = square(bf2.c2) where bf2.a2= 62") - } + checkWithAndWithoutFeatureEnabled(query, shouldReplace = false) } test("Runtime bloom filter join: simple") { @@ 
-457,14 +376,12 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " + "bf1.b1 = bf2.b2 where bf2.a2 = 62" - withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false", - SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") { + withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") { planDisabled = sql(query).queryExecution.optimizedPlan expectedAnswer = sql(query).collect() } - withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false", - SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") { + withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") { planEnabled = sql(query).queryExecution.optimizedPlan checkAnswer(sql(query), expectedAnswer) } @@ -482,15 +399,13 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " + "bf1.b1 = bf2.b2 where bf2.a2 = 62" - withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false", - SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") { + withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") { planDisabled = sql(query).queryExecution.optimizedPlan expectedAnswer = sql(query).collect() } for (numFilterThreshold <- 0 to 3) { - withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false", - SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true", + withSQLConf( SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true", SQLConf.RUNTIME_FILTER_NUMBER_THRESHOLD.key -> numFilterThreshold.toString) { planEnabled = sql(query).queryExecution.optimizedPlan checkAnswer(sql(query), expectedAnswer) @@ -515,14 +430,12 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " + "bf1.c1 = bf2.b2 where bf2.a2 = 62" - withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key 
-> "false", - SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") { + withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") { planDisabled = sql(query).queryExecution.optimizedPlan expectedAnswer = sql(query).collect() } - withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false", - SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") { + withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") { planEnabled = sql(query).queryExecution.optimizedPlan checkAnswer(sql(query), expectedAnswer) } @@ -648,7 +561,6 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp test("Merge runtime bloom filters") { withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000", - SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false", SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true", // Re-enable `MergeScalarSubqueries` SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org