Copilot commented on code in PR #12151:
URL: https://github.com/apache/gluten/pull/12151#discussion_r3397577984


##########
backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainFallbackPatcher.scala:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.VeloxBloomFilterMightContain
+import org.apache.gluten.extension.columnar.heuristic.FallbackNode
+
+import org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FilterExec, SparkPlan}
+
+/**
+ * Fallback policy rule that patches `BloomFilterMightContain` -> `VeloxBloomFilterMightContain`
+ * inside whole-stage fallback plans when the Velox backend requires joint fallback.
+ *
+ * When [[org.apache.gluten.extension.columnar.heuristic.ExpandFallbackPolicy]] triggers a
+ * whole-stage fallback it returns the original vanilla Spark plan (containing vanilla
+ * `BloomFilterMightContain`) wrapped in a `FallbackNode`. If the bloom-filter producer (Stage 0)
+ * already ran natively it produced bytes in Velox's serialization format, which is incompatible
+ * with `BloomFilterImpl.readFrom()`. This rule replaces the vanilla expression with
+ * `VeloxBloomFilterMightContain`, which reads Velox-format bytes via JNI, so the JVM filter stage
+ * can execute correctly after falling back.
+ *
+ * This rule runs as a second fallback-policy pass, after `ExpandFallbackPolicy`, so it only acts
+ * when the plan is already wrapped in a `FallbackNode`.
+ */
+case class BloomFilterMightContainFallbackPatcher() extends Rule[SparkPlan] {
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
+      return plan
+    }
+    plan match {

Review Comment:
   BloomFilterMightContainFallbackPatcher rewrites Spark's
BloomFilterMightContain to VeloxBloomFilterMightContain whenever joint fallback
is required, without checking whether native bloom filter is enabled. If it is
disabled (spark.gluten.sql.native.bloomFilter=false), Stage 0 produces
Spark-format bloom filter bytes, and the rewritten filter stage will interpret
those bytes as Velox-format, so it can fail or return incorrect results. The
patcher should be a no-op unless native bloom filter is enabled, consistent
with the guard in BloomFilterMightContainJointRewriteRule.
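
A minimal, self-contained sketch of the guard suggested above. Every name in it (`Settings`, `SparkMightContain`, `VeloxMightContain`, `patch`) is a hypothetical stand-in rather than Gluten's real API. In the actual rule the second flag would presumably be derived from spark.gluten.sql.native.bloomFilter, but how that value is read is an assumption here.

```scala
object PatcherGuardSketch {
  // Hypothetical stand-ins for the two might_contain expression variants.
  sealed trait Expr
  case object SparkMightContain extends Expr
  case object VeloxMightContain extends Expr
  case object OtherExpr extends Expr

  // Hypothetical settings holder; the real rule reads these from the backend and config.
  final case class Settings(jointFallbackRequired: Boolean, nativeBloomFilterEnabled: Boolean)

  // Rewrite only when both conditions hold; otherwise Stage 0 emitted
  // Spark-format bytes and the vanilla expression must be left alone.
  def patch(expr: Expr, settings: Settings): Expr =
    if (!settings.jointFallbackRequired || !settings.nativeBloomFilterEnabled) {
      expr
    } else {
      expr match {
        case SparkMightContain => VeloxMightContain
        case other => other
      }
    }
}
```

With both flags set, the vanilla expression is swapped for the Velox variant. With native bloom filter off, the plan is returned unchanged, so Spark-format bytes are still read by the Spark reader.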



##########
gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala:
##########
@@ -112,6 +122,64 @@ class GlutenBloomFilterAggregateQuerySuite
     }
   }
 
+  // Regression test for https://github.com/apache/gluten/issues/12013
+  // When ExpandFallbackPolicy triggers a whole-stage AQE fallback, the resulting plan comes
+  // from the original vanilla Spark plan which contains BloomFilterMightContain (not the Velox
+  // variant). If Stage 0 (bloom_filter_agg subquery) already ran natively it produced Velox-
+  // format bytes, which BloomFilterImpl.readFrom() cannot deserialize. BloomFilterMightContain-
+  // FallbackPatcher patches the fallback plan to use VeloxBloomFilterMightContain so Stage 1
+  // can read Velox bytes via JNI even after falling back to JVM.
+  testGluten(
+    "Test bloom_filter_agg whole-stage fallback does not corrupt bloom filter bytes",
+    Issue12013) {
+    val table = "bloom_filter_test"
+    val numEstimatedItems = 5000000L
+    val sqlString =
+      s"""
+         |SELECT col positive_membership_test
+         |FROM $table
+         |WHERE might_contain(
+         |            (SELECT bloom_filter_agg(col,
+         |              cast($numEstimatedItems as long),
+         |              cast($veloxBloomFilterMaxNumBits as long))
+         |             FROM $table), col)
+         |""".stripMargin
+
+    withTempView(table) {
+      (Seq(Long.MinValue, 0, Long.MaxValue) ++ (1L to 200000L))
+        .toDF("col")
+        .createOrReplaceTempView(table)
+      if (BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
+        // Disable columnar filter so FilterExec falls back, and set the whole-stage fallback
+        // threshold so ExpandFallbackPolicy promotes the individual fallback to whole-stage.
+        // This reproduces the scenario where the filter stage falls back to the original
+        // vanilla plan while the bloom_filter_agg subquery has already produced Velox-format
+        // bloom filter bytes.
+        //
+        // Threshold=2: a fallen-back FilterExec introduces two ColumnarToRow/RowToColumnar
+        // transitions (net transition cost=2), which meets the threshold and triggers the
+        // whole-stage AQE fallback.  The bloom_filter_agg subquery stages have an inherent
+        // transition cost of 1, so they do NOT trigger the fallback and run natively.
+        //
+        // ANSI mode must be off: Spark 4.0 enables ANSI by default, which causes
+        // ObjectHashAggregateExec to fail Gluten validation ("does not support ansi mode"),
+        // raising the agg-stage transition cost above 1.  With ANSI off the agg-stage cost
+        // stays at 1 (< threshold 2), so only the filter stage falls back as intended.
+        withSQLConf(
+          GlutenConfig.COLUMNAR_FILTER_ENABLED.key -> "false",
+          GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "2",
+          SQLConf.ANSI_ENABLED.key -> "false"
+        ) {
+          val df = spark.sql(sqlString)
+          // Must not throw java.io.IOException: Unexpected Bloom filter version number (16777217)
+          df.collect
+          // All 200003 rows match the bloom filter built from the same data.
+          assert(df.count() == 200003L)

Review Comment:
   This test runs two separate actions (collect, then count). Each action
re-executes the query, which makes the suite noticeably slower. Asserting the
expected row count on the collected result keeps the same failure signal (an
IOException during execution) while running only one job.
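
A small sketch of why one action is enough. `CountingQuery` is a hypothetical stand-in for a DataFrame that simply counts how many times it is executed; it is not a Spark or Gluten class. The data shape mirrors the test in the diff (three boundary values plus 1 to 200000).

```scala
object SingleActionSketch {
  // Hypothetical stand-in for a DataFrame: every action re-runs the "query".
  final class CountingQuery(data: Seq[Long]) {
    private var runs = 0
    def executions: Int = runs
    def collect(): Array[Long] = { runs += 1; data.toArray }
    def count(): Long = { runs += 1; data.size.toLong }
  }

  // Same shape as the test data: 3 boundary values plus the range 1..200000.
  val data: Seq[Long] = Seq(Long.MinValue, 0L, Long.MaxValue) ++ (1L to 200000L)

  // Pattern from the diff: collect, then count, which executes twice.
  def twoActions(): Int = {
    val q = new CountingQuery(data)
    q.collect()
    assert(q.count() == 200003L)
    q.executions
  }

  // Suggested pattern: assert on the collected rows, which executes once.
  def oneAction(): Int = {
    val q = new CountingQuery(data)
    val rows = q.collect()
    assert(rows.length == 200003)
    q.executions
  }
}
```

In the real test the equivalent change would be to keep the result of `df.collect` and assert on its length, so any exception raised during execution still fails the test.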



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

