This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ba708f8fff [SPARK-45725][SQL] Remove the non-default IN subquery runtime filter
5ba708f8fff is described below

commit 5ba708f8fffd21b675d819b01e53d11d8166dc9f
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Oct 31 08:49:14 2023 -0700

    [SPARK-45725][SQL] Remove the non-default IN subquery runtime filter
    
    ### What changes were proposed in this pull request?
    
    The IN subquery runtime filter is useless:
    1. For small data (the most common case, due to the heuristic we use), the bloom filter is as selective as the IN subquery but more performant (hash + mod vs. value comparison).
    2. For big data, the IN subquery will likely OOM, while the bloom filter is much more efficient.
    
    This PR removes the IN subquery runtime filter (the default implementation is the bloom filter) to simplify the code.
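    
    As an illustration of the behavior that remains (not part of this patch; the tables and data below are invented, and the config key is assumed to be the one behind RUNTIME_BLOOM_FILTER_ENABLED), a minimal sketch for inspecting the injected filter:
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    // Minimal sketch: with the bloom filter runtime filter enabled, a selective
    // join may gain a BloomFilterMightContain predicate on the large side.
    // Injection is still subject to the rule's size thresholds, so tiny demo
    // tables may not trigger it.
    object RuntimeFilterDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
        import spark.implicits._
    
        // Hypothetical fact/dim tables, made up for illustration.
        Seq.tabulate(100000)(i => (i, i % 100)).toDF("k", "payload").createOrReplaceTempView("fact")
        Seq.tabulate(100)(i => (i, i)).toDF("k", "a").createOrReplaceTempView("dim")
    
        spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
        // Look for BloomFilterMightContain (and the absence of any IN subquery
        // runtime filter) in the optimized plan.
        spark.sql("SELECT * FROM fact JOIN dim ON fact.k = dim.k WHERE dim.a = 5").explain(true)
        spark.stop()
      }
    }
    ```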
    
    ### Why are the changes needed?
    
    Simplifies code and tests, and makes Spark simpler by removing one knob.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43585 from cloud-fan/filter.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../catalyst/optimizer/InjectRuntimeFilter.scala   |  97 +++------------
 .../org/apache/spark/sql/internal/SQLConf.scala    |  15 +--
 .../spark/sql/InjectRuntimeFilterSuite.scala       | 132 ++++-----------------
 3 files changed, 39 insertions(+), 205 deletions(-)
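
For readers skimming the diff: the surviving injection path builds a bloom filter aggregate over the creation-side join key and probes it with a hash of the application-side key. Below is a condensed, hedged sketch of that shape (not a verbatim excerpt of injectBloomFilter; the real method additionally checks size thresholds and row-count statistics and runs extra optimizer rules over the aggregate):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, BloomFilterMightContain, Expression, ScalarSubquery, XxHash64}
import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan}

// Condensed sketch of the remaining bloom filter injection (illustrative only).
def sketchInjectBloomFilter(
    applicationKey: Expression,
    applicationPlan: LogicalPlan,
    creationKey: Expression,
    creationPlan: LogicalPlan): LogicalPlan = {
  // Aggregate the creation side into a single bloom filter value...
  val agg = new BloomFilterAggregate(new XxHash64(Seq(creationKey)))
  val alias = Alias(agg.toAggregateExpression(), "bloomFilter")()
  val bloom = ScalarSubquery(Aggregate(Nil, Seq(alias), creationPlan))
  // ...and probe it on the application side: hash + membership test, which is
  // what makes this cheaper than per-value IN comparisons.
  Filter(BloomFilterMightContain(bloom, new XxHash64(Seq(applicationKey))), applicationPlan)
}
```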

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 30526bd8106..5f5508d6b22 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -26,47 +26,27 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{INVOKE, JSON_TO_STRUCT, LIKE_FAMLIY, PYTHON_UDF, REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE, SCALA_UDF}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types._
 
 /**
  * Insert a runtime filter on one side of the join (we call this side the application side) if
  * we can extract a runtime filter from the other side (creation side). A simple case is that
  * the creation side is a table scan with a selective filter.
- * The runtime filter is logically an IN subquery with the join keys (converted to a semi join),
- * but can be something different physically, such as a bloom filter.
+ * The runtime filter is logically an IN subquery with the join keys. Currently it's always
+ * bloom filter but we may add other physical implementations in the future.
  */
 object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with JoinSelectionHelper {
 
-  // Wraps `joinKey` with a hash function if its byte size is larger than an integer.
-  private def mayWrapWithHash(joinKey: Expression): Expression = {
-    if (joinKey.dataType.defaultSize > IntegerType.defaultSize) {
-      new Murmur3Hash(Seq(joinKey))
-    } else {
-      joinKey
-    }
-  }
-
   private def injectFilter(
       filterApplicationSideKey: Expression,
       filterApplicationSidePlan: LogicalPlan,
       filterCreationSideKey: Expression,
       filterCreationSidePlan: LogicalPlan): LogicalPlan = {
-    require(conf.runtimeFilterBloomFilterEnabled || conf.runtimeFilterSemiJoinReductionEnabled)
-    if (conf.runtimeFilterBloomFilterEnabled) {
-      injectBloomFilter(
-        filterApplicationSideKey,
-        filterApplicationSidePlan,
-        filterCreationSideKey,
-        filterCreationSidePlan
-      )
-    } else {
-      injectInSubqueryFilter(
-        filterApplicationSideKey,
-        filterApplicationSidePlan,
-        filterCreationSideKey,
-        filterCreationSidePlan
-      )
-    }
+    injectBloomFilter(
+      filterApplicationSideKey,
+      filterApplicationSidePlan,
+      filterCreationSideKey,
+      filterCreationSidePlan
+    )
   }
 
   private def injectBloomFilter(
@@ -95,26 +75,6 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
     Filter(filter, filterApplicationSidePlan)
   }
 
-  private def injectInSubqueryFilter(
-      filterApplicationSideKey: Expression,
-      filterApplicationSidePlan: LogicalPlan,
-      filterCreationSideKey: Expression,
-      filterCreationSidePlan: LogicalPlan): LogicalPlan = {
-    require(filterApplicationSideKey.dataType == filterCreationSideKey.dataType)
-    val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideKey)
-    val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
-    val aggregate =
-      ColumnPruning(Aggregate(Seq(filterCreationSideKey), Seq(alias), filterCreationSidePlan))
-    if (!canBroadcastBySize(aggregate, conf)) {
-      // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold,
-      // i.e., the semi-join will be a shuffled join, which is not worthwhile.
-      return filterApplicationSidePlan
-    }
-    val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideKey)),
-      ListQuery(aggregate, numCols = aggregate.output.length))
-    Filter(filter, filterApplicationSidePlan)
-  }
-
   /**
    * Extracts a sub-plan which is a simple filter over scan from the input plan. The simple
    * filter should be selective and the filter condition (including expressions in the child
@@ -270,18 +230,9 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
     }
   }
 
-  def hasRuntimeFilter(left: LogicalPlan, right: LogicalPlan, leftKey: Expression,
-      rightKey: Expression): Boolean = {
-    if (conf.runtimeFilterBloomFilterEnabled) {
-      hasBloomFilter(left, right, leftKey, rightKey)
-    } else {
-      hasInSubquery(left, right, leftKey, rightKey)
-    }
-  }
-
   // This checks if there is already a DPP filter, as this rule is called just after DPP.
   @tailrec
-  def hasDynamicPruningSubquery(
+  private def hasDynamicPruningSubquery(
       left: LogicalPlan,
       right: LogicalPlan,
       leftKey: Expression,
@@ -296,7 +247,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
     }
   }
 
-  def hasBloomFilter(
+  private def hasBloomFilter(
       left: LogicalPlan,
       right: LogicalPlan,
       leftKey: Expression,
@@ -316,19 +267,6 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
     }
   }
 
-  def hasInSubquery(left: LogicalPlan, right: LogicalPlan, leftKey: Expression,
-      rightKey: Expression): Boolean = {
-    (left, right) match {
-      case (Filter(InSubquery(Seq(key),
-      ListQuery(Aggregate(Seq(Alias(_, _)), Seq(Alias(_, _)), _), _, _, _, _, _)), _), _) =>
-        key.fastEquals(leftKey) || key.fastEquals(new Murmur3Hash(Seq(leftKey)))
-      case (_, Filter(InSubquery(Seq(key),
-      ListQuery(Aggregate(Seq(Alias(_, _)), Seq(Alias(_, _)), _), _, _, _, _, _)), _)) =>
-        key.fastEquals(rightKey) || key.fastEquals(new Murmur3Hash(Seq(rightKey)))
-      case _ => false
-    }
-  }
-
   private def tryInjectRuntimeFilter(plan: LogicalPlan): LogicalPlan = {
     var filterCounter = 0
     val numFilterThreshold = conf.getConf(SQLConf.RUNTIME_FILTER_NUMBER_THRESHOLD)
@@ -339,11 +277,11 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
         leftKeys.lazyZip(rightKeys).foreach((l, r) => {
           // Check if:
           // 1. There is already a DPP filter on the key
-          // 2. There is already a runtime filter (Bloom filter or IN subquery) on the key
+          // 2. There is already a bloom filter on the key
           // 3. The keys are simple cheap expressions
           if (filterCounter < numFilterThreshold &&
             !hasDynamicPruningSubquery(left, right, l, r) &&
-            !hasRuntimeFilter(newLeft, newRight, l, r) &&
+            !hasBloomFilter(newLeft, newRight, l, r) &&
             isSimpleExpression(l) && isSimpleExpression(r)) {
             val oldLeft = newLeft
             val oldRight = newRight
@@ -378,15 +316,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan match {
     case s: Subquery if s.correlated => plan
-    case _ if !conf.runtimeFilterSemiJoinReductionEnabled &&
-      !conf.runtimeFilterBloomFilterEnabled => plan
-    case _ =>
-      val newPlan = tryInjectRuntimeFilter(plan)
-      if (conf.runtimeFilterSemiJoinReductionEnabled && !plan.fastEquals(newPlan)) {
-        RewritePredicateSubquery(newPlan)
-      } else {
-        newPlan
-      }
+    case _ if !conf.runtimeFilterBloomFilterEnabled => plan
+    case _ => tryInjectRuntimeFilter(plan)
   }
 
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 43dc541fbb9..1f37363648a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -369,14 +369,6 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED =
-    buildConf("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled")
-      .doc("When true and if one side of a shuffle join has a selective 
predicate, we attempt " +
-        "to insert a semi join in the other side to reduce the amount of 
shuffle data.")
-      .version("3.3.0")
-      .booleanConf
-      .createWithDefault(false)
-
   val RUNTIME_FILTER_NUMBER_THRESHOLD =
     buildConf("spark.sql.optimizer.runtimeFilter.number.threshold")
       .doc("The total number of injected runtime filters (non-DPP) for a 
single " +
@@ -4639,7 +4631,9 @@ object SQLConf {
           "returns null when getting a map value with a non-existing key. See 
SPARK-40066 " +
           "for more details."),
       RemovedConfig("spark.sql.hive.verifyPartitionPath", "4.0.0", "false",
-        s"This config was replaced by '${IGNORE_MISSING_FILES.key}'.")
+        s"This config was replaced by '${IGNORE_MISSING_FILES.key}'."),
+      
RemovedConfig("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled", 
"false", "4.0",
+        "This optimizer config is useless as runtime filter cannot be an IN 
subquery now.")
     )
 
     Map(configs.map { cfg => cfg.key -> cfg } : _*)
@@ -4692,9 +4686,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def dynamicPartitionPruningReuseBroadcastOnly: Boolean =
     getConf(DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY)
 
-  def runtimeFilterSemiJoinReductionEnabled: Boolean =
-    getConf(RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED)
-
   def runtimeFilterBloomFilterEnabled: Boolean =
     getConf(RUNTIME_BLOOM_FILTER_ENABLED)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index c46e0bfcecb..2e57975ee6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -19,9 +19,8 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.expressions.{Alias, BloomFilterMightContain, Literal}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate}
-import org.apache.spark.sql.catalyst.optimizer.{ColumnPruning, MergeScalarSubqueries}
-import org.apache.spark.sql.catalyst.plans.LeftSemi
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.optimizer.MergeScalarSubqueries
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan}
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, SubqueryExec}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEPropagateEmptyRelation}
 import org.apache.spark.sql.internal.SQLConf
@@ -225,58 +224,28 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     super.afterAll()
   }
 
-  private def ensureLeftSemiJoinExists(plan: LogicalPlan): Unit = {
-    assert(
-      plan.find {
-        case j: Join if j.joinType == LeftSemi => true
-        case _ => false
-      }.isDefined
-    )
-  }
-
-  def checkWithAndWithoutFeatureEnabled(query: String, testSemiJoin: Boolean,
-      shouldReplace: Boolean, runtimeFilterNum: Int = 1): Unit = {
+  def checkWithAndWithoutFeatureEnabled(
+      query: String,
+      shouldReplace: Boolean,
+      runtimeFilterNum: Int = 1): Unit = {
     var planDisabled: LogicalPlan = null
     var planEnabled: LogicalPlan = null
     var expectedAnswer: Array[Row] = null
 
-    withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
-      SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
+    withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
       planDisabled = sql(query).queryExecution.optimizedPlan
       expectedAnswer = sql(query).collect()
     }
 
-    if (testSemiJoin) {
-      withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "true",
-        SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
-        planEnabled = sql(query).queryExecution.optimizedPlan
-        checkAnswer(sql(query), expectedAnswer)
-      }
+    withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") {
+      planEnabled = sql(query).queryExecution.optimizedPlan
+      checkAnswer(sql(query), expectedAnswer)
+      assert(getNumBloomFilters(planDisabled) == 0)
       if (shouldReplace) {
-        val normalizedEnabled = normalizePlan(normalizeExprIds(planEnabled))
-        val normalizedDisabled = normalizePlan(normalizeExprIds(planDisabled))
-        ensureLeftSemiJoinExists(planEnabled)
-        assert(normalizedEnabled != normalizedDisabled)
-        val agg = planEnabled.collect {
-          case Join(_, agg: Aggregate, LeftSemi, _, _) => agg
-        }
-        assert(agg.size == runtimeFilterNum)
-        assert(agg.head.fastEquals(ColumnPruning(agg.head)))
+        assert(!columnPruningTakesEffect(planEnabled))
+        assert(getNumBloomFilters(planEnabled) == runtimeFilterNum)
       } else {
-        comparePlans(planDisabled, planEnabled)
-      }
-    } else {
-      withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
-        SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") {
-        planEnabled = sql(query).queryExecution.optimizedPlan
-        checkAnswer(sql(query), expectedAnswer)
-        assert(getNumBloomFilters(planDisabled) == 0)
-        if (shouldReplace) {
-          assert(!columnPruningTakesEffect(planEnabled))
-          assert(getNumBloomFilters(planEnabled) == runtimeFilterNum)
-        } else {
-          assert(getNumBloomFilters(planEnabled) == getNumBloomFilters(planDisabled))
-        }
+        assert(getNumBloomFilters(planEnabled) == getNumBloomFilters(planDisabled))
       }
     }
   }
@@ -320,62 +289,12 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     }.nonEmpty
   }
 
-  def assertRewroteSemiJoin(query: String, runtimeFilterNum: Int = 1): Unit = {
-    checkWithAndWithoutFeatureEnabled(
-      query, testSemiJoin = true, shouldReplace = true, runtimeFilterNum)
-  }
-
-  def assertDidNotRewriteSemiJoin(query: String): Unit = {
-    checkWithAndWithoutFeatureEnabled(query, testSemiJoin = true, shouldReplace = false)
-  }
-
   def assertRewroteWithBloomFilter(query: String, runtimeFilterNum: Int = 1): Unit = {
-    checkWithAndWithoutFeatureEnabled(
-      query, testSemiJoin = false, shouldReplace = true, runtimeFilterNum)
+    checkWithAndWithoutFeatureEnabled(query, shouldReplace = true, runtimeFilterNum)
   }
 
   def assertDidNotRewriteWithBloomFilter(query: String): Unit = {
-    checkWithAndWithoutFeatureEnabled(query, testSemiJoin = false, shouldReplace = false)
-  }
-
-  test("Runtime semi join reduction: simple") {
-    // Filter creation side is 3409 bytes
-    // Filter application side scan is 3362 bytes
-    withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
-      assertRewroteSemiJoin("select * from bf1 join bf2 on bf1.c1 = bf2.c2 
where bf2.a2 = 62")
-      assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on bf1.c1 = 
bf2.c2")
-    }
-  }
-
-  test("Runtime semi join reduction: two joins") {
-    withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
-      assertRewroteSemiJoin("select * from bf1 join bf2 join bf3 on bf1.c1 = 
bf2.c2 " +
-        "and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
-    }
-  }
-
-  test("Runtime semi join reduction: three joins") {
-    withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
-      assertRewroteSemiJoin("select * from bf1 join bf2 join bf3 join bf4 on " 
+
-        "bf1.c1 = bf2.c2 and bf2.c2 = bf3.c3 and bf3.c3 = bf4.c4 where bf1.a1 
= 5", 3)
-    }
-  }
-
-  test("Runtime semi join reduction: simple expressions only") {
-    withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
-      val squared = (s: Long) => {
-        s * s
-      }
-      spark.udf.register("square", squared)
-      assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on " +
-        "bf1.c1 = bf2.c2 where square(bf2.a2) = 62")
-      assertDidNotRewriteSemiJoin("select * from bf1 join bf2 on " +
-        "bf1.c1 = square(bf2.c2) where bf2.a2= 62")
-    }
+    checkWithAndWithoutFeatureEnabled(query, shouldReplace = false)
   }
 
   test("Runtime bloom filter join: simple") {
@@ -457,14 +376,12 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
       val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
         "bf1.b1 = bf2.b2 where bf2.a2 = 62"
 
-      withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
-        SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
+      withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
         planDisabled = sql(query).queryExecution.optimizedPlan
         expectedAnswer = sql(query).collect()
       }
 
-      withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
-        SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") {
+      withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") {
         planEnabled = sql(query).queryExecution.optimizedPlan
         checkAnswer(sql(query), expectedAnswer)
       }
@@ -482,15 +399,13 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
       val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
         "bf1.b1 = bf2.b2 where bf2.a2 = 62"
 
-      withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
-        SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
+      withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
         planDisabled = sql(query).queryExecution.optimizedPlan
         expectedAnswer = sql(query).collect()
       }
 
       for (numFilterThreshold <- 0 to 3) {
-        withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
-          SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true",
+        withSQLConf( SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true",
          SQLConf.RUNTIME_FILTER_NUMBER_THRESHOLD.key -> numFilterThreshold.toString) {
           planEnabled = sql(query).queryExecution.optimizedPlan
           checkAnswer(sql(query), expectedAnswer)
@@ -515,14 +430,12 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
       val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
         "bf1.c1 = bf2.b2 where bf2.a2 = 62"
 
-      withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
-        SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
+      withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "false") {
         planDisabled = sql(query).queryExecution.optimizedPlan
         expectedAnswer = sql(query).collect()
       }
 
-      withSQLConf(SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
-        SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") {
+      withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true") {
         planEnabled = sql(query).queryExecution.optimizedPlan
         checkAnswer(sql(query), expectedAnswer)
       }
@@ -648,7 +561,6 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
   test("Merge runtime bloom filters") {
     
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key
 -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000",
-      SQLConf.RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED.key -> "false",
       SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true",
       // Re-enable `MergeScalarSubqueries`
       SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
