This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a04248dc [VL] Bloom-filter expressions are unexpectedly fallen back 
(#5579)
6a04248dc is described below

commit 6a04248dc3d7f6660a6a4b0248908516c6aef53a
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Apr 30 08:12:05 2024 +0800

    [VL] Bloom-filter expressions are unexpectedly fallen back (#5579)
---
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  | 14 ++++++-------
 .../BloomFilterMightContainJointRewriteRule.scala  | 24 +++++++++++++---------
 2 files changed, 20 insertions(+), 18 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 8318ac2d5..a55aa1817 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -66,7 +66,6 @@ import javax.ws.rs.core.UriBuilder
 import java.lang.{Long => JLong}
 import java.util.{Map => JMap}
 
-import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
 class VeloxSparkPlanExecApi extends SparkPlanExecApi {
@@ -734,19 +733,18 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
    *
    * @return
    */
-  override def genExtendedOptimizers(): List[SparkSession => 
Rule[LogicalPlan]] = {
-    val buf = mutable.ListBuffer[SparkSession => Rule[LogicalPlan]]()
-    buf += AggregateFunctionRewriteRule.apply
-    buf += BloomFilterMightContainJointRewriteRule.apply
-    buf.toList
-  }
+  override def genExtendedOptimizers(): List[SparkSession => 
Rule[LogicalPlan]] = List(
+    AggregateFunctionRewriteRule.apply
+  )
 
   /**
    * Generate extended columnar pre-rules, in the validation phase.
    *
    * @return
    */
-  override def genExtendedColumnarValidationRules(): List[SparkSession => 
Rule[SparkPlan]] = List()
+  override def genExtendedColumnarValidationRules(): List[SparkSession => 
Rule[SparkPlan]] = List(
+    BloomFilterMightContainJointRewriteRule.apply
+  )
 
   /**
    * Generate extended columnar pre-rules.
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
index c8cb4cca3..deba381db 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
@@ -22,24 +22,28 @@ import 
org.apache.gluten.expression.aggregate.VeloxBloomFilterAggregate
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
 
-case class BloomFilterMightContainJointRewriteRule(spark: SparkSession) 
extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
+case class BloomFilterMightContainJointRewriteRule(spark: SparkSession) 
extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
     if (!(GlutenConfig.getConf.enableNativeBloomFilter)) {
       return plan
     }
     val out = plan.transformWithSubqueries {
       case p =>
-        p.transformExpressions {
-          case e =>
-            SparkShimLoader.getSparkShims.replaceMightContain(
-              SparkShimLoader.getSparkShims
-                .replaceBloomFilterAggregate(e, 
VeloxBloomFilterAggregate.apply),
-              VeloxBloomFilterMightContain.apply)
-        }
+        applyForNode(p)
     }
     out
   }
+
+  private def applyForNode(p: SparkPlan) = {
+    p.transformExpressions {
+      case e =>
+        SparkShimLoader.getSparkShims.replaceMightContain(
+          SparkShimLoader.getSparkShims
+            .replaceBloomFilterAggregate(e, VeloxBloomFilterAggregate.apply),
+          VeloxBloomFilterMightContain.apply)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to