This is an automated email from the ASF dual-hosted git repository.
wangzhen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 903858d519 [GLUTEN-9849][VL] Avoid VeloxBloomFilterMightContain being
applied to FileSourceScan partition filters (#9850)
903858d519 is described below
commit 903858d51982411bc7ea3c79804d14e138474600
Author: Zhen Wang <[email protected]>
AuthorDate: Mon Jun 30 10:05:38 2025 +0800
[GLUTEN-9849][VL] Avoid VeloxBloomFilterMightContain being applied to
FileSourceScan partition filters (#9850)
* [GLUTEN-9849][VL] Avoid VeloxBloomFilterMightContain being applied to
FileSourceScan partition filters
* test
* test
* fix
* add comment
---
.../expression/VeloxBloomFilterMightContain.scala | 16 +++++++++++++++-
.../apache/gluten/execution/GlutenExpression.scala | 20 ++++++++++++++++----
.../gluten/execution/ScanTransformerFactory.scala | 8 +++++++-
.../utils/clickhouse/ClickHouseTestSettings.scala | 1 +
.../spark/sql/GlutenInjectRuntimeFilterSuite.scala | 20 +++++++++++++++++---
5 files changed, 56 insertions(+), 9 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala
index 05e1e3eb48..3821fd001c 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.expression
+import org.apache.gluten.execution.GlutenTaskOnlyExpression
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.VeloxBloomFilter
@@ -31,11 +32,15 @@ import org.apache.spark.task.TaskResources
* Velox's bloom-filter implementation uses different algorithms internally comparing to vanilla
* Spark so produces different intermediate aggregate data. Thus we use different filter function /
* agg function types for Velox's version to distinguish from vanilla Spark's implementation.
+ *
+ * FIXME: Remove GlutenTaskOnlyExpression after the VeloxBloomFilter expr is made compatible with
+ * spark. See: https://github.com/apache/incubator-gluten/pull/9850#issuecomment-3007448538
*/
case class VeloxBloomFilterMightContain(
bloomFilterExpression: Expression,
valueExpression: Expression)
- extends BinaryExpression {
+ extends BinaryExpression
+ with GlutenTaskOnlyExpression {
private val delegate =
SparkShimLoader.getSparkShims.newMightContain(bloomFilterExpression,
valueExpression)
@@ -89,6 +94,7 @@ case class VeloxBloomFilterMightContain(
val valueEval = valueExpression.genCode(ctx)
val code =
code"""
+
org.apache.gluten.expression.VeloxBloomFilterMightContain.checkInSparkTask();
${valueEval.code}
boolean ${ev.isNull} = ${valueEval.isNull};
${CodeGenerator.javaType(dataType)} ${ev.value} =
${CodeGenerator.defaultValue(dataType)};
@@ -98,3 +104,11 @@ case class VeloxBloomFilterMightContain(
ev.copy(code = code)
}
}
+
+object VeloxBloomFilterMightContain {
+  def checkInSparkTask(): Unit = {
+    if (!TaskResources.inSparkTask()) {
+      throw new UnsupportedOperationException("velox_might_contain is not evaluable on Driver")
+    }
+  }
+}
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenExpression.scala
similarity index 65%
copy from gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala
copy to gluten-core/src/main/scala/org/apache/gluten/execution/GlutenExpression.scala
index 11b6d99828..7ea8b558ca 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenExpression.scala
@@ -14,8 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.gluten.execution
-class GlutenInjectRuntimeFilterSuite
- extends InjectRuntimeFilterSuite
- with GlutenSQLTestsBaseTrait {}
+/**
+ * GlutenExpression is a marker trait for expressions that are specific to Gluten execution. It can
+ * be used to identify expressions that should only be evaluated in the context of a Spark task.
+ */
+trait GlutenExpression {
+
+  def onlyInSparkTask(): Boolean = false
+
+}
+
+trait GlutenTaskOnlyExpression extends GlutenExpression {
+
+  override def onlyInSparkTask(): Boolean = true
+
+}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index 2a8cc91382..bcf209c8ae 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -25,11 +25,17 @@ object ScanTransformerFactory {
def createFileSourceScanTransformer(
scanExec: FileSourceScanExec): FileSourceScanExecTransformerBase = {
+ // Partition filters will be evaluated in driver side, so we can remove
+ // GlutenTaskOnlyExpressions
+ val partitionFilters = scanExec.partitionFilters.filter {
+ case _: GlutenTaskOnlyExpression => false
+ case _ => true
+ }
FileSourceScanExecTransformer(
scanExec.relation,
scanExec.output,
scanExec.requiredSchema,
- scanExec.partitionFilters,
+ partitionFilters,
scanExec.optionalBucketSet,
scanExec.optionalNumCoalescedBuckets,
scanExec.dataFilters,
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 9026fefffd..0860b5e3e7 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -901,6 +901,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
enableSuite[GlutenInjectRuntimeFilterSuite]
// FIXME: yan
.includeCH("Merge runtime bloom filters")
+ .excludeGlutenTest("GLUTEN-9849: bloom filter applied to partition filter")
enableSuite[GlutenInnerJoinSuiteForceShjOff]
.excludeCH(
"inner join, one match per row using ShuffledHashJoin (build=left)
(whole-stage-codegen off)")
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala
index 11b6d99828..26e11f8438 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala
@@ -16,6 +16,20 @@
*/
package org.apache.spark.sql
-class GlutenInjectRuntimeFilterSuite
- extends InjectRuntimeFilterSuite
- with GlutenSQLTestsBaseTrait {}
+import org.apache.spark.sql.internal.SQLConf
+
+class GlutenInjectRuntimeFilterSuite extends InjectRuntimeFilterSuite with GlutenSQLTestsBaseTrait {
+
+  testGluten("GLUTEN-9849: bloom filter applied to partition filter") {
+    withSQLConf(
+      SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
+      SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "false",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000"
+    ) {
+      assertRewroteWithBloomFilter(
+        "select * from bf5part join bf2 on " +
+          "bf5part.f5 = bf2.c2 where bf2.a2 = 67")
+    }
+  }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]