This is an automated email from the ASF dual-hosted git repository.

rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 24ce3f965b [GLUTEN-11383][VL] Allow bloom filter pushdown in hash 
probe (#11392)
24ce3f965b is described below

commit 24ce3f965b5d60cf09a6cc86bfbdc951e6f94954
Author: inf <[email protected]>
AuthorDate: Fri Feb 6 19:26:49 2026 +0300

    [GLUTEN-11383][VL] Allow bloom filter pushdown in hash probe (#11392)
    
    Co-authored-by: infvg <[email protected]>
---
 .../java/org/apache/gluten/metrics/Metrics.java    |  4 +++
 .../org/apache/gluten/metrics/OperatorMetrics.java |  3 ++
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala |  6 ++++
 .../org/apache/gluten/config/VeloxConfig.scala     | 21 +++++++++++++
 .../apache/gluten/metrics/JoinMetricsUpdater.scala |  3 ++
 .../org/apache/gluten/metrics/MetricsUtil.scala    |  3 ++
 .../gluten/execution/VeloxHashJoinSuite.scala      | 36 ++++++++++++++++++++++
 cpp/core/jni/JniWrapper.cc                         |  3 +-
 cpp/core/utils/Metrics.h                           |  1 +
 cpp/velox/compute/WholeStageResultIterator.cc      |  8 +++++
 cpp/velox/config/VeloxConfig.h                     |  6 ++++
 docs/velox-configuration.md                        |  2 ++
 12 files changed, 95 insertions(+), 1 deletion(-)

diff --git 
a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java 
b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
index 940daf977a..3d23dc94db 100644
--- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
+++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
@@ -42,6 +42,7 @@ public class Metrics implements IMetrics {
   public long[] numReplacedWithDynamicFilterRows;
   public long[] flushRowCount;
   public long[] loadedToValueHook;
+  public long[] bloomFilterBlocksByteSize;
   public long[] skippedSplits;
   public long[] processedSplits;
   public long[] skippedStrides;
@@ -91,6 +92,7 @@ public class Metrics implements IMetrics {
       long[] numReplacedWithDynamicFilterRows,
       long[] flushRowCount,
       long[] loadedToValueHook,
+      long[] bloomFilterBlocksByteSize,
       long[] scanTime,
       long[] skippedSplits,
       long[] processedSplits,
@@ -134,6 +136,7 @@ public class Metrics implements IMetrics {
     this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
     this.flushRowCount = flushRowCount;
     this.loadedToValueHook = loadedToValueHook;
+    this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
     this.skippedSplits = skippedSplits;
     this.processedSplits = processedSplits;
     this.skippedStrides = skippedStrides;
@@ -183,6 +186,7 @@ public class Metrics implements IMetrics {
         numReplacedWithDynamicFilterRows[index],
         flushRowCount[index],
         loadedToValueHook[index],
+        bloomFilterBlocksByteSize[index],
         scanTime[index],
         skippedSplits[index],
         processedSplits[index],
diff --git 
a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java 
b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
index 0f4ab96cbc..10563e507e 100644
--- 
a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
+++ 
b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
@@ -40,6 +40,7 @@ public class OperatorMetrics implements IOperatorMetrics {
   public long numReplacedWithDynamicFilterRows;
   public long flushRowCount;
   public long loadedToValueHook;
+  public long bloomFilterBlocksByteSize;
   public long skippedSplits;
   public long processedSplits;
   public long skippedStrides;
@@ -84,6 +85,7 @@ public class OperatorMetrics implements IOperatorMetrics {
       long numReplacedWithDynamicFilterRows,
       long flushRowCount,
       long loadedToValueHook,
+      long bloomFilterBlocksByteSize,
       long scanTime,
       long skippedSplits,
       long processedSplits,
@@ -125,6 +127,7 @@ public class OperatorMetrics implements IOperatorMetrics {
     this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
     this.flushRowCount = flushRowCount;
     this.loadedToValueHook = loadedToValueHook;
+    this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
     this.skippedSplits = skippedSplits;
     this.processedSplits = processedSplits;
     this.skippedStrides = skippedStrides;
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 80afe9f19f..29d882c3d3 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -298,6 +298,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "loadedToValueHook" -> SQLMetrics.createMetric(
         sparkContext,
         "number of pushdown aggregations"),
+      "bloomFilterBlocksByteSize" -> SQLMetrics.createSizeMetric(
+        sparkContext,
+        "bloom filter blocks byte size"),
       "rowConstructionCpuCount" -> SQLMetrics.createMetric(
         sparkContext,
         "rowConstruction cpu wall time count"),
@@ -625,6 +628,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "hashProbeDynamicFiltersProduced" -> SQLMetrics.createMetric(
         sparkContext,
         "number of hash probe dynamic filters produced"),
+      "bloomFilterBlocksByteSize" -> SQLMetrics.createSizeMetric(
+        sparkContext,
+        "bloom filter blocks byte size"),
       "streamPreProjectionCpuCount" -> SQLMetrics.createMetric(
         sparkContext,
         "stream preProject cpu wall time count"),
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala 
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 03781d2fb5..ee0866391c 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -85,6 +85,11 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
   def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES)
 
   def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES)
+
+  def hashProbeBloomFilterPushdownMaxSize: Long = 
getConf(HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE)
+
+  def hashProbeDynamicFilterPushdownEnabled: Boolean =
+    getConf(HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED)
 }
 
 object VeloxConfig extends ConfigRegistry {
@@ -447,6 +452,22 @@ object VeloxConfig extends ConfigRegistry {
       .longConf
       .createWithDefault(4194304L)
 
+  val HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE =
+    
buildConf("spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize")
+      .doc("The maximum byte size of Bloom filter that can be generated from 
hash probe. When " +
+        "set to 0, no Bloom filter will be generated. To achieve optimal 
performance, this should" +
+        " not be much larger than the CPU cache size on the host.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(0)
+
+  val HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED =
+    
buildConf("spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled")
+      .doc(
+        "Whether hash probe can generate any dynamic filter (including Bloom 
filter) and push" +
+          " down to upstream operators.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED =
     
buildStaticConf("spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled")
       .doc(
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
index 103bd00fbf..cf894b9da4 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
@@ -99,6 +99,8 @@ class HashJoinMetricsUpdater(override val metrics: 
Map[String, SQLMetric])
   val hashProbeDynamicFiltersProduced: SQLMetric =
     metrics("hashProbeDynamicFiltersProduced")
 
+  val bloomFilterBlocksByteSize: SQLMetric = 
metrics("bloomFilterBlocksByteSize")
+
   val streamPreProjectionCpuCount: SQLMetric = 
metrics("streamPreProjectionCpuCount")
   val streamPreProjectionWallNanos: SQLMetric = 
metrics("streamPreProjectionWallNanos")
 
@@ -127,6 +129,7 @@ class HashJoinMetricsUpdater(override val metrics: 
Map[String, SQLMetric])
     hashProbeSpilledFiles += hashProbeMetrics.spilledFiles
     hashProbeReplacedWithDynamicFilterRows += 
hashProbeMetrics.numReplacedWithDynamicFilterRows
     hashProbeDynamicFiltersProduced += 
hashProbeMetrics.numDynamicFiltersProduced
+    bloomFilterBlocksByteSize += hashProbeMetrics.bloomFilterBlocksByteSize
     idx += 1
 
     // HashBuild
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index 0624fd1f2d..607de718ce 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -122,6 +122,7 @@ object MetricsUtil extends Logging {
     var numReplacedWithDynamicFilterRows: Long = 0
     var flushRowCount: Long = 0
     var loadedToValueHook: Long = 0
+    var bloomFilterBlocksByteSize: Long = 0
     var scanTime: Long = 0
     var skippedSplits: Long = 0
     var processedSplits: Long = 0
@@ -156,6 +157,7 @@ object MetricsUtil extends Logging {
       numReplacedWithDynamicFilterRows += 
metrics.numReplacedWithDynamicFilterRows
       flushRowCount += metrics.flushRowCount
       loadedToValueHook += metrics.loadedToValueHook
+      bloomFilterBlocksByteSize += metrics.bloomFilterBlocksByteSize
       scanTime += metrics.scanTime
       skippedSplits += metrics.skippedSplits
       processedSplits += metrics.processedSplits
@@ -197,6 +199,7 @@ object MetricsUtil extends Logging {
       numReplacedWithDynamicFilterRows,
       flushRowCount,
       loadedToValueHook,
+      bloomFilterBlocksByteSize,
       scanTime,
       skippedSplits,
       processedSplits,
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
index 6540b18e7a..8db6b95775 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
@@ -306,4 +306,40 @@ class VeloxHashJoinSuite extends 
VeloxWholeStageTransformerSuite {
       runQueryAndCompare(q5) { _ => }
     }
   }
+
+  test("Hash probe dynamic filter pushdown") {
+    withSQLConf(
+      VeloxConfig.HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED.key -> "true",
+      VeloxConfig.HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE.key -> "1048576"
+    ) {
+      withTable("probe_table", "build_table") {
+        spark.sql("""
+        CREATE TABLE probe_table USING PARQUET
+        AS SELECT id as a FROM range(110001)
+      """)
+
+        spark.sql("""
+        CREATE TABLE build_table USING PARQUET
+        AS SELECT id * 1000 as b FROM range(220002)
+      """)
+
+        runQueryAndCompare(
+          "SELECT a FROM probe_table JOIN build_table ON a = b"
+        ) {
+          df =>
+            val join = find(df.queryExecution.executedPlan) {
+              case _: BroadcastHashJoinExecTransformer => true
+              case _ => false
+            }
+            assert(join.isDefined)
+            val metrics = join.get.metrics
+            assert(metrics.contains("bloomFilterBlocksByteSize"))
+            assert(metrics("bloomFilterBlocksByteSize").value > 0)
+
+            assert(metrics.contains("hashProbeDynamicFiltersProduced"))
+            assert(metrics("hashProbeDynamicFiltersProduced").value == 1)
+        }
+      }
+    }
+  }
 }
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 187503f053..d384eebc08 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -273,7 +273,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
       env,
       metricsBuilderClass,
       "<init>",
-      
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
+      
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
 
   nativeColumnarToRowInfoClass =
       createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
@@ -591,6 +591,7 @@ JNIEXPORT jobject JNICALL 
Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
       longArray[Metrics::kNumReplacedWithDynamicFilterRows],
       longArray[Metrics::kFlushRowCount],
       longArray[Metrics::kLoadedToValueHook],
+      longArray[Metrics::kBloomFilterBlocksByteSize],
       longArray[Metrics::kScanTime],
       longArray[Metrics::kSkippedSplits],
       longArray[Metrics::kProcessedSplits],
diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h
index c271a28a4f..8e33a4a9c6 100644
--- a/cpp/core/utils/Metrics.h
+++ b/cpp/core/utils/Metrics.h
@@ -69,6 +69,7 @@ struct Metrics {
     kNumReplacedWithDynamicFilterRows,
     kFlushRowCount,
     kLoadedToValueHook,
+    kBloomFilterBlocksByteSize,
     kScanTime,
     kSkippedSplits,
     kProcessedSplits,
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index ac5773e439..9849075e1f 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -45,6 +45,7 @@ const std::string kDynamicFiltersAccepted = 
"dynamicFiltersAccepted";
 const std::string kReplacedWithDynamicFilterRows = 
"replacedWithDynamicFilterRows";
 const std::string kFlushRowCount = "flushRowCount";
 const std::string kLoadedToValueHook = "loadedToValueHook";
+const std::string kBloomFilterBlocksByteSize = "bloomFilterSize";
 const std::string kTotalScanTime = "totalScanTime";
 const std::string kSkippedSplits = "skippedSplits";
 const std::string kProcessedSplits = "processedSplits";
@@ -487,6 +488,8 @@ void WholeStageResultIterator::collectMetrics() {
       metrics_->get(Metrics::kFlushRowCount)[metricIndex] = 
runtimeMetric("sum", second->customStats, kFlushRowCount);
       metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] =
           runtimeMetric("sum", second->customStats, kLoadedToValueHook);
+      metrics_->get(Metrics::kBloomFilterBlocksByteSize)[metricIndex] =
+          runtimeMetric("sum", second->customStats, 
kBloomFilterBlocksByteSize);
       metrics_->get(Metrics::kScanTime)[metricIndex] = runtimeMetric("sum", 
second->customStats, kTotalScanTime);
       metrics_->get(Metrics::kSkippedSplits)[metricIndex] = 
runtimeMetric("sum", second->customStats, kSkippedSplits);
       metrics_->get(Metrics::kProcessedSplits)[metricIndex] =
@@ -637,6 +640,11 @@ std::unordered_map<std::string, std::string> 
WholeStageResultIterator::getQueryC
         std::to_string(veloxCfg_->get<int64_t>(kBloomFilterNumBits, 8388608));
     configs[velox::core::QueryConfig::kSparkBloomFilterMaxNumBits] =
         std::to_string(veloxCfg_->get<int64_t>(kBloomFilterMaxNumBits, 
4194304));
+
+    configs[velox::core::QueryConfig::kHashProbeDynamicFilterPushdownEnabled] =
+        
std::to_string(veloxCfg_->get<bool>(kHashProbeDynamicFilterPushdownEnabled, 
true));
+    configs[velox::core::QueryConfig::kHashProbeBloomFilterPushdownMaxSize] =
+        
std::to_string(veloxCfg_->get<uint64_t>(kHashProbeBloomFilterPushdownMaxSize, 
0));
     // spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver takes no 
effect if
     // spark.gluten.sql.columnar.backend.velox.IOThreads is set to 0
     configs[velox::core::QueryConfig::kMaxSplitPreloadPerDriver] =
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 2cacee5369..566ce875aa 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -72,6 +72,12 @@ const std::string kBloomFilterNumBits = 
"spark.gluten.sql.columnar.backend.velox
 const std::string kBloomFilterMaxNumBits = 
"spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits";
 const std::string kVeloxSplitPreloadPerDriver = 
"spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver";
 
+const std::string kHashProbeDynamicFilterPushdownEnabled =
+    
"spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled";
+
+const std::string kHashProbeBloomFilterPushdownMaxSize =
+    
"spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize";
+
 const std::string kShowTaskMetricsWhenFinished = 
"spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished";
 const bool kShowTaskMetricsWhenFinishedDefault = false;
 
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index bd838f357c..f4a79c4652 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -33,6 +33,8 @@ nav_order: 16
 | spark.gluten.sql.columnar.backend.velox.floatingPointMode                    
    | loose             | Config used to control the tolerance of floating 
point operations alignment with Spark. When the mode is set to strict, flushing 
is disabled for sum(float/double)and avg(float/double). When set to loose, 
flushing will be enabled.                                                       
                                                                                
                        [...]
 | spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation          
    | true              | Enable flushable aggregation. If true, Gluten will 
try converting regular aggregation into Velox's flushable aggregation when 
applicable. A flushable aggregation could emit intermediate result at anytime 
when memory is full / data reduction ratio is low.                              
                                                                                
                        [...]
 | spark.gluten.sql.columnar.backend.velox.footerEstimatedSize                  
    | 32KB              | Set the footer estimated size for velox file scan, 
refer to Velox's footer-estimated-size                                          
                                                                                
                                                                                
                                                                                
                 [...]
+| 
spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize   
 | 0b                | The maximum byte size of Bloom filter that can be 
generated from hash probe. When set to 0, no Bloom filter will be generated. To 
achieve optimal performance, this should not be much larger than the CPU cache 
size on the host.                                                               
                                                                                
                    [...]
+| 
spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled 
 | true              | Whether hash probe can generate any dynamic filter 
(including Bloom filter) and push down to upstream operators.                   
                                                                                
                                                                                
                                                                                
                 [...]
 | spark.gluten.sql.columnar.backend.velox.loadQuantum                          
    | 256MB             | Set the load quantum for velox file scan, recommend 
to use the default value (256MB) for performance consideration. If Velox cache 
is enabled, it can be 8MB at most.                                              
                                                                                
                                                                                
                 [...]
 | spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes                    
    | 64MB              | Set the max coalesced bytes for velox file scan       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance                 
    | 512KB             | Set the max coalesced distance bytes for velox file 
scan                                                                            
                                                                                
                                                                                
                                                                                
                [...]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to