This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new e54dc43e750 [SPARK-38565][SQL] Support Left Semi join in row level 
runtime filters
e54dc43e750 is described below

commit e54dc43e750be23062422ca096d1e8439178a1d1
Author: Yuming Wang <[email protected]>
AuthorDate: Mon Apr 11 15:43:41 2022 +0800

    [SPARK-38565][SQL] Support Left Semi join in row level runtime filters
    
    ### What changes were proposed in this pull request?
    
    1. Support Left Semi join in row level runtime filters.
    2. Rename 
`spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizethreshold` to 
`spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold`.
    
    ### Why are the changes needed?
    
    Improve query performance and make the code easier to maintain.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UT.
    
    Closes #36131 from wangyum/SPARK-38565.
    
    Authored-by: Yuming Wang <[email protected]>
    Signed-off-by: Yuming Wang <[email protected]>
    (cherry picked from commit 073fd2ad5c16d193725954e76ce357e4a9d97449)
    Signed-off-by: Yuming Wang <[email protected]>
---
 .../catalyst/optimizer/InjectRuntimeFilter.scala   | 29 +++++++---------------
 .../spark/sql/catalyst/optimizer/joins.scala       | 10 ++++++++
 .../org/apache/spark/sql/internal/SQLConf.scala    |  2 +-
 .../dynamicpruning/PartitionPruning.scala          | 14 ++---------
 .../spark/sql/InjectRuntimeFilterSuite.scala       | 13 ++++++++++
 5 files changed, 35 insertions(+), 33 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index a69cda25ef4..134292ae30d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
BloomFilterAggregate, Complete}
 import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, 
PhysicalOperation}
-import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{INVOKE, 
JSON_TO_STRUCT, LIKE_FAMLIY, PYTHON_UDF, REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE, 
SCALA_UDF}
@@ -132,16 +131,6 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
       REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE)
   }
 
-  private def canFilterLeft(joinType: JoinType): Boolean = joinType match {
-    case Inner | RightOuter => true
-    case _ => false
-  }
-
-  private def canFilterRight(joinType: JoinType): Boolean = joinType match {
-    case Inner | LeftOuter => true
-    case _ => false
-  }
-
   private def isProbablyShuffleJoin(left: LogicalPlan,
       right: LogicalPlan, hint: JoinHint): Boolean = {
     !hintToBroadcastLeft(hint) && !hintToBroadcastRight(hint) &&
@@ -149,11 +138,11 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
   }
 
   private def probablyHasShuffle(plan: LogicalPlan): Boolean = {
-    plan.collectFirst {
-      case j@Join(left, right, _, _, hint)
-        if isProbablyShuffleJoin(left, right, hint) => j
-      case a: Aggregate => a
-    }.nonEmpty
+    plan.exists {
+      case Join(left, right, _, _, hint) => isProbablyShuffleJoin(left, right, 
hint)
+      case _: Aggregate => true
+      case _ => false
+    }
   }
 
   // Returns the max scan byte size in the subtree rooted at 
`filterApplicationSide`.
@@ -235,7 +224,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
   }
 
   private def findBloomFilterWithExp(plan: LogicalPlan, key: Expression): 
Boolean = {
-    plan.find {
+    plan.exists {
       case Filter(condition, _) =>
         splitConjunctivePredicates(condition).exists {
           case BloomFilterMightContain(_, XxHash64(Seq(valueExpression), _))
@@ -243,7 +232,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
           case _ => false
         }
       case _ => false
-    }.isDefined
+    }
   }
 
   def hasInSubquery(left: LogicalPlan, right: LogicalPlan, leftKey: Expression,
@@ -277,11 +266,11 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
             isSimpleExpression(l) && isSimpleExpression(r)) {
             val oldLeft = newLeft
             val oldRight = newRight
-            if (canFilterLeft(joinType) && filteringHasBenefit(left, right, l, 
hint)) {
+            if (canPruneLeft(joinType) && filteringHasBenefit(left, right, l, 
hint)) {
               newLeft = injectFilter(l, newLeft, r, right)
             }
             // Did we actually inject on the left? If not, try on the right
-            if (newLeft.fastEquals(oldLeft) && canFilterRight(joinType) &&
+            if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
               filteringHasBenefit(right, left, r, hint)) {
               newRight = injectFilter(r, newRight, l, left)
             }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 6d683a7a113..45d8c54ea19 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -347,6 +347,16 @@ trait JoinSelectionHelper {
         join.hint, hintOnly = false, conf).isDefined
   }
 
+  def canPruneLeft(joinType: JoinType): Boolean = joinType match {
+    case Inner | LeftSemi | RightOuter => true
+    case _ => false
+  }
+
+  def canPruneRight(joinType: JoinType): Boolean = joinType match {
+    case Inner | LeftSemi | LeftOuter => true
+    case _ => false
+  }
+
   def hintToBroadcastLeft(hint: JoinHint): Boolean = {
     hint.leftHint.exists(_.strategy.contains(BROADCAST))
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c4ffc844135..365a9a378cb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -375,7 +375,7 @@ object SQLConf {
       .createWithDefaultString("10MB")
 
   val RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD =
-    
buildConf("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizethreshold")
+    
buildConf("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold")
       .doc("Byte size threshold of the Bloom filter application side plan's 
aggregated scan " +
         "size. Aggregated scan byte size of the Bloom filter application side 
needs to be over " +
         "this value to inject a bloom filter.")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
index 114d58c739e..402c59bc3de 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.dynamicpruning
 
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer.JoinSelectionHelper
 import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
-import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering
@@ -49,7 +49,7 @@ import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
  *    subquery query twice, we keep the duplicated subquery
  *    (3) otherwise, we drop the subquery.
  */
-object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
+object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with 
JoinSelectionHelper {
 
   /**
    * Searches for a table scan that can be filtered for a given column in a 
logical plan.
@@ -215,16 +215,6 @@ object PartitionPruning extends Rule[LogicalPlan] with 
PredicateHelper {
     !plan.isStreaming && hasSelectivePredicate(plan)
   }
 
-  private def canPruneLeft(joinType: JoinType): Boolean = joinType match {
-    case Inner | LeftSemi | RightOuter => true
-    case _ => false
-  }
-
-  private def canPruneRight(joinType: JoinType): Boolean = joinType match {
-    case Inner | LeftSemi | LeftOuter => true
-    case _ => false
-  }
-
   private def prune(plan: LogicalPlan): LogicalPlan = {
     plan transformUp {
       // skip this rule if there's already a DPP subquery on the LHS of a join
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index 097a18cabd5..726fa341b5c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -526,4 +526,17 @@ class InjectRuntimeFilterSuite extends QueryTest with 
SQLTestUtils with SharedSp
         "bf1.c1 = square(bf2.c2) where bf2.a2 = 62" )
     }
   }
+
+  test("Support Left Semi join in row level runtime filters") {
+    
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key
 -> "3000",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "32") {
+      assertRewroteWithBloomFilter(
+        """
+          |SELECT *
+          |FROM   bf1 LEFT SEMI
+          |JOIN   (SELECT * FROM bf2 WHERE bf2.a2 = 62) tmp
+          |ON     bf1.c1 = tmp.c2
+        """.stripMargin)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to