Copilot commented on code in PR #12151: URL: https://github.com/apache/gluten/pull/12151#discussion_r3397577984
########## backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainFallbackPatcher.scala: ##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.VeloxBloomFilterMightContain
+import org.apache.gluten.extension.columnar.heuristic.FallbackNode
+
+import org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FilterExec, SparkPlan}
+
+/**
+ * Fallback policy rule that patches `BloomFilterMightContain` -> `VeloxBloomFilterMightContain`
+ * inside whole-stage fallback plans when the Velox backend requires joint fallback.
+ *
+ * When [[org.apache.gluten.extension.columnar.heuristic.ExpandFallbackPolicy]] triggers a
+ * whole-stage fallback it returns the original vanilla Spark plan (containing vanilla
+ * `BloomFilterMightContain`) wrapped in a `FallbackNode`. If the bloom-filter producer (Stage 0)
+ * already ran natively it produced bytes in Velox's serialization format, which is incompatible
+ * with `BloomFilterImpl.readFrom()`. This rule replaces the vanilla expression with
+ * `VeloxBloomFilterMightContain`, which reads Velox-format bytes via JNI, so the JVM filter stage
+ * can execute correctly after falling back.
+ *
+ * This rule runs as a second fallback-policy pass, after `ExpandFallbackPolicy`, so it only acts
+ * when the plan is already wrapped in a `FallbackNode`.
+ */
+case class BloomFilterMightContainFallbackPatcher() extends Rule[SparkPlan] {
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
+      return plan
+    }
+    plan match {

Review Comment:
BloomFilterMightContainFallbackPatcher rewrites Spark's BloomFilterMightContain to VeloxBloomFilterMightContain whenever joint fallback is required, without checking whether the native bloom filter is enabled. If it is disabled (spark.gluten.sql.native.bloomFilter=false), Stage 0 produces Spark-format bloom filter bytes, and rewriting the filter stage to VeloxBloomFilterMightContain makes it interpret Spark bytes as Velox bytes, which can fail or return incorrect results. The patcher should be a no-op unless the native bloom filter is enabled, consistent with the guard in BloomFilterMightContainJointRewriteRule.

########## gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala: ##########
@@ -112,6 +122,64 @@ class GlutenBloomFilterAggregateQuerySuite
     }
   }
+
+  // Regression test for https://github.com/apache/gluten/issues/12013
+  // When ExpandFallbackPolicy triggers a whole-stage AQE fallback, the resulting plan comes
+  // from the original vanilla Spark plan which contains BloomFilterMightContain (not the Velox
+  // variant). If Stage 0 (bloom_filter_agg subquery) already ran natively it produced Velox-
+  // format bytes, which BloomFilterImpl.readFrom() cannot deserialize. BloomFilterMightContain-
+  // FallbackPatcher patches the fallback plan to use VeloxBloomFilterMightContain so Stage 1
+  // can read Velox bytes via JNI even after falling back to JVM.
+  testGluten(
+    "Test bloom_filter_agg whole-stage fallback does not corrupt bloom filter bytes",
+    Issue12013) {
+    val table = "bloom_filter_test"
+    val numEstimatedItems = 5000000L
+    val sqlString =
+      s"""
+         |SELECT col positive_membership_test
+         |FROM $table
+         |WHERE might_contain(
+         |  (SELECT bloom_filter_agg(col,
+         |    cast($numEstimatedItems as long),
+         |    cast($veloxBloomFilterMaxNumBits as long))
+         |  FROM $table), col)
+         |""".stripMargin
+
+    withTempView(table) {
+      (Seq(Long.MinValue, 0, Long.MaxValue) ++ (1L to 200000L))
+        .toDF("col")
+        .createOrReplaceTempView(table)
+      if (BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
+        // Disable columnar filter so FilterExec falls back, and set the whole-stage fallback
+        // threshold so ExpandFallbackPolicy promotes the individual fallback to whole-stage.
+        // This reproduces the scenario where the filter stage falls back to the original
+        // vanilla plan while the bloom_filter_agg subquery has already produced Velox-format
+        // bloom filter bytes.
+        //
+        // Threshold=2: a fallen-back FilterExec introduces two ColumnarToRow/RowToColumnar
+        // transitions (net transition cost=2), which meets the threshold and triggers the
+        // whole-stage AQE fallback. The bloom_filter_agg subquery stages have an inherent
+        // transition cost of 1, so they do NOT trigger the fallback and run natively.
+        //
+        // ANSI mode must be off: Spark 4.0 enables ANSI by default, which causes
+        // ObjectHashAggregateExec to fail Gluten validation ("does not support ansi mode"),
+        // raising the agg-stage transition cost above 1. With ANSI off the agg-stage cost
+        // stays at 1 (< threshold 2), so only the filter stage falls back as intended.
+        withSQLConf(
+          GlutenConfig.COLUMNAR_FILTER_ENABLED.key -> "false",
+          GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "2",
+          SQLConf.ANSI_ENABLED.key -> "false"
+        ) {
+          val df = spark.sql(sqlString)
+          // Must not throw java.io.IOException: Unexpected Bloom filter version number (16777217)
+          df.collect
+          // All 200003 rows match the bloom filter built from the same data.
+          assert(df.count() == 200003L)

Review Comment:
This test runs two separate actions (collect, then count), which re-executes the query and makes the suite noticeably slower. You can assert the expected row count from the collected result to keep the same failure signal (IOException during execution) while running only one job.
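To make the cost argument in the comment on the test concrete, here is a minimal self-contained model. `CountingQuery` and `SingleActionCheck` are hypothetical names, and the model assumes, as for an uncached Spark Dataset, that every action re-executes the whole plan. In the suite itself the suggested shape would be `val rows = df.collect()` followed by `assert(rows.length == 200003)`.

```scala
// Hypothetical stand-in for an uncached DataFrame: every action re-runs `compute`.
final class CountingQuery[A](compute: () => Seq[A]) {
  private var runs = 0
  def executions: Int = runs

  def collect(): Seq[A] = { runs += 1; compute() }
  def count(): Long = { runs += 1; compute().size.toLong }
}

object SingleActionCheck {
  // Pattern under review: two actions, so the query executes twice.
  def collectThenCount[A](q: CountingQuery[A]): Long = {
    q.collect()
    q.count()
  }

  // Suggested pattern: one action; the row count comes from the collected result.
  def collectOnce[A](q: CountingQuery[A]): Long = q.collect().size.toLong
}
```

Because the `IOException` from issue 12013 surfaces while the plan executes, the single `collect()` still exercises the failing path; only the redundant second job is dropped.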

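Regarding the comment on `BloomFilterMightContainFallbackPatcher`: a minimal sketch of the suggested guard, with the two inputs modelled as plain booleans so it runs without Gluten on the classpath. `PatcherSettings`, `BloomFilterPatchGuard`, `shouldPatch`, and `patchIfNeeded` are hypothetical; in the real rule the first flag would come from `BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()` and the second from the setting behind `spark.gluten.sql.native.bloomFilter`, whose accessor is not shown in this diff.

```scala
// Hypothetical model of the two inputs the patcher would consult.
final case class PatcherSettings(
    requireJointFallback: Boolean, // backend requires joint fallback of agg + might_contain
    nativeBloomFilterEnabled: Boolean // assumed stand-in for spark.gluten.sql.native.bloomFilter
)

object BloomFilterPatchGuard {
  // Patch only when Stage 0 can have produced Velox-format bytes: the native
  // bloom filter is enabled AND the backend requires joint fallback.
  def shouldPatch(s: PatcherSettings): Boolean =
    s.requireJointFallback && s.nativeBloomFilterEnabled

  // Stand-in for the rule body: `patch` runs only when the guard passes;
  // otherwise the plan is returned unchanged (a no-op).
  def patchIfNeeded[P](plan: P, s: PatcherSettings)(patch: P => P): P =
    if (shouldPatch(s)) patch(plan) else plan
}
```

Keeping both conditions in one predicate mirrors the comment's point: with the native bloom filter off, Stage 0 emits Spark-format bytes, so the vanilla `BloomFilterMightContain` is already the correct reader and the rule should leave the plan untouched.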
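The version number quoted in the test (`Unexpected Bloom filter version number (16777217)`) can be checked by hand. Spark's `BloomFilterImpl.readFrom()` reads the version as a big-endian `int` via `DataInputStream.readInt()` and expects 1. The sketch below assumes, without confirmation from this PR, that Velox serializes a 1-byte version (1) followed by a little-endian `int32` count of 64-bit words, and that the filter here holds 4194304 bits (65536 words). Under those assumptions the first four bytes are `01 00 00 01`, i.e. 0x01000001 = 16777217. `VeloxBytesAsSpark` is a hypothetical helper.

```scala
import java.io.{ByteArrayInputStream, DataInputStream}
import java.nio.{ByteBuffer, ByteOrder}

object VeloxBytesAsSpark {
  // Assumed Velox header layout: int8 version, then int32 word count (little-endian).
  def veloxHeader(version: Byte, numWords: Int): Array[Byte] = {
    val buf = ByteBuffer.allocate(5).order(ByteOrder.LITTLE_ENDIAN)
    buf.put(version)
    buf.putInt(numWords)
    buf.array()
  }

  // Spark-style read: the first 4 bytes are decoded as a big-endian int version.
  def sparkVersion(bytes: Array[Byte]): Int =
    new DataInputStream(new ByteArrayInputStream(bytes)).readInt()
}
```

The same mismatch in the opposite direction is what the first review comment warns about: Velox-side decoding of Spark-format bytes would misread the header just as readily.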