This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a54a803768 [GLUTEN-11088][VL] fix `bloomFilter` in
GlutenDataFrameStatSuite (#11211)
a54a803768 is described below
commit a54a803768cda3e5bd559825da9fb69cd23c8b92
Author: Rong Ma <[email protected]>
AuthorDate: Fri Nov 28 09:38:26 2025 +0000
[GLUTEN-11088][VL] fix `bloomFilter` in GlutenDataFrameStatSuite (#11211)
---
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 12 +++++++++--
.../BloomFilterMightContainJointRewriteRule.scala | 7 +++++--
.../gluten/extension/caller/CallerInfo.scala | 23 +++++++++++++++++-----
.../gluten/utils/velox/VeloxTestSettings.scala | 2 --
4 files changed, 33 insertions(+), 11 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 496f18811c..7a78bb8468 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -84,7 +84,11 @@ object VeloxRuleApi {
injector.injectPreTransform(c => FallbackMultiCodegens.apply(c.session))
injector.injectPreTransform(c => MergeTwoPhasesHashBaseAggregate(c.session))
injector.injectPreTransform(_ => RewriteSubqueryBroadcast())
- injector.injectPreTransform(c => BloomFilterMightContainJointRewriteRule.apply(c.session))
+ injector.injectPreTransform(
+ c =>
+ BloomFilterMightContainJointRewriteRule.apply(
+ c.session,
+ c.caller.isBloomFilterStatFunction()))
injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session))
injector.injectPreTransform(_ => EliminateRedundantGetTimestamp)
@@ -162,7 +166,11 @@ object VeloxRuleApi {
injector.injectPreTransform(c => FallbackOnANSIMode.apply(c.session))
injector.injectPreTransform(c => MergeTwoPhasesHashBaseAggregate(c.session))
injector.injectPreTransform(_ => RewriteSubqueryBroadcast())
- injector.injectPreTransform(c => BloomFilterMightContainJointRewriteRule.apply(c.session))
+ injector.injectPreTransform(
+ c =>
+ BloomFilterMightContainJointRewriteRule.apply(
+ c.session,
+ c.caller.isBloomFilterStatFunction()))
injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session))
injector.injectPreTransform(_ => EliminateRedundantGetTimestamp)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
index 7b209ad605..9b743a4f22 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala
@@ -25,9 +25,12 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
-case class BloomFilterMightContainJointRewriteRule(spark: SparkSession) extends Rule[SparkPlan] {
+case class BloomFilterMightContainJointRewriteRule(
+ spark: SparkSession,
+ isBloomFilterStatFunction: Boolean)
+ extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
- if (!GlutenConfig.get.enableNativeBloomFilter) {
+ if (isBloomFilterStatFunction || !GlutenConfig.get.enableNativeBloomFilter) {
return plan
}
val out = plan.transformWithSubqueries {
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
index 1307351a43..732c898285 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
@@ -30,6 +30,7 @@ trait CallerInfo {
def isAqe(): Boolean
def isCache(): Boolean
def isStreaming(): Boolean
+ def isBloomFilterStatFunction(): Boolean
}
object CallerInfo {
@@ -41,7 +42,8 @@ object CallerInfo {
private class Impl(
override val isAqe: Boolean,
override val isCache: Boolean,
- override val isStreaming: Boolean
+ override val isStreaming: Boolean,
+ override val isBloomFilterStatFunction: Boolean
) extends CallerInfo
/*
@@ -55,7 +57,8 @@ object CallerInfo {
new Impl(
isAqe = inAqeCall(stack),
isCache = inCacheCall(stack),
- isStreaming = inStreamingCall(stack))
+ isStreaming = inStreamingCall(stack),
+ isBloomFilterStatFunction = inBloomFilterStatFunctionCall(stack))
}
private def inAqeCall(stack: Seq[StackTraceElement]): Boolean = {
@@ -70,11 +73,21 @@ object CallerInfo {
stack.exists(_.getClassName.equals(StreamExecution.getClass.getName.split('$').head))
}
+ private def inBloomFilterStatFunctionCall(stack: Seq[StackTraceElement]): Boolean = {
+ val res = stack.exists(
+ _.getClassName.equals("org.apache.spark.sql.DataFrameStatFunctions")
+ && stack.exists(_.getMethodName.equals("bloomFilter")))
+ res
+ }
+
/** For testing only. */
- def withLocalValue[T](isAqe: Boolean, isCache: Boolean, isStreaming: Boolean = false)(
- body: => T): T = {
+ def withLocalValue[T](
+ isAqe: Boolean,
+ isCache: Boolean,
+ isStreaming: Boolean = false,
+ isBloomFilterStatFunction: Boolean = false)(body: => T): T = {
val prevValue = localStorage.get()
- val newValue = new Impl(isAqe, isCache, isStreaming)
+ val newValue = new Impl(isAqe, isCache, isStreaming, isBloomFilterStatFunction)
localStorage.set(Some(newValue))
try {
body
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 4498b7ebf3..1383beae79 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -787,8 +787,6 @@ class VeloxTestSettings extends BackendTestSettings {
// Not really an issue.
.exclude("SPARK-10740: handle nondeterministic expressions correctly for set operations")
enableSuite[GlutenDataFrameStatSuite]
- // TODO: fix in Spark-4.0
- .exclude("Bloom filter")
enableSuite[GlutenDataFrameSuite]
// Rewrite these tests because it checks Spark's physical operators.
.excludeByPrefix("SPARK-22520", "reuse exchange")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]