This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ec25cda135 [GLUTEN-11605][VL] Push dynamic filters down to ValueStream (#11657)
ec25cda135 is described below

commit ec25cda13586a6047f2d08dbe67ea9498e4edc6c
Author: Ankita Victor <[email protected]>
AuthorDate: Fri Mar 13 15:08:09 2026 +0530

    [GLUTEN-11605][VL] Push dynamic filters down to ValueStream (#11657)
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     |   3 +-
 .../java/org/apache/gluten/metrics/Metrics.java    |   4 +
 .../org/apache/gluten/metrics/OperatorMetrics.java |   3 +
 .../backendsapi/velox/VeloxIteratorApi.scala       |   9 +-
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala |  15 +-
 .../org/apache/gluten/config/VeloxConfig.scala     |  11 +
 .../metrics/InputIteratorMetricsUpdater.scala      |   6 +
 .../org/apache/gluten/metrics/MetricsUtil.scala    |   3 +
 cpp/core/jni/JniWrapper.cc                         |   3 +-
 cpp/core/utils/Metrics.h                           |   1 +
 cpp/velox/compute/VeloxBackend.cc                  |   5 +-
 cpp/velox/compute/WholeStageResultIterator.cc      |   3 +
 cpp/velox/config/VeloxConfig.h                     |   4 +
 cpp/velox/operators/plannodes/RowVectorStream.cc   | 114 +++++++-
 cpp/velox/operators/plannodes/RowVectorStream.h    |  60 +++-
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        |   4 +-
 cpp/velox/tests/CMakeLists.txt                     |   3 +-
 cpp/velox/tests/ValueStreamDynamicFilterTest.cc    | 318 +++++++++++++++++++++
 docs/velox-configuration.md                        |   1 +
 .../apache/gluten/backendsapi/IteratorApi.scala    |   3 +-
 .../gluten/execution/WholeStageTransformer.scala   |  32 ++-
 .../execution/WholeStageZippedPartitionsRDD.scala  |   3 +-
 22 files changed, 589 insertions(+), 19 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 16dc2bccba..a705675e2d 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -319,7 +319,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
       updateNativeMetrics: IMetrics => Unit,
       partitionIndex: Int,
       materializeInput: Boolean,
-      enableCudf: Boolean): Iterator[ColumnarBatch] = {
+      enableCudf: Boolean,
+      supportsValueStreamDynamicFilter: Boolean): Iterator[ColumnarBatch] = {
     // scalastyle:on argcount
 
     // Final iterator does not contain scan split, so pass empty split info to native here.
diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
index 3d23dc94db..9b7cd7c8d7 100644
--- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
+++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
@@ -40,6 +40,7 @@ public class Metrics implements IMetrics {
   public long[] numDynamicFiltersProduced;
   public long[] numDynamicFiltersAccepted;
   public long[] numReplacedWithDynamicFilterRows;
+  public long[] numDynamicFilterInputRows;
   public long[] flushRowCount;
   public long[] loadedToValueHook;
   public long[] bloomFilterBlocksByteSize;
@@ -90,6 +91,7 @@ public class Metrics implements IMetrics {
       long[] numDynamicFiltersProduced,
       long[] numDynamicFiltersAccepted,
       long[] numReplacedWithDynamicFilterRows,
+      long[] numDynamicFilterInputRows,
       long[] flushRowCount,
       long[] loadedToValueHook,
       long[] bloomFilterBlocksByteSize,
@@ -134,6 +136,7 @@ public class Metrics implements IMetrics {
     this.numDynamicFiltersProduced = numDynamicFiltersProduced;
     this.numDynamicFiltersAccepted = numDynamicFiltersAccepted;
     this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
+    this.numDynamicFilterInputRows = numDynamicFilterInputRows;
     this.flushRowCount = flushRowCount;
     this.loadedToValueHook = loadedToValueHook;
     this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
@@ -184,6 +187,7 @@ public class Metrics implements IMetrics {
         numDynamicFiltersProduced[index],
         numDynamicFiltersAccepted[index],
         numReplacedWithDynamicFilterRows[index],
+        numDynamicFilterInputRows[index],
         flushRowCount[index],
         loadedToValueHook[index],
         bloomFilterBlocksByteSize[index],
diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
index 10563e507e..0726fd1b76 100644
--- a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
+++ b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
@@ -38,6 +38,7 @@ public class OperatorMetrics implements IOperatorMetrics {
   public long numDynamicFiltersProduced;
   public long numDynamicFiltersAccepted;
   public long numReplacedWithDynamicFilterRows;
+  public long numDynamicFilterInputRows;
   public long flushRowCount;
   public long loadedToValueHook;
   public long bloomFilterBlocksByteSize;
@@ -83,6 +84,7 @@ public class OperatorMetrics implements IOperatorMetrics {
       long numDynamicFiltersProduced,
       long numDynamicFiltersAccepted,
       long numReplacedWithDynamicFilterRows,
+      long numDynamicFilterInputRows,
       long flushRowCount,
       long loadedToValueHook,
       long bloomFilterBlocksByteSize,
@@ -125,6 +127,7 @@ public class OperatorMetrics implements IOperatorMetrics {
     this.numDynamicFiltersProduced = numDynamicFiltersProduced;
     this.numDynamicFiltersAccepted = numDynamicFiltersAccepted;
     this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
+    this.numDynamicFilterInputRows = numDynamicFilterInputRows;
     this.flushRowCount = flushRowCount;
     this.loadedToValueHook = loadedToValueHook;
     this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 1088440774..60018554dd 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -247,8 +247,13 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       updateNativeMetrics: IMetrics => Unit,
       partitionIndex: Int,
       materializeInput: Boolean,
-      enableCudf: Boolean = false): Iterator[ColumnarBatch] = {
-    val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString).asJava
+      enableCudf: Boolean = false,
+      supportsValueStreamDynamicFilter: Boolean = true): Iterator[ColumnarBatch] = {
+    val extraConfMap = mutable.Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString)
+    if (!supportsValueStreamDynamicFilter) {
+      extraConfMap(VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key) = "false"
+    }
+    val extraConf = extraConfMap.asJava
     val transKernel = NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf)
     val columnarNativeIterator =
       inputIterators.map {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 29d882c3d3..f13217442f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -73,10 +73,23 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       SQLMetrics.createNanoTimingMetric(sparkContext, "time of operator input")
     }
 
+    val dynamicFilterMetrics = if (forShuffle) {
+      Map(
+        "valueStreamDynamicFiltersAccepted" -> SQLMetrics.createMetric(
+          sparkContext,
+          "number of dynamic filters accepted by value stream"),
+        "valueStreamDynamicFilterInputRows" -> SQLMetrics.createMetric(
+          sparkContext,
+          "number of input rows")
+      )
+    } else {
+      Map.empty[String, SQLMetric]
+    }
+
     Map(
       "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
       "wallNanos" -> wallNanosMetric
-    ) ++ outputMetrics
+    ) ++ outputMetrics ++ dynamicFilterMetrics
   }
 
   override def genInputIteratorTransformerMetricsUpdater(
diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 46dcb55fe9..993c554808 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -94,6 +94,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
 
   def hashProbeDynamicFilterPushdownEnabled: Boolean =
     getConf(HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED)
+
+  def valueStreamDynamicFilterEnabled: Boolean =
+    getConf(VALUE_STREAM_DYNAMIC_FILTER_ENABLED)
 }
 
 object VeloxConfig extends ConfigRegistry {
@@ -455,6 +458,14 @@ object VeloxConfig extends ConfigRegistry {
       .booleanConf
       .createWithDefault(true)
 
+  val VALUE_STREAM_DYNAMIC_FILTER_ENABLED =
+    buildConf("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled")
+      .doc(
+        "Whether to apply dynamic filters pushed down from hash probe in the ValueStream" +
+          " (shuffle reader) operator to filter rows before they reach the hash join.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled")
       .doc(
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala
index 79f6432645..ed006a5434 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala
@@ -35,6 +35,12 @@ case class InputIteratorMetricsUpdater(metrics: Map[String, SQLMetric], forBroad
           metrics("numOutputRows") += operatorMetrics.outputRows
           metrics("outputVectors") += operatorMetrics.outputVectors
         }
+        metrics.get("valueStreamDynamicFiltersAccepted").foreach {
+          _ += operatorMetrics.numDynamicFiltersAccepted
+        }
+        metrics.get("valueStreamDynamicFilterInputRows").foreach {
+          _ += operatorMetrics.numDynamicFilterInputRows
+        }
       }
     }
   }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index 607de718ce..09430fdd70 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -120,6 +120,7 @@ object MetricsUtil extends Logging {
     var numDynamicFiltersProduced: Long = 0
     var numDynamicFiltersAccepted: Long = 0
     var numReplacedWithDynamicFilterRows: Long = 0
+    var numDynamicFilterInputRows: Long = 0
     var flushRowCount: Long = 0
     var loadedToValueHook: Long = 0
     var bloomFilterBlocksByteSize: Long = 0
@@ -155,6 +156,7 @@ object MetricsUtil extends Logging {
       numDynamicFiltersProduced += metrics.numDynamicFiltersProduced
       numDynamicFiltersAccepted += metrics.numDynamicFiltersAccepted
       numReplacedWithDynamicFilterRows += metrics.numReplacedWithDynamicFilterRows
+      numDynamicFilterInputRows += metrics.numDynamicFilterInputRows
       flushRowCount += metrics.flushRowCount
       loadedToValueHook += metrics.loadedToValueHook
       bloomFilterBlocksByteSize += metrics.bloomFilterBlocksByteSize
@@ -197,6 +199,7 @@ object MetricsUtil extends Logging {
       numDynamicFiltersProduced,
       numDynamicFiltersAccepted,
       numReplacedWithDynamicFilterRows,
+      numDynamicFilterInputRows,
       flushRowCount,
       loadedToValueHook,
       bloomFilterBlocksByteSize,
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index c48886195b..c8cd3adef4 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -273,7 +273,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
       env,
       metricsBuilderClass,
       "<init>",
-      
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
+      
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
 
   nativeColumnarToRowInfoClass =
       createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
@@ -589,6 +589,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
       longArray[Metrics::kNumDynamicFiltersProduced],
       longArray[Metrics::kNumDynamicFiltersAccepted],
       longArray[Metrics::kNumReplacedWithDynamicFilterRows],
+      longArray[Metrics::kNumDynamicFilterInputRows],
       longArray[Metrics::kFlushRowCount],
       longArray[Metrics::kLoadedToValueHook],
       longArray[Metrics::kBloomFilterBlocksByteSize],
diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h
index 8e33a4a9c6..6123f4a7ac 100644
--- a/cpp/core/utils/Metrics.h
+++ b/cpp/core/utils/Metrics.h
@@ -67,6 +67,7 @@ struct Metrics {
     kNumDynamicFiltersProduced,
     kNumDynamicFiltersAccepted,
     kNumReplacedWithDynamicFilterRows,
+    kNumDynamicFilterInputRows,
     kFlushRowCount,
     kLoadedToValueHook,
     kBloomFilterBlocksByteSize,
diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index de9e9385f8..742d0cba70 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -319,7 +319,10 @@ void VeloxBackend::initConnector(const std::shared_ptr<velox::config::ConfigBase
       std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId, hiveConf, ioExecutor_.get()));
   
   // Register value-stream connector for runtime iterator-based inputs
-  velox::connector::registerConnector(std::make_shared<ValueStreamConnector>(kIteratorConnectorId, hiveConf));
+  auto valueStreamDynamicFilterEnabled =
+      backendConf_->get<bool>(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault);
+  velox::connector::registerConnector(
+      std::make_shared<ValueStreamConnector>(kIteratorConnectorId, hiveConf, valueStreamDynamicFilterEnabled));
   
 #ifdef GLUTEN_ENABLE_GPU
   if (backendConf_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 9607147045..c92156a557 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -43,6 +43,7 @@ namespace {
 const std::string kDynamicFiltersProduced = "dynamicFiltersProduced";
 const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted";
 const std::string kReplacedWithDynamicFilterRows = "replacedWithDynamicFilterRows";
+const std::string kDynamicFilterInputRows = "dynamicFilterInputRows";
 const std::string kFlushRowCount = "flushRowCount";
 const std::string kLoadedToValueHook = "loadedToValueHook";
 const std::string kBloomFilterBlocksByteSize = "bloomFilterSize";
@@ -490,6 +491,8 @@ void WholeStageResultIterator::collectMetrics() {
           runtimeMetric("sum", second->customStats, kDynamicFiltersAccepted);
       metrics_->get(Metrics::kNumReplacedWithDynamicFilterRows)[metricIndex] =
           runtimeMetric("sum", second->customStats, kReplacedWithDynamicFilterRows);
+      metrics_->get(Metrics::kNumDynamicFilterInputRows)[metricIndex] =
+          runtimeMetric("sum", second->customStats, kDynamicFilterInputRows);
       metrics_->get(Metrics::kFlushRowCount)[metricIndex] = runtimeMetric("sum", second->customStats, kFlushRowCount);
       metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] =
           runtimeMetric("sum", second->customStats, kLoadedToValueHook);
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 3c5c432b8c..6195620fc8 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -82,6 +82,10 @@ const std::string kHashProbeDynamicFilterPushdownEnabled =
 const std::string kHashProbeBloomFilterPushdownMaxSize =
     "spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize";
 
+const std::string kValueStreamDynamicFilterEnabled =
+    "spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled";
+const bool kValueStreamDynamicFilterEnabledDefault = false;
+
 const std::string kShowTaskMetricsWhenFinished = "spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished";
 const bool kShowTaskMetricsWhenFinishedDefault = false;
 
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.cc b/cpp/velox/operators/plannodes/RowVectorStream.cc
index 7c0b00979a..514e6e6735 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.cc
+++ b/cpp/velox/operators/plannodes/RowVectorStream.cc
@@ -19,6 +19,7 @@
 #include "memory/VeloxColumnarBatch.h"
 #include "velox/exec/Driver.h"
 #include "velox/exec/Operator.h"
+#include "velox/exec/OperatorUtils.h"
 #include "velox/exec/Task.h"
 #include "velox/vector/arrow/Bridge.h"
 
@@ -113,7 +114,9 @@ ValueStreamDataSource::ValueStreamDataSource(
     const facebook::velox::connector::ColumnHandleMap& columnHandles,
     facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx)
     : outputType_(outputType),
-      pool_(connectorQueryCtx->memoryPool()) {}
+      pool_(connectorQueryCtx->memoryPool()),
+      dynamicFilterEnabled_(
+          std::dynamic_pointer_cast<const ValueStreamTableHandle>(tableHandle)->dynamicFilterEnabled()) {}
 
 void ValueStreamDataSource::addSplit(std::shared_ptr<facebook::velox::connector::ConnectorSplit> split) {
   // Cast to IteratorConnectorSplit to extract the iterator
@@ -166,7 +169,116 @@ std::optional<facebook::velox::RowVectorPtr> ValueStreamDataSource::next(
   completedRows_ += rowVector->size();
   completedBytes_ += rowVector->estimateFlatSize();
 
+  // Apply dynamic filters if any have been pushed down.
+  if (!dynamicFilters_.empty()) {
+    rowVector = applyDynamicFilters(rowVector);
+    if (!rowVector) {
+      // All rows filtered out, try next batch.
+      return next(size, future);
+    }
+  }
+
   return rowVector;
 }
 
+facebook::velox::RowVectorPtr ValueStreamDataSource::applyDynamicFilters(
+    const facebook::velox::RowVectorPtr& input) {
+  using namespace facebook::velox;
+
+  const auto numRows = input->size();
+  if (numRows == 0) {
+    return input;
+  }
+
+  SelectivityVector rows(numRows, true);
+
+  for (const auto& [channel, filter] : dynamicFilters_) {
+    if (!filter || channel >= input->childrenSize()) {
+      continue;
+    }
+    applyFilterOnColumn(filter, input->childAt(channel), rows);
+    if (!rows.hasSelections()) {
+      dynamicFilterInputRows_ += numRows;
+      return nullptr;
+    }
+  }
+
+  const auto passedCount = rows.countSelected();
+  if (passedCount == numRows) {
+    return input;
+  }
+
+  dynamicFilterInputRows_ += numRows;
+
+  BufferPtr indices = allocateIndices(passedCount, pool_);
+  auto* rawIndices = indices->asMutable<vector_size_t>();
+  vector_size_t idx = 0;
+  rows.applyToSelected([&](auto row) { rawIndices[idx++] = row; });
+
+  return exec::wrap(passedCount, std::move(indices), input);
+}
+
+void ValueStreamDataSource::applyFilterOnColumn(
+    const std::shared_ptr<facebook::velox::common::Filter>& filter,
+    const facebook::velox::VectorPtr& vector,
+    facebook::velox::SelectivityVector& rows) {
+  using namespace facebook::velox;
+
+  DecodedVector decoded(*vector, rows);
+
+  rows.applyToSelected([&](auto row) {
+    if (decoded.isNullAt(row)) {
+      if (!filter->testNull()) {
+        rows.setValid(row, false);
+      }
+      return;
+    }
+
+    bool pass = false;
+    switch (vector->typeKind()) {
+      case TypeKind::BOOLEAN:
+        pass = filter->testBool(decoded.valueAt<bool>(row));
+        break;
+      case TypeKind::TINYINT:
+        pass = filter->testInt64(decoded.valueAt<int8_t>(row));
+        break;
+      case TypeKind::SMALLINT:
+        pass = filter->testInt64(decoded.valueAt<int16_t>(row));
+        break;
+      case TypeKind::INTEGER:
+        pass = filter->testInt64(decoded.valueAt<int32_t>(row));
+        break;
+      case TypeKind::BIGINT:
+        pass = filter->testInt64(decoded.valueAt<int64_t>(row));
+        break;
+      case TypeKind::HUGEINT:
+        pass = filter->testInt128(decoded.valueAt<int128_t>(row));
+        break;
+      case TypeKind::REAL:
+        pass = filter->testFloat(decoded.valueAt<float>(row));
+        break;
+      case TypeKind::DOUBLE:
+        pass = filter->testDouble(decoded.valueAt<double>(row));
+        break;
+      case TypeKind::VARCHAR:
+      case TypeKind::VARBINARY: {
+        auto sv = decoded.valueAt<StringView>(row);
+        pass = filter->testBytes(sv.data(), sv.size());
+        break;
+      }
+      case TypeKind::TIMESTAMP:
+        pass = filter->testTimestamp(decoded.valueAt<Timestamp>(row));
+        break;
+      default:
+        // For unsupported types, let the row pass through.
+        pass = true;
+        break;
+    }
+    if (!pass) {
+      rows.setValid(row, false);
+    }
+  });
+  rows.updateBounds();
+}
+
 } // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h b/cpp/velox/operators/plannodes/RowVectorStream.h
index 6e6ccd1527..c6dc81a757 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -23,7 +23,10 @@
 #include "velox/connectors/Connector.h"
 #include "velox/exec/Driver.h"
 #include "velox/exec/Operator.h"
+#include "velox/exec/OperatorUtils.h"
 #include "velox/exec/Task.h"
+#include "velox/type/Filter.h"
+#include "velox/vector/DecodedVector.h"
 
 namespace gluten {
 
@@ -68,10 +71,17 @@ class ValueStreamDataSource : public facebook::velox::connector::DataSource {
 
   std::optional<facebook::velox::RowVectorPtr> next(uint64_t size, facebook::velox::ContinueFuture& future) override;
 
+  const facebook::velox::common::SubfieldFilters* getFilters() const override {
+    return &emptyFilters_;
+  }
+
   void addDynamicFilter(
       facebook::velox::column_index_t outputChannel,
       const std::shared_ptr<facebook::velox::common::Filter>& filter) override {
-    // Iterator-based sources don't support dynamic filtering
+    if (dynamicFilterEnabled_) {
+      dynamicFilters_[outputChannel] = filter;
+      numDynamicFiltersAccepted_++;
+    }
   }
 
   uint64_t getCompletedBytes() override {
@@ -83,10 +93,26 @@ class ValueStreamDataSource : public facebook::velox::connector::DataSource {
   }
 
   std::unordered_map<std::string, facebook::velox::RuntimeMetric> getRuntimeStats() override {
-    return {};
+    std::unordered_map<std::string, facebook::velox::RuntimeMetric> stats;
+    stats["dynamicFiltersAccepted"] = facebook::velox::RuntimeMetric(numDynamicFiltersAccepted_);
+    if (dynamicFilterInputRows_ > 0) {
+      stats["dynamicFilterInputRows"] = facebook::velox::RuntimeMetric(dynamicFilterInputRows_);
+    }
+    return stats;
   }
 
  private:
+  // Applies dynamic filters to a batch, returning a dictionary-wrapped subset
+  // containing only the rows that pass all filters.
+  facebook::velox::RowVectorPtr applyDynamicFilters(const facebook::velox::RowVectorPtr& input);
+
+  // Evaluates a Filter against a single column vector, deselecting rows that
+  // don't pass.
+  static void applyFilterOnColumn(
+      const std::shared_ptr<facebook::velox::common::Filter>& filter,
+      const facebook::velox::VectorPtr& vector,
+      facebook::velox::SelectivityVector& rows);
+
   const facebook::velox::RowTypePtr outputType_;
   facebook::velox::memory::MemoryPool* pool_;
 
@@ -94,21 +120,35 @@ class ValueStreamDataSource : public facebook::velox::connector::DataSource {
   std::shared_ptr<RowVectorStream> currentIterator_{nullptr};
   uint64_t completedBytes_{0};
   uint64_t completedRows_{0};
+
+  folly::F14FastMap<facebook::velox::column_index_t, std::shared_ptr<facebook::velox::common::Filter>> dynamicFilters_;
+  const facebook::velox::common::SubfieldFilters emptyFilters_;
+  bool dynamicFilterEnabled_{true};
+  uint64_t numDynamicFiltersAccepted_{0};
+  uint64_t dynamicFilterInputRows_{0};
 };
 
 /// Table handle for iterator-based scans
 class ValueStreamTableHandle : public facebook::velox::connector::ConnectorTableHandle {
  public:
-  explicit ValueStreamTableHandle(std::string connectorId) : ConnectorTableHandle(connectorId) {}
+  explicit ValueStreamTableHandle(std::string connectorId, bool dynamicFilterEnabled = true)
+      : ConnectorTableHandle(connectorId), dynamicFilterEnabled_(dynamicFilterEnabled) {}
 
   const std::string& name() const override {
     static const std::string kName = "ValueStreamTableHandle";
     return kName;
   }
 
+  bool dynamicFilterEnabled() const {
+    return dynamicFilterEnabled_;
+  }
+
   folly::dynamic serialize() const override {
     VELOX_NYI();
   }
+
+ private:
+  bool dynamicFilterEnabled_;
 };
 
 /// Column handle for iterator-based scans
@@ -133,10 +173,15 @@ class ValueStreamColumnHandle : public facebook::velox::connector::ColumnHandle
 /// Connector implementation for iterator-based data sources
 class ValueStreamConnector : public facebook::velox::connector::Connector {
  public:
-  explicit ValueStreamConnector(
+  ValueStreamConnector(
       const std::string& id,
-      std::shared_ptr<const facebook::velox::config::ConfigBase> config)
-      : Connector(id, config) {}
+      std::shared_ptr<const facebook::velox::config::ConfigBase> config,
+      bool dynamicFilterEnabled = false)
+      : Connector(id, config), dynamicFilterEnabled_(dynamicFilterEnabled) {}
+
+  bool canAddDynamicFilter() const override {
+    return dynamicFilterEnabled_;
+  }
 
   std::unique_ptr<facebook::velox::connector::DataSource> createDataSource(
       const facebook::velox::RowTypePtr& outputType,
@@ -153,6 +198,9 @@ class ValueStreamConnector : public facebook::velox::connector::Connector {
       facebook::velox::connector::CommitStrategy commitStrategy) override {
     VELOX_UNSUPPORTED("ValueStreamConnector does not support data sinks");
   }
+
+ private:
+  bool dynamicFilterEnabled_;
 };
 
 /// Factory for creating ValueStreamConnector instances
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 834127e20c..adb7fc5f45 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -1353,7 +1353,9 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
   auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));
 
   // Create TableHandle
-  auto tableHandle = std::make_shared<ValueStreamTableHandle>(kIteratorConnectorId);
+  bool dynamicFilterEnabled =
+      veloxCfg_->get<bool>(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault);
+  auto tableHandle = std::make_shared<ValueStreamTableHandle>(kIteratorConnectorId, dynamicFilterEnabled);
 
   // Create column assignments
   connector::ColumnHandleMap assignments;
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index a690347bc4..0c61850e12 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -115,7 +115,8 @@ add_velox_test(
   VeloxRowToColumnarTest.cc
   VeloxColumnarBatchSerializerTest.cc
   VeloxColumnarBatchTest.cc
-  VeloxBatchResizerTest.cc)
+  VeloxBatchResizerTest.cc
+  ValueStreamDynamicFilterTest.cc)
 add_velox_test(
   velox_plan_conversion_test
   SOURCES
diff --git a/cpp/velox/tests/ValueStreamDynamicFilterTest.cc b/cpp/velox/tests/ValueStreamDynamicFilterTest.cc
new file mode 100644
index 0000000000..6e13bfed01
--- /dev/null
+++ b/cpp/velox/tests/ValueStreamDynamicFilterTest.cc
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "memory/VeloxColumnarBatch.h"
+#include "operators/plannodes/RowVectorStream.h"
+#include "velox/type/Filter.h"
+#include "velox/vector/DecodedVector.h"
+#include "velox/vector/FlatVector.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+using namespace facebook::velox;
+using namespace facebook::velox::exec;
+using namespace facebook::velox::common;
+
+namespace facebook::velox::test {
+
+/// A ColumnarBatchIterator that yields pre-built RowVectors as 
VeloxColumnarBatches.
+class TestBatchIterator final : public gluten::ColumnarBatchIterator {
+ public:
+  explicit TestBatchIterator(std::vector<RowVectorPtr> batches) : 
batches_(std::move(batches)) {}
+
+  std::shared_ptr<gluten::ColumnarBatch> next() override {
+    if (idx_ >= batches_.size()) {
+      return nullptr;
+    }
+    return std::make_shared<gluten::VeloxColumnarBatch>(batches_[idx_++]);
+  }
+
+ private:
+  std::vector<RowVectorPtr> batches_;
+  size_t idx_ = 0;
+};
+
+class ValueStreamDynamicFilterTest : public ::testing::Test, public 
VectorTestBase {
+ protected:
+  static void SetUpTestCase() {
+    
memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{});
+  }
+
+  void SetUp() override {
+    // Register the connector if not already registered.
+    if (!connector::hasConnector(gluten::kIteratorConnectorId)) {
+      auto config = std::make_shared<config::ConfigBase>(
+          std::unordered_map<std::string, std::string>());
+      
connector::registerConnector(std::make_shared<gluten::ValueStreamConnector>(
+          gluten::kIteratorConnectorId, config, 
/*dynamicFilterEnabled=*/true));
+    }
+  }
+
+  /// Build a TableScanNode that reads from the value-stream connector.
+  std::shared_ptr<core::TableScanNode> makeTableScanNode(
+      const std::string& nodeId,
+      const RowTypePtr& outputType) {
+    auto tableHandle =
+        
std::make_shared<gluten::ValueStreamTableHandle>(gluten::kIteratorConnectorId);
+
+    connector::ColumnHandleMap assignments;
+    for (int idx = 0; idx < outputType->size(); idx++) {
+      auto name = outputType->nameOf(idx);
+      auto type = outputType->childAt(idx);
+      assignments[name] =
+          std::make_shared<gluten::ValueStreamColumnHandle>(name, type);
+    }
+
+    return std::make_shared<core::TableScanNode>(
+        nodeId, outputType, tableHandle, assignments);
+  }
+
+  /// Create a split wrapping the given batches.
+  std::shared_ptr<connector::ConnectorSplit> makeSplit(
+      std::vector<RowVectorPtr> batches) {
+    auto iter = std::make_shared<gluten::ResultIterator>(
+        std::make_unique<TestBatchIterator>(std::move(batches)));
+    return std::make_shared<gluten::IteratorConnectorSplit>(
+        gluten::kIteratorConnectorId, std::move(iter));
+  }
+
+  /// Read all int64 values from column 0 of a serial-mode task.
+  std::vector<int64_t> readAllInt64(Task* task) {
+    std::vector<int64_t> result;
+    ContinueFuture future = ContinueFuture::makeEmpty();
+    while (true) {
+      auto batch = task->next(&future);
+      if (!batch) {
+        break;
+      }
+      DecodedVector decoded(*batch->childAt(0));
+      for (vector_size_t i = 0; i < batch->size(); i++) {
+        result.push_back(decoded.valueAt<int64_t>(i));
+      }
+    }
+    return result;
+  }
+};
+
+// Test that without any filter, all rows pass through.
+TEST_F(ValueStreamDynamicFilterTest, noFilterPassesAllRows) {
+  auto batch = makeRowVector({"id"}, {makeFlatVector<int64_t>({10, 20, 30})});
+  auto outputType = asRowType(batch->type());
+  auto scanNode = makeTableScanNode("vs0", outputType);
+
+  auto queryCtx = core::QueryCtx::create();
+  auto task = Task::create(
+      "test-nofilter",
+      core::PlanFragment{scanNode},
+      0,
+      queryCtx,
+      Task::ExecutionMode::kSerial);
+
+  task->addSplit(scanNode->id(), Split{makeSplit({batch})});
+  task->noMoreSplits(scanNode->id());
+
+  auto ids = readAllInt64(task.get());
+  ASSERT_EQ(ids, (std::vector<int64_t>{10, 20, 30}));
+}
+
+// Test that filtering works when filter is injected after first batch.
+TEST_F(ValueStreamDynamicFilterTest, filterBigintRange) {
+  auto batch1 = makeRowVector({"id"}, {makeFlatVector<int64_t>({1, 2, 3, 4, 
5})});
+  auto batch2 = makeRowVector({"id"}, {makeFlatVector<int64_t>({6, 7, 8, 9, 
10})});
+  auto outputType = asRowType(batch1->type());
+  auto scanNode = makeTableScanNode("vs1", outputType);
+
+  auto queryCtx = core::QueryCtx::create();
+  auto task = Task::create(
+      "test-bigint",
+      core::PlanFragment{scanNode},
+      0,
+      queryCtx,
+      Task::ExecutionMode::kSerial);
+
+  // Add both batches as a single split.
+  task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2})});
+  task->noMoreSplits(scanNode->id());
+
+  // First next() creates drivers and returns first batch (unfiltered).
+  ContinueFuture future = ContinueFuture::makeEmpty();
+  auto firstBatch = task->next(&future);
+  ASSERT_NE(firstBatch, nullptr);
+  ASSERT_EQ(firstBatch->size(), 5);
+
+  // Inject a BigintRange filter: keep only 8 <= id <= 10, rejecting nulls.
+  task->testingVisitDrivers([&](Driver* driver) {
+    auto* op = driver->findOperator(scanNode->id());
+    if (!op) {
+      return;
+    }
+    ASSERT_TRUE(op->canAddDynamicFilter());
+    PushdownFilters pf;
+    pf.filters[0] = std::make_shared<BigintRange>(8, 10, false);
+    pf.dynamicFilteredColumns.insert(0);
+    op->addDynamicFilterLocked("producer", pf);
+  });
+
+  // Second next() should return filtered batch.
+  auto secondBatch = task->next(&future);
+  ASSERT_NE(secondBatch, nullptr);
+
+  DecodedVector decoded(*secondBatch->childAt(0));
+  std::vector<int64_t> outputIds;
+  for (vector_size_t i = 0; i < secondBatch->size(); i++) {
+    outputIds.push_back(decoded.valueAt<int64_t>(i));
+  }
+  ASSERT_EQ(outputIds, (std::vector<int64_t>{8, 9, 10}));
+
+  auto end = task->next(&future);
+  ASSERT_EQ(end, nullptr);
+}
+
+// Test that a batch whose rows all match a late-injected filter is kept whole.
+TEST_F(ValueStreamDynamicFilterTest, filterEliminatesEntireBatch) {
+  auto batch1 = makeRowVector({"id"}, {makeFlatVector<int64_t>({1, 2, 3})});
+  auto batch2 = makeRowVector({"id"}, {makeFlatVector<int64_t>({100, 200, 
300})});
+  auto outputType = asRowType(batch1->type());
+  auto scanNode = makeTableScanNode("vs2", outputType);
+
+  auto queryCtx = core::QueryCtx::create();
+  auto task = Task::create(
+      "test-eliminate",
+      core::PlanFragment{scanNode},
+      0,
+      queryCtx,
+      Task::ExecutionMode::kSerial);
+
+  task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2})});
+  task->noMoreSplits(scanNode->id());
+
+  ContinueFuture future = ContinueFuture::makeEmpty();
+  auto firstBatch = task->next(&future);
+  ASSERT_NE(firstBatch, nullptr);
+  ASSERT_EQ(firstBatch->size(), 3);
+
+  task->testingVisitDrivers([&](Driver* driver) {
+    auto* op = driver->findOperator(scanNode->id());
+    if (!op) {
+      return;
+    }
+    PushdownFilters pf;
+    pf.filters[0] = std::make_shared<BigintRange>(100, 300, false);
+    pf.dynamicFilteredColumns.insert(0);
+    op->addDynamicFilterLocked("producer", pf);
+  });
+
+  auto secondBatch = task->next(&future);
+  ASSERT_NE(secondBatch, nullptr);
+
+  DecodedVector decoded(*secondBatch->childAt(0));
+  std::vector<int64_t> outputIds;
+  for (vector_size_t i = 0; i < secondBatch->size(); i++) {
+    outputIds.push_back(decoded.valueAt<int64_t>(i));
+  }
+  ASSERT_EQ(outputIds, (std::vector<int64_t>{100, 200, 300}));
+
+  auto end = task->next(&future);
+  ASSERT_EQ(end, nullptr);
+}
+
+// Test that nulls are filtered out when nullAllowed is false.
+TEST_F(ValueStreamDynamicFilterTest, filterWithNulls) {
+  auto batch1 = makeRowVector({"id"}, {makeFlatVector<int64_t>({10, 20})});
+  auto batch2 = makeRowVector(
+      {"id"},
+      {makeNullableFlatVector<int64_t>({1, std::nullopt, 3, std::nullopt, 
5})});
+  auto outputType = asRowType(batch1->type());
+  auto scanNode = makeTableScanNode("vs3", outputType);
+
+  auto queryCtx = core::QueryCtx::create();
+  auto task = Task::create(
+      "test-nulls",
+      core::PlanFragment{scanNode},
+      0,
+      queryCtx,
+      Task::ExecutionMode::kSerial);
+
+  task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2})});
+  task->noMoreSplits(scanNode->id());
+
+  ContinueFuture future = ContinueFuture::makeEmpty();
+  auto firstBatch = task->next(&future);
+  ASSERT_NE(firstBatch, nullptr);
+
+  task->testingVisitDrivers([&](Driver* driver) {
+    auto* op = driver->findOperator(scanNode->id());
+    if (!op) {
+      return;
+    }
+    PushdownFilters pf;
+    pf.filters[0] = std::make_shared<BigintRange>(3, 100, false);
+    pf.dynamicFilteredColumns.insert(0);
+    op->addDynamicFilterLocked("producer", pf);
+  });
+
+  auto secondBatch = task->next(&future);
+  ASSERT_NE(secondBatch, nullptr);
+
+  DecodedVector decoded(*secondBatch->childAt(0));
+  std::vector<int64_t> outputIds;
+  for (vector_size_t i = 0; i < secondBatch->size(); i++) {
+    ASSERT_FALSE(decoded.isNullAt(i));
+    outputIds.push_back(decoded.valueAt<int64_t>(i));
+  }
+  ASSERT_EQ(outputIds, (std::vector<int64_t>{3, 5}));
+
+  auto end = task->next(&future);
+  ASSERT_EQ(end, nullptr);
+}
+
+// Test canAddDynamicFilter returns true through the connector.
+TEST_F(ValueStreamDynamicFilterTest, canAddDynamicFilter) {
+  auto batch = makeRowVector({"id"}, {makeFlatVector<int64_t>({1})});
+  auto outputType = asRowType(batch->type());
+  auto scanNode = makeTableScanNode("vs4", outputType);
+
+  auto queryCtx = core::QueryCtx::create();
+  auto task = Task::create(
+      "test-can-add",
+      core::PlanFragment{scanNode},
+      0,
+      queryCtx,
+      Task::ExecutionMode::kSerial);
+
+  task->addSplit(scanNode->id(), Split{makeSplit({batch})});
+  task->noMoreSplits(scanNode->id());
+
+  ContinueFuture future = ContinueFuture::makeEmpty();
+  task->next(&future);
+
+  bool found = false;
+  task->testingVisitDrivers([&](Driver* driver) {
+    auto* op = driver->findOperator(scanNode->id());
+    if (op) {
+      ASSERT_TRUE(op->canAddDynamicFilter());
+      found = true;
+    }
+  });
+  ASSERT_TRUE(found);
+
+  auto end = task->next(&future);
+  ASSERT_EQ(end, nullptr);
+}
+
+} // namespace facebook::velox::test
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index 358dc41962..ff437f01bc 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -72,6 +72,7 @@ nav_order: 16
 | spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled   
    | false             | If true, checksum read verification from SSD is 
enabled.                                                                        
                                                                                
                                                                                
                                                                                
                    [...]
 | spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow                    
    | false             | True if copy on write should be disabled.             
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.backend.velox.ssdODirect                           
    | false             | The O_DIRECT flag for cache writing                   
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled    
    | false             | Whether to apply dynamic filters pushed down from 
hash probe in the ValueStream (shuffle reader) operator to filter rows before 
they reach the hash join.                                                       
                                                                                
                                                                                
                    [...]
 | spark.gluten.sql.enable.enhancedFeatures                                     
    | true              | Enable some features including iceberg native write 
and other features.                                                             
                                                                                
                                                                                
                                                                                
                [...]
 | spark.gluten.sql.rewrite.castArrayToString                                   
    | true              | When true, rewrite `cast(array as String)` to 
`concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. 
                                                                                
                                                                                
                                                                                
                      [...]
 | spark.gluten.velox.castFromVarcharAddTrimNode                                
    | false             | If true, will add a trim node which has the same 
sementic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing.           
                                                                                
                                                                                
                                                                                
                   [...]
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 52be6c4954..a6b0053593 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -85,6 +85,7 @@ trait IteratorApi {
       updateNativeMetrics: IMetrics => Unit,
       partitionIndex: Int,
       materializeInput: Boolean = false,
-      enableCudf: Boolean = false): Iterator[ColumnarBatch]
+      enableCudf: Boolean = false,
+      supportsValueStreamDynamicFilter: Boolean = true): 
Iterator[ColumnarBatch]
   // scalastyle:on argcount
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 959931b754..5efc99f77d 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -51,7 +51,8 @@ case class TransformContext(outputAttributes: Seq[Attribute], 
root: RelNode)
 case class WholeStageTransformContext(
     root: PlanNode,
     substraitContext: SubstraitContext = null,
-    enableCudf: Boolean = false)
+    enableCudf: Boolean = false,
+    supportsValueStreamDynamicFilter: Boolean = true)
 
 /** Base interface for a query plan that can be interpreted to Substrait 
representation. */
 trait TransformSupport extends ValidatablePlan {
@@ -257,7 +258,34 @@ case class WholeStageTransformer(child: SparkPlan, 
materializeInput: Boolean = f
       PlanBuilder.makePlan(substraitContext, 
Lists.newArrayList(childCtx.root), outNames)
     }
 
-    WholeStageTransformContext(planNode, substraitContext, isCudf)
+    WholeStageTransformContext(
+      planNode,
+      substraitContext,
+      isCudf,
+      !hasNonDeterministicExprInJoinProbe(child))
+  }
+
+  /**
+   * Returns true if the probe (streamed) side of any HashJoin in the plan contains a
+   * non-deterministic expression. In that case ValueStream dynamic filter pushdown must be
+   * disabled: the filter would drop rows at the ValueStream, below the non-deterministic
+   * Project, changing how many times the expression is evaluated and thus altering its
+   * output sequence. See SPARK-10316.
+   */
+  private def hasNonDeterministicExprInJoinProbe(plan: SparkPlan): Boolean = {
+    plan match {
+      case join: HashJoinLikeExecTransformer =>
+        containsNonDeterministicExpr(join.streamedPlan) ||
+        hasNonDeterministicExprInJoinProbe(join.streamedPlan) ||
+        hasNonDeterministicExprInJoinProbe(join.buildPlan)
+      case other =>
+        other.children.exists(hasNonDeterministicExprInJoinProbe)
+    }
+  }
+
+  private def containsNonDeterministicExpr(plan: SparkPlan): Boolean = {
+    plan.expressions.exists(!_.deterministic) ||
+    plan.children.exists(containsNonDeterministicExpr)
   }
 
   def doWholeStageTransform(): WholeStageTransformContext = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
index 521388faae..393716bb7b 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
@@ -54,7 +54,8 @@ class WholeStageZippedPartitionsRDD(
             updateNativeMetrics,
             split.index,
             materializeInput,
-            resCtx.enableCudf
+            resCtx.enableCudf,
+            resCtx.supportsValueStreamDynamicFilter
           )
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to