This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ec25cda135 [GLUTEN-11605][VL] Push dynamic filters down to ValueStream
(#11657)
ec25cda135 is described below
commit ec25cda13586a6047f2d08dbe67ea9498e4edc6c
Author: Ankita Victor <[email protected]>
AuthorDate: Fri Mar 13 15:08:09 2026 +0530
[GLUTEN-11605][VL] Push dynamic filters down to ValueStream (#11657)
---
.../backendsapi/clickhouse/CHIteratorApi.scala | 3 +-
.../java/org/apache/gluten/metrics/Metrics.java | 4 +
.../org/apache/gluten/metrics/OperatorMetrics.java | 3 +
.../backendsapi/velox/VeloxIteratorApi.scala | 9 +-
.../gluten/backendsapi/velox/VeloxMetricsApi.scala | 15 +-
.../org/apache/gluten/config/VeloxConfig.scala | 11 +
.../metrics/InputIteratorMetricsUpdater.scala | 6 +
.../org/apache/gluten/metrics/MetricsUtil.scala | 3 +
cpp/core/jni/JniWrapper.cc | 3 +-
cpp/core/utils/Metrics.h | 1 +
cpp/velox/compute/VeloxBackend.cc | 5 +-
cpp/velox/compute/WholeStageResultIterator.cc | 3 +
cpp/velox/config/VeloxConfig.h | 4 +
cpp/velox/operators/plannodes/RowVectorStream.cc | 114 +++++++-
cpp/velox/operators/plannodes/RowVectorStream.h | 60 +++-
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 4 +-
cpp/velox/tests/CMakeLists.txt | 3 +-
cpp/velox/tests/ValueStreamDynamicFilterTest.cc | 318 +++++++++++++++++++++
docs/velox-configuration.md | 1 +
.../apache/gluten/backendsapi/IteratorApi.scala | 3 +-
.../gluten/execution/WholeStageTransformer.scala | 32 ++-
.../execution/WholeStageZippedPartitionsRDD.scala | 3 +-
22 files changed, 589 insertions(+), 19 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 16dc2bccba..a705675e2d 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -319,7 +319,8 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
materializeInput: Boolean,
- enableCudf: Boolean): Iterator[ColumnarBatch] = {
+ enableCudf: Boolean,
+ supportsValueStreamDynamicFilter: Boolean): Iterator[ColumnarBatch] = {
// scalastyle:on argcount
// Final iterator does not contain scan split, so pass empty split info to
native here.
diff --git
a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
index 3d23dc94db..9b7cd7c8d7 100644
--- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
+++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
@@ -40,6 +40,7 @@ public class Metrics implements IMetrics {
public long[] numDynamicFiltersProduced;
public long[] numDynamicFiltersAccepted;
public long[] numReplacedWithDynamicFilterRows;
+ public long[] numDynamicFilterInputRows;
public long[] flushRowCount;
public long[] loadedToValueHook;
public long[] bloomFilterBlocksByteSize;
@@ -90,6 +91,7 @@ public class Metrics implements IMetrics {
long[] numDynamicFiltersProduced,
long[] numDynamicFiltersAccepted,
long[] numReplacedWithDynamicFilterRows,
+ long[] numDynamicFilterInputRows,
long[] flushRowCount,
long[] loadedToValueHook,
long[] bloomFilterBlocksByteSize,
@@ -134,6 +136,7 @@ public class Metrics implements IMetrics {
this.numDynamicFiltersProduced = numDynamicFiltersProduced;
this.numDynamicFiltersAccepted = numDynamicFiltersAccepted;
this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
+ this.numDynamicFilterInputRows = numDynamicFilterInputRows;
this.flushRowCount = flushRowCount;
this.loadedToValueHook = loadedToValueHook;
this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
@@ -184,6 +187,7 @@ public class Metrics implements IMetrics {
numDynamicFiltersProduced[index],
numDynamicFiltersAccepted[index],
numReplacedWithDynamicFilterRows[index],
+ numDynamicFilterInputRows[index],
flushRowCount[index],
loadedToValueHook[index],
bloomFilterBlocksByteSize[index],
diff --git
a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
index 10563e507e..0726fd1b76 100644
---
a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
+++
b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
@@ -38,6 +38,7 @@ public class OperatorMetrics implements IOperatorMetrics {
public long numDynamicFiltersProduced;
public long numDynamicFiltersAccepted;
public long numReplacedWithDynamicFilterRows;
+ public long numDynamicFilterInputRows;
public long flushRowCount;
public long loadedToValueHook;
public long bloomFilterBlocksByteSize;
@@ -83,6 +84,7 @@ public class OperatorMetrics implements IOperatorMetrics {
long numDynamicFiltersProduced,
long numDynamicFiltersAccepted,
long numReplacedWithDynamicFilterRows,
+ long numDynamicFilterInputRows,
long flushRowCount,
long loadedToValueHook,
long bloomFilterBlocksByteSize,
@@ -125,6 +127,7 @@ public class OperatorMetrics implements IOperatorMetrics {
this.numDynamicFiltersProduced = numDynamicFiltersProduced;
this.numDynamicFiltersAccepted = numDynamicFiltersAccepted;
this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
+ this.numDynamicFilterInputRows = numDynamicFilterInputRows;
this.flushRowCount = flushRowCount;
this.loadedToValueHook = loadedToValueHook;
this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 1088440774..60018554dd 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -247,8 +247,13 @@ class VeloxIteratorApi extends IteratorApi with Logging {
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
materializeInput: Boolean,
- enableCudf: Boolean = false): Iterator[ColumnarBatch] = {
- val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key ->
enableCudf.toString).asJava
+ enableCudf: Boolean = false,
+ supportsValueStreamDynamicFilter: Boolean = true):
Iterator[ColumnarBatch] = {
+ val extraConfMap = mutable.Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key ->
enableCudf.toString)
+ if (!supportsValueStreamDynamicFilter) {
+ extraConfMap(VeloxConfig.VALUE_STREAM_DYNAMIC_FILTER_ENABLED.key) =
"false"
+ }
+ val extraConf = extraConfMap.asJava
val transKernel =
NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf)
val columnarNativeIterator =
inputIterators.map {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 29d882c3d3..f13217442f 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -73,10 +73,23 @@ class VeloxMetricsApi extends MetricsApi with Logging {
SQLMetrics.createNanoTimingMetric(sparkContext, "time of operator input")
}
+ val dynamicFilterMetrics = if (forShuffle) {
+ Map(
+ "valueStreamDynamicFiltersAccepted" -> SQLMetrics.createMetric(
+ sparkContext,
+ "number of dynamic filters accepted by value stream"),
+ "valueStreamDynamicFilterInputRows" -> SQLMetrics.createMetric(
+ sparkContext,
+ "number of input rows")
+ )
+ } else {
+ Map.empty[String, SQLMetric]
+ }
+
Map(
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time
count"),
"wallNanos" -> wallNanosMetric
- ) ++ outputMetrics
+ ) ++ outputMetrics ++ dynamicFilterMetrics
}
override def genInputIteratorTransformerMetricsUpdater(
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 46dcb55fe9..993c554808 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -94,6 +94,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def hashProbeDynamicFilterPushdownEnabled: Boolean =
getConf(HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED)
+
+ def valueStreamDynamicFilterEnabled: Boolean =
+ getConf(VALUE_STREAM_DYNAMIC_FILTER_ENABLED)
}
object VeloxConfig extends ConfigRegistry {
@@ -455,6 +458,14 @@ object VeloxConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(true)
+ val VALUE_STREAM_DYNAMIC_FILTER_ENABLED =
+
buildConf("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled")
+ .doc(
+ "Whether to apply dynamic filters pushed down from hash probe in the
ValueStream" +
+ " (shuffle reader) operator to filter rows before they reach the
hash join.")
+ .booleanConf
+ .createWithDefault(false)
+
val COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled")
.doc(
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala
b/backends-velox/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala
index 79f6432645..ed006a5434 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/metrics/InputIteratorMetricsUpdater.scala
@@ -35,6 +35,12 @@ case class InputIteratorMetricsUpdater(metrics: Map[String,
SQLMetric], forBroad
metrics("numOutputRows") += operatorMetrics.outputRows
metrics("outputVectors") += operatorMetrics.outputVectors
}
+ metrics.get("valueStreamDynamicFiltersAccepted").foreach {
+ _ += operatorMetrics.numDynamicFiltersAccepted
+ }
+ metrics.get("valueStreamDynamicFilterInputRows").foreach {
+ _ += operatorMetrics.numDynamicFilterInputRows
+ }
}
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index 607de718ce..09430fdd70 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -120,6 +120,7 @@ object MetricsUtil extends Logging {
var numDynamicFiltersProduced: Long = 0
var numDynamicFiltersAccepted: Long = 0
var numReplacedWithDynamicFilterRows: Long = 0
+ var numDynamicFilterInputRows: Long = 0
var flushRowCount: Long = 0
var loadedToValueHook: Long = 0
var bloomFilterBlocksByteSize: Long = 0
@@ -155,6 +156,7 @@ object MetricsUtil extends Logging {
numDynamicFiltersProduced += metrics.numDynamicFiltersProduced
numDynamicFiltersAccepted += metrics.numDynamicFiltersAccepted
numReplacedWithDynamicFilterRows +=
metrics.numReplacedWithDynamicFilterRows
+ numDynamicFilterInputRows += metrics.numDynamicFilterInputRows
flushRowCount += metrics.flushRowCount
loadedToValueHook += metrics.loadedToValueHook
bloomFilterBlocksByteSize += metrics.bloomFilterBlocksByteSize
@@ -197,6 +199,7 @@ object MetricsUtil extends Logging {
numDynamicFiltersProduced,
numDynamicFiltersAccepted,
numReplacedWithDynamicFilterRows,
+ numDynamicFilterInputRows,
flushRowCount,
loadedToValueHook,
bloomFilterBlocksByteSize,
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index c48886195b..c8cd3adef4 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -273,7 +273,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
env,
metricsBuilderClass,
"<init>",
-
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
+
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
@@ -589,6 +589,7 @@ JNIEXPORT jobject JNICALL
Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
longArray[Metrics::kNumDynamicFiltersProduced],
longArray[Metrics::kNumDynamicFiltersAccepted],
longArray[Metrics::kNumReplacedWithDynamicFilterRows],
+ longArray[Metrics::kNumDynamicFilterInputRows],
longArray[Metrics::kFlushRowCount],
longArray[Metrics::kLoadedToValueHook],
longArray[Metrics::kBloomFilterBlocksByteSize],
diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h
index 8e33a4a9c6..6123f4a7ac 100644
--- a/cpp/core/utils/Metrics.h
+++ b/cpp/core/utils/Metrics.h
@@ -67,6 +67,7 @@ struct Metrics {
kNumDynamicFiltersProduced,
kNumDynamicFiltersAccepted,
kNumReplacedWithDynamicFilterRows,
+ kNumDynamicFilterInputRows,
kFlushRowCount,
kLoadedToValueHook,
kBloomFilterBlocksByteSize,
diff --git a/cpp/velox/compute/VeloxBackend.cc
b/cpp/velox/compute/VeloxBackend.cc
index de9e9385f8..742d0cba70 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -319,7 +319,10 @@ void VeloxBackend::initConnector(const
std::shared_ptr<velox::config::ConfigBase
std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId,
hiveConf, ioExecutor_.get()));
// Register value-stream connector for runtime iterator-based inputs
-
velox::connector::registerConnector(std::make_shared<ValueStreamConnector>(kIteratorConnectorId,
hiveConf));
+ auto valueStreamDynamicFilterEnabled =
+ backendConf_->get<bool>(kValueStreamDynamicFilterEnabled,
kValueStreamDynamicFilterEnabledDefault);
+ velox::connector::registerConnector(
+ std::make_shared<ValueStreamConnector>(kIteratorConnectorId, hiveConf,
valueStreamDynamicFilterEnabled));
#ifdef GLUTEN_ENABLE_GPU
if (backendConf_->get<bool>(kCudfEnableTableScan,
kCudfEnableTableScanDefault) &&
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index 9607147045..c92156a557 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -43,6 +43,7 @@ namespace {
const std::string kDynamicFiltersProduced = "dynamicFiltersProduced";
const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted";
const std::string kReplacedWithDynamicFilterRows =
"replacedWithDynamicFilterRows";
+const std::string kDynamicFilterInputRows = "dynamicFilterInputRows";
const std::string kFlushRowCount = "flushRowCount";
const std::string kLoadedToValueHook = "loadedToValueHook";
const std::string kBloomFilterBlocksByteSize = "bloomFilterSize";
@@ -490,6 +491,8 @@ void WholeStageResultIterator::collectMetrics() {
runtimeMetric("sum", second->customStats, kDynamicFiltersAccepted);
metrics_->get(Metrics::kNumReplacedWithDynamicFilterRows)[metricIndex] =
runtimeMetric("sum", second->customStats,
kReplacedWithDynamicFilterRows);
+ metrics_->get(Metrics::kNumDynamicFilterInputRows)[metricIndex] =
+ runtimeMetric("sum", second->customStats, kDynamicFilterInputRows);
metrics_->get(Metrics::kFlushRowCount)[metricIndex] =
runtimeMetric("sum", second->customStats, kFlushRowCount);
metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] =
runtimeMetric("sum", second->customStats, kLoadedToValueHook);
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 3c5c432b8c..6195620fc8 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -82,6 +82,10 @@ const std::string kHashProbeDynamicFilterPushdownEnabled =
const std::string kHashProbeBloomFilterPushdownMaxSize =
"spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize";
+const std::string kValueStreamDynamicFilterEnabled =
+
"spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled";
+const bool kValueStreamDynamicFilterEnabledDefault = false;
+
const std::string kShowTaskMetricsWhenFinished =
"spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished";
const bool kShowTaskMetricsWhenFinishedDefault = false;
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.cc
b/cpp/velox/operators/plannodes/RowVectorStream.cc
index 7c0b00979a..514e6e6735 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.cc
+++ b/cpp/velox/operators/plannodes/RowVectorStream.cc
@@ -19,6 +19,7 @@
#include "memory/VeloxColumnarBatch.h"
#include "velox/exec/Driver.h"
#include "velox/exec/Operator.h"
+#include "velox/exec/OperatorUtils.h"
#include "velox/exec/Task.h"
#include "velox/vector/arrow/Bridge.h"
@@ -113,7 +114,9 @@ ValueStreamDataSource::ValueStreamDataSource(
const facebook::velox::connector::ColumnHandleMap& columnHandles,
facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx)
: outputType_(outputType),
- pool_(connectorQueryCtx->memoryPool()) {}
+ pool_(connectorQueryCtx->memoryPool()),
+ dynamicFilterEnabled_(
+ std::dynamic_pointer_cast<const
ValueStreamTableHandle>(tableHandle)->dynamicFilterEnabled()) {}
void
ValueStreamDataSource::addSplit(std::shared_ptr<facebook::velox::connector::ConnectorSplit>
split) {
// Cast to IteratorConnectorSplit to extract the iterator
@@ -166,7 +169,116 @@ std::optional<facebook::velox::RowVectorPtr>
ValueStreamDataSource::next(
completedRows_ += rowVector->size();
completedBytes_ += rowVector->estimateFlatSize();
+ // Apply dynamic filters if any have been pushed down.
+ if (!dynamicFilters_.empty()) {
+ rowVector = applyDynamicFilters(rowVector);
+ if (!rowVector) {
+ // All rows filtered out, try next batch.
+ return next(size, future);
+ }
+ }
+
return rowVector;
}
+facebook::velox::RowVectorPtr ValueStreamDataSource::applyDynamicFilters(
+ const facebook::velox::RowVectorPtr& input) {
+ using namespace facebook::velox;
+
+ const auto numRows = input->size();
+ if (numRows == 0) {
+ return input;
+ }
+
+ SelectivityVector rows(numRows, true);
+
+ for (const auto& [channel, filter] : dynamicFilters_) {
+ if (!filter || channel >= input->childrenSize()) {
+ continue;
+ }
+ applyFilterOnColumn(filter, input->childAt(channel), rows);
+ if (!rows.hasSelections()) {
+ dynamicFilterInputRows_ += numRows;
+ return nullptr;
+ }
+ }
+
+ const auto passedCount = rows.countSelected();
+ if (passedCount == numRows) {
+ return input;
+ }
+
+ dynamicFilterInputRows_ += numRows;
+
+ BufferPtr indices = allocateIndices(passedCount, pool_);
+ auto* rawIndices = indices->asMutable<vector_size_t>();
+ vector_size_t idx = 0;
+ rows.applyToSelected([&](auto row) { rawIndices[idx++] = row; });
+
+ return exec::wrap(passedCount, std::move(indices), input);
+}
+
+void ValueStreamDataSource::applyFilterOnColumn(
+ const std::shared_ptr<facebook::velox::common::Filter>& filter,
+ const facebook::velox::VectorPtr& vector,
+ facebook::velox::SelectivityVector& rows) {
+ using namespace facebook::velox;
+
+ DecodedVector decoded(*vector, rows);
+
+ rows.applyToSelected([&](auto row) {
+ if (decoded.isNullAt(row)) {
+ if (!filter->testNull()) {
+ rows.setValid(row, false);
+ }
+ return;
+ }
+
+ bool pass = false;
+ switch (vector->typeKind()) {
+ case TypeKind::BOOLEAN:
+ pass = filter->testBool(decoded.valueAt<bool>(row));
+ break;
+ case TypeKind::TINYINT:
+ pass = filter->testInt64(decoded.valueAt<int8_t>(row));
+ break;
+ case TypeKind::SMALLINT:
+ pass = filter->testInt64(decoded.valueAt<int16_t>(row));
+ break;
+ case TypeKind::INTEGER:
+ pass = filter->testInt64(decoded.valueAt<int32_t>(row));
+ break;
+ case TypeKind::BIGINT:
+ pass = filter->testInt64(decoded.valueAt<int64_t>(row));
+ break;
+ case TypeKind::HUGEINT:
+ pass = filter->testInt128(decoded.valueAt<int128_t>(row));
+ break;
+ case TypeKind::REAL:
+ pass = filter->testFloat(decoded.valueAt<float>(row));
+ break;
+ case TypeKind::DOUBLE:
+ pass = filter->testDouble(decoded.valueAt<double>(row));
+ break;
+ case TypeKind::VARCHAR:
+ case TypeKind::VARBINARY: {
+ auto sv = decoded.valueAt<StringView>(row);
+ pass = filter->testBytes(sv.data(), sv.size());
+ break;
+ }
+ case TypeKind::TIMESTAMP:
+ pass = filter->testTimestamp(decoded.valueAt<Timestamp>(row));
+ break;
+ default:
+ // For unsupported types, let the row pass through.
+ pass = true;
+ break;
+ }
+ if (!pass) {
+ rows.setValid(row, false);
+ }
+ });
+ rows.updateBounds();
+}
+
} // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h
b/cpp/velox/operators/plannodes/RowVectorStream.h
index 6e6ccd1527..c6dc81a757 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -23,7 +23,10 @@
#include "velox/connectors/Connector.h"
#include "velox/exec/Driver.h"
#include "velox/exec/Operator.h"
+#include "velox/exec/OperatorUtils.h"
#include "velox/exec/Task.h"
+#include "velox/type/Filter.h"
+#include "velox/vector/DecodedVector.h"
namespace gluten {
@@ -68,10 +71,17 @@ class ValueStreamDataSource : public
facebook::velox::connector::DataSource {
std::optional<facebook::velox::RowVectorPtr> next(uint64_t size,
facebook::velox::ContinueFuture& future) override;
+ const facebook::velox::common::SubfieldFilters* getFilters() const override {
+ return &emptyFilters_;
+ }
+
void addDynamicFilter(
facebook::velox::column_index_t outputChannel,
const std::shared_ptr<facebook::velox::common::Filter>& filter) override
{
- // Iterator-based sources don't support dynamic filtering
+ if (dynamicFilterEnabled_) {
+ dynamicFilters_[outputChannel] = filter;
+ numDynamicFiltersAccepted_++;
+ }
}
uint64_t getCompletedBytes() override {
@@ -83,10 +93,26 @@ class ValueStreamDataSource : public
facebook::velox::connector::DataSource {
}
std::unordered_map<std::string, facebook::velox::RuntimeMetric>
getRuntimeStats() override {
- return {};
+ std::unordered_map<std::string, facebook::velox::RuntimeMetric> stats;
+ stats["dynamicFiltersAccepted"] =
facebook::velox::RuntimeMetric(numDynamicFiltersAccepted_);
+ if (dynamicFilterInputRows_ > 0) {
+ stats["dynamicFilterInputRows"] =
facebook::velox::RuntimeMetric(dynamicFilterInputRows_);
+ }
+ return stats;
}
private:
+ // Applies dynamic filters to a batch, returning a dictionary-wrapped subset
+ // containing only the rows that pass all filters.
+ facebook::velox::RowVectorPtr applyDynamicFilters(const
facebook::velox::RowVectorPtr& input);
+
+ // Evaluates a Filter against a single column vector, deselecting rows that
+ // don't pass.
+ static void applyFilterOnColumn(
+ const std::shared_ptr<facebook::velox::common::Filter>& filter,
+ const facebook::velox::VectorPtr& vector,
+ facebook::velox::SelectivityVector& rows);
+
const facebook::velox::RowTypePtr outputType_;
facebook::velox::memory::MemoryPool* pool_;
@@ -94,21 +120,35 @@ class ValueStreamDataSource : public
facebook::velox::connector::DataSource {
std::shared_ptr<RowVectorStream> currentIterator_{nullptr};
uint64_t completedBytes_{0};
uint64_t completedRows_{0};
+
+ folly::F14FastMap<facebook::velox::column_index_t,
std::shared_ptr<facebook::velox::common::Filter>> dynamicFilters_;
+ const facebook::velox::common::SubfieldFilters emptyFilters_;
+ bool dynamicFilterEnabled_{true};
+ uint64_t numDynamicFiltersAccepted_{0};
+ uint64_t dynamicFilterInputRows_{0};
};
/// Table handle for iterator-based scans
class ValueStreamTableHandle : public
facebook::velox::connector::ConnectorTableHandle {
public:
- explicit ValueStreamTableHandle(std::string connectorId) :
ConnectorTableHandle(connectorId) {}
+ explicit ValueStreamTableHandle(std::string connectorId, bool
dynamicFilterEnabled = true)
+ : ConnectorTableHandle(connectorId),
dynamicFilterEnabled_(dynamicFilterEnabled) {}
const std::string& name() const override {
static const std::string kName = "ValueStreamTableHandle";
return kName;
}
+ bool dynamicFilterEnabled() const {
+ return dynamicFilterEnabled_;
+ }
+
folly::dynamic serialize() const override {
VELOX_NYI();
}
+
+ private:
+ bool dynamicFilterEnabled_;
};
/// Column handle for iterator-based scans
@@ -133,10 +173,15 @@ class ValueStreamColumnHandle : public
facebook::velox::connector::ColumnHandle
/// Connector implementation for iterator-based data sources
class ValueStreamConnector : public facebook::velox::connector::Connector {
public:
- explicit ValueStreamConnector(
+ ValueStreamConnector(
const std::string& id,
- std::shared_ptr<const facebook::velox::config::ConfigBase> config)
- : Connector(id, config) {}
+ std::shared_ptr<const facebook::velox::config::ConfigBase> config,
+ bool dynamicFilterEnabled = false)
+ : Connector(id, config), dynamicFilterEnabled_(dynamicFilterEnabled) {}
+
+ bool canAddDynamicFilter() const override {
+ return dynamicFilterEnabled_;
+ }
std::unique_ptr<facebook::velox::connector::DataSource> createDataSource(
const facebook::velox::RowTypePtr& outputType,
@@ -153,6 +198,9 @@ class ValueStreamConnector : public
facebook::velox::connector::Connector {
facebook::velox::connector::CommitStrategy commitStrategy) override {
VELOX_UNSUPPORTED("ValueStreamConnector does not support data sinks");
}
+
+ private:
+ bool dynamicFilterEnabled_;
};
/// Factory for creating ValueStreamConnector instances
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 834127e20c..adb7fc5f45 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -1353,7 +1353,9 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::constructValueStreamNode(
auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));
// Create TableHandle
- auto tableHandle =
std::make_shared<ValueStreamTableHandle>(kIteratorConnectorId);
+ bool dynamicFilterEnabled =
+ veloxCfg_->get<bool>(kValueStreamDynamicFilterEnabled,
kValueStreamDynamicFilterEnabledDefault);
+ auto tableHandle =
std::make_shared<ValueStreamTableHandle>(kIteratorConnectorId,
dynamicFilterEnabled);
// Create column assignments
connector::ColumnHandleMap assignments;
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index a690347bc4..0c61850e12 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -115,7 +115,8 @@ add_velox_test(
VeloxRowToColumnarTest.cc
VeloxColumnarBatchSerializerTest.cc
VeloxColumnarBatchTest.cc
- VeloxBatchResizerTest.cc)
+ VeloxBatchResizerTest.cc
+ ValueStreamDynamicFilterTest.cc)
add_velox_test(
velox_plan_conversion_test
SOURCES
diff --git a/cpp/velox/tests/ValueStreamDynamicFilterTest.cc
b/cpp/velox/tests/ValueStreamDynamicFilterTest.cc
new file mode 100644
index 0000000000..6e13bfed01
--- /dev/null
+++ b/cpp/velox/tests/ValueStreamDynamicFilterTest.cc
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "memory/VeloxColumnarBatch.h"
+#include "operators/plannodes/RowVectorStream.h"
+#include "velox/type/Filter.h"
+#include "velox/vector/DecodedVector.h"
+#include "velox/vector/FlatVector.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+using namespace facebook::velox;
+using namespace facebook::velox::exec;
+using namespace facebook::velox::common;
+
+namespace facebook::velox::test {
+
+/// A ColumnarBatchIterator that yields pre-built RowVectors as
VeloxColumnarBatches.
+class TestBatchIterator final : public gluten::ColumnarBatchIterator {
+ public:
+ explicit TestBatchIterator(std::vector<RowVectorPtr> batches) :
batches_(std::move(batches)) {}
+
+ std::shared_ptr<gluten::ColumnarBatch> next() override {
+ if (idx_ >= batches_.size()) {
+ return nullptr;
+ }
+ return std::make_shared<gluten::VeloxColumnarBatch>(batches_[idx_++]);
+ }
+
+ private:
+ std::vector<RowVectorPtr> batches_;
+ size_t idx_ = 0;
+};
+
+class ValueStreamDynamicFilterTest : public ::testing::Test, public
VectorTestBase {
+ protected:
+ static void SetUpTestCase() {
+
memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{});
+ }
+
+ void SetUp() override {
+ // Register the connector if not already registered.
+ if (!connector::hasConnector(gluten::kIteratorConnectorId)) {
+ auto config = std::make_shared<config::ConfigBase>(
+ std::unordered_map<std::string, std::string>());
+
connector::registerConnector(std::make_shared<gluten::ValueStreamConnector>(
+ gluten::kIteratorConnectorId, config,
/*dynamicFilterEnabled=*/true));
+ }
+ }
+
+ /// Build a TableScanNode that reads from the value-stream connector.
+ std::shared_ptr<core::TableScanNode> makeTableScanNode(
+ const std::string& nodeId,
+ const RowTypePtr& outputType) {
+ auto tableHandle =
+
std::make_shared<gluten::ValueStreamTableHandle>(gluten::kIteratorConnectorId);
+
+ connector::ColumnHandleMap assignments;
+ for (int idx = 0; idx < outputType->size(); idx++) {
+ auto name = outputType->nameOf(idx);
+ auto type = outputType->childAt(idx);
+ assignments[name] =
+ std::make_shared<gluten::ValueStreamColumnHandle>(name, type);
+ }
+
+ return std::make_shared<core::TableScanNode>(
+ nodeId, outputType, tableHandle, assignments);
+ }
+
+ /// Create a split wrapping the given batches.
+ std::shared_ptr<connector::ConnectorSplit> makeSplit(
+ std::vector<RowVectorPtr> batches) {
+ auto iter = std::make_shared<gluten::ResultIterator>(
+ std::make_unique<TestBatchIterator>(std::move(batches)));
+ return std::make_shared<gluten::IteratorConnectorSplit>(
+ gluten::kIteratorConnectorId, std::move(iter));
+ }
+
+ /// Read all int64 values from column 0 of a serial-mode task.
+ std::vector<int64_t> readAllInt64(Task* task) {
+ std::vector<int64_t> result;
+ ContinueFuture future = ContinueFuture::makeEmpty();
+ while (true) {
+ auto batch = task->next(&future);
+ if (!batch) {
+ break;
+ }
+ DecodedVector decoded(*batch->childAt(0));
+ for (vector_size_t i = 0; i < batch->size(); i++) {
+ result.push_back(decoded.valueAt<int64_t>(i));
+ }
+ }
+ return result;
+ }
+};
+
+// Test that without any filter, all rows pass through.
+TEST_F(ValueStreamDynamicFilterTest, noFilterPassesAllRows) {
+ auto batch = makeRowVector({"id"}, {makeFlatVector<int64_t>({10, 20, 30})});
+ auto outputType = asRowType(batch->type());
+ auto scanNode = makeTableScanNode("vs0", outputType);
+
+ auto queryCtx = core::QueryCtx::create();
+ auto task = Task::create(
+ "test-nofilter",
+ core::PlanFragment{scanNode},
+ 0,
+ queryCtx,
+ Task::ExecutionMode::kSerial);
+
+ task->addSplit(scanNode->id(), Split{makeSplit({batch})});
+ task->noMoreSplits(scanNode->id());
+
+ auto ids = readAllInt64(task.get());
+ ASSERT_EQ(ids, (std::vector<int64_t>{10, 20, 30}));
+}
+
+// Test that filtering works when filter is injected after first batch.
+TEST_F(ValueStreamDynamicFilterTest, filterBigintRange) {
+ auto batch1 = makeRowVector({"id"}, {makeFlatVector<int64_t>({1, 2, 3, 4,
5})});
+ auto batch2 = makeRowVector({"id"}, {makeFlatVector<int64_t>({6, 7, 8, 9,
10})});
+ auto outputType = asRowType(batch1->type());
+ auto scanNode = makeTableScanNode("vs1", outputType);
+
+ auto queryCtx = core::QueryCtx::create();
+ auto task = Task::create(
+ "test-bigint",
+ core::PlanFragment{scanNode},
+ 0,
+ queryCtx,
+ Task::ExecutionMode::kSerial);
+
+ // Add both batches as a single split.
+ task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2})});
+ task->noMoreSplits(scanNode->id());
+
+ // First next() creates drivers and returns first batch (unfiltered).
+ ContinueFuture future = ContinueFuture::makeEmpty();
+ auto firstBatch = task->next(&future);
+ ASSERT_NE(firstBatch, nullptr);
+ ASSERT_EQ(firstBatch->size(), 5);
+
+ // Inject a BigintRange filter: keep only id >= 8.
+ task->testingVisitDrivers([&](Driver* driver) {
+ auto* op = driver->findOperator(scanNode->id());
+ if (!op) {
+ return;
+ }
+ ASSERT_TRUE(op->canAddDynamicFilter());
+ PushdownFilters pf;
+ pf.filters[0] = std::make_shared<BigintRange>(8, 10, false);
+ pf.dynamicFilteredColumns.insert(0);
+ op->addDynamicFilterLocked("producer", pf);
+ });
+
+ // Second next() should return filtered batch.
+ auto secondBatch = task->next(&future);
+ ASSERT_NE(secondBatch, nullptr);
+
+ DecodedVector decoded(*secondBatch->childAt(0));
+ std::vector<int64_t> outputIds;
+ for (vector_size_t i = 0; i < secondBatch->size(); i++) {
+ outputIds.push_back(decoded.valueAt<int64_t>(i));
+ }
+ ASSERT_EQ(outputIds, (std::vector<int64_t>{8, 9, 10}));
+
+ auto end = task->next(&future);
+ ASSERT_EQ(end, nullptr);
+}
+
+// Test that a batch whose rows all satisfy the injected filter passes through intact.
+TEST_F(ValueStreamDynamicFilterTest, filterEliminatesEntireBatch) {
+ auto batch1 = makeRowVector({"id"}, {makeFlatVector<int64_t>({1, 2, 3})});
+ auto batch2 = makeRowVector({"id"}, {makeFlatVector<int64_t>({100, 200,
300})});
+ auto outputType = asRowType(batch1->type());
+ auto scanNode = makeTableScanNode("vs2", outputType);
+
+ auto queryCtx = core::QueryCtx::create();
+ auto task = Task::create(
+ "test-eliminate",
+ core::PlanFragment{scanNode},
+ 0,
+ queryCtx,
+ Task::ExecutionMode::kSerial);
+
+ task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2})});
+ task->noMoreSplits(scanNode->id());
+
+ ContinueFuture future = ContinueFuture::makeEmpty();
+ auto firstBatch = task->next(&future);
+ ASSERT_NE(firstBatch, nullptr);
+ ASSERT_EQ(firstBatch->size(), 3);
+
+ task->testingVisitDrivers([&](Driver* driver) {
+ auto* op = driver->findOperator(scanNode->id());
+ if (!op) {
+ return;
+ }
+ PushdownFilters pf;
+ pf.filters[0] = std::make_shared<BigintRange>(100, 300, false);
+ pf.dynamicFilteredColumns.insert(0);
+ op->addDynamicFilterLocked("producer", pf);
+ });
+
+ auto secondBatch = task->next(&future);
+ ASSERT_NE(secondBatch, nullptr);
+
+ DecodedVector decoded(*secondBatch->childAt(0));
+ std::vector<int64_t> outputIds;
+ for (vector_size_t i = 0; i < secondBatch->size(); i++) {
+ outputIds.push_back(decoded.valueAt<int64_t>(i));
+ }
+ ASSERT_EQ(outputIds, (std::vector<int64_t>{100, 200, 300}));
+
+ auto end = task->next(&future);
+ ASSERT_EQ(end, nullptr);
+}
+
+// Test that nulls are filtered out when nullAllowed is false.
+TEST_F(ValueStreamDynamicFilterTest, filterWithNulls) {
+ auto batch1 = makeRowVector({"id"}, {makeFlatVector<int64_t>({10, 20})});
+ auto batch2 = makeRowVector(
+ {"id"},
+ {makeNullableFlatVector<int64_t>({1, std::nullopt, 3, std::nullopt,
5})});
+ auto outputType = asRowType(batch1->type());
+ auto scanNode = makeTableScanNode("vs3", outputType);
+
+ auto queryCtx = core::QueryCtx::create();
+ auto task = Task::create(
+ "test-nulls",
+ core::PlanFragment{scanNode},
+ 0,
+ queryCtx,
+ Task::ExecutionMode::kSerial);
+
+ task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2})});
+ task->noMoreSplits(scanNode->id());
+
+ ContinueFuture future = ContinueFuture::makeEmpty();
+ auto firstBatch = task->next(&future);
+ ASSERT_NE(firstBatch, nullptr);
+
+ task->testingVisitDrivers([&](Driver* driver) {
+ auto* op = driver->findOperator(scanNode->id());
+ if (!op) {
+ return;
+ }
+ PushdownFilters pf;
+ pf.filters[0] = std::make_shared<BigintRange>(3, 100, false);
+ pf.dynamicFilteredColumns.insert(0);
+ op->addDynamicFilterLocked("producer", pf);
+ });
+
+ auto secondBatch = task->next(&future);
+ ASSERT_NE(secondBatch, nullptr);
+
+ DecodedVector decoded(*secondBatch->childAt(0));
+ std::vector<int64_t> outputIds;
+ for (vector_size_t i = 0; i < secondBatch->size(); i++) {
+ ASSERT_FALSE(decoded.isNullAt(i));
+ outputIds.push_back(decoded.valueAt<int64_t>(i));
+ }
+ ASSERT_EQ(outputIds, (std::vector<int64_t>{3, 5}));
+
+ auto end = task->next(&future);
+ ASSERT_EQ(end, nullptr);
+}
+
+// Test canAddDynamicFilter returns true through the connector.
+TEST_F(ValueStreamDynamicFilterTest, canAddDynamicFilter) {
+ auto batch = makeRowVector({"id"}, {makeFlatVector<int64_t>({1})});
+ auto outputType = asRowType(batch->type());
+ auto scanNode = makeTableScanNode("vs4", outputType);
+
+ auto queryCtx = core::QueryCtx::create();
+ auto task = Task::create(
+ "test-can-add",
+ core::PlanFragment{scanNode},
+ 0,
+ queryCtx,
+ Task::ExecutionMode::kSerial);
+
+ task->addSplit(scanNode->id(), Split{makeSplit({batch})});
+ task->noMoreSplits(scanNode->id());
+
+ ContinueFuture future = ContinueFuture::makeEmpty();
+ task->next(&future);
+
+ bool found = false;
+ task->testingVisitDrivers([&](Driver* driver) {
+ auto* op = driver->findOperator(scanNode->id());
+ if (op) {
+ ASSERT_TRUE(op->canAddDynamicFilter());
+ found = true;
+ }
+ });
+ ASSERT_TRUE(found);
+
+ auto end = task->next(&future);
+ ASSERT_EQ(end, nullptr);
+}
+
+} // namespace facebook::velox::test
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index 358dc41962..ff437f01bc 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -72,6 +72,7 @@ nav_order: 16
| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled
| false | If true, checksum read verification from SSD is
enabled.
[...]
| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow
| false | True if copy on write should be disabled.
[...]
| spark.gluten.sql.columnar.backend.velox.ssdODirect
| false | The O_DIRECT flag for cache writing
[...]
+| spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled
| false | Whether to apply dynamic filters pushed down from
hash probe in the ValueStream (shuffle reader) operator to filter rows before
they reach the hash join.
[...]
| spark.gluten.sql.enable.enhancedFeatures
| true | Enable some features including iceberg native write
and other features.
[...]
| spark.gluten.sql.rewrite.castArrayToString
| true | When true, rewrite `cast(array as String)` to
`concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox.
[...]
| spark.gluten.velox.castFromVarcharAddTrimNode
| false | If true, will add a trim node which has the same
sementic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing.
[...]
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 52be6c4954..a6b0053593 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -85,6 +85,7 @@ trait IteratorApi {
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
materializeInput: Boolean = false,
- enableCudf: Boolean = false): Iterator[ColumnarBatch]
+ enableCudf: Boolean = false,
+ supportsValueStreamDynamicFilter: Boolean = true):
Iterator[ColumnarBatch]
// scalastyle:on argcount
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 959931b754..5efc99f77d 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -51,7 +51,8 @@ case class TransformContext(outputAttributes: Seq[Attribute],
root: RelNode)
case class WholeStageTransformContext(
root: PlanNode,
substraitContext: SubstraitContext = null,
- enableCudf: Boolean = false)
+ enableCudf: Boolean = false,
+ supportsValueStreamDynamicFilter: Boolean = true)
/** Base interface for a query plan that can be interpreted to Substrait
representation. */
trait TransformSupport extends ValidatablePlan {
@@ -257,7 +258,34 @@ case class WholeStageTransformer(child: SparkPlan,
materializeInput: Boolean = f
PlanBuilder.makePlan(substraitContext,
Lists.newArrayList(childCtx.root), outNames)
}
- WholeStageTransformContext(planNode, substraitContext, isCudf)
+ WholeStageTransformContext(
+ planNode,
+ substraitContext,
+ isCudf,
+ !hasNonDeterministicExprInJoinProbe(child))
+ }
+
+ /**
+ * Checks whether any HashJoin's probe (streamed) side contains
non-deterministic expressions.
+ * When true, ValueStream dynamic filter pushdown must be disabled: the
+ * dynamic filter would filter rows at the ValueStream (below the
non-deterministic Project),
+ * changing how many times the non-deterministic expression is evaluated and
thus altering its
+ * output sequence. See SPARK-10316.
+ */
+ private def hasNonDeterministicExprInJoinProbe(plan: SparkPlan): Boolean = {
+ plan match {
+ case join: HashJoinLikeExecTransformer =>
+ containsNonDeterministicExpr(join.streamedPlan) ||
+ hasNonDeterministicExprInJoinProbe(join.streamedPlan) ||
+ hasNonDeterministicExprInJoinProbe(join.buildPlan)
+ case other =>
+ other.children.exists(hasNonDeterministicExprInJoinProbe)
+ }
+ }
+
+ private def containsNonDeterministicExpr(plan: SparkPlan): Boolean = {
+ plan.expressions.exists(!_.deterministic) ||
+ plan.children.exists(containsNonDeterministicExpr)
}
def doWholeStageTransform(): WholeStageTransformContext = {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
index 521388faae..393716bb7b 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageZippedPartitionsRDD.scala
@@ -54,7 +54,8 @@ class WholeStageZippedPartitionsRDD(
updateNativeMetrics,
split.index,
materializeInput,
- resCtx.enableCudf
+ resCtx.enableCudf,
+ resCtx.supportsValueStreamDynamicFilter
)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]