This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 420b7046ad [GLUTEN-11514][VL] Refactor plan execution by adding
`addIteratorSplits` and `noMoreSplits` methods to the plan execution API
(#11527)
420b7046ad is described below
commit 420b7046ade81d69bf7d7d88a96076139db8b303
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Feb 4 13:38:23 2026 +0000
[GLUTEN-11514][VL] Refactor plan execution by adding `addIteratorSplits`
and `noMoreSplits` methods to the plan execution API (#11527)
---
.../backendsapi/velox/VeloxIteratorApi.scala | 11 +-
.../gluten/execution/MiscOperatorSuite.scala | 2 +-
cpp/core/compute/Runtime.h | 5 +
cpp/core/jni/JniCommon.h | 3 +-
cpp/core/jni/JniWrapper.cc | 93 ++++++++++--
cpp/core/memory/SplitAwareColumnarBatchIterator.h | 49 +++++++
cpp/velox/CMakeLists.txt | 1 +
cpp/velox/benchmarks/GenericBenchmark.cc | 1 +
cpp/velox/compute/VeloxBackend.cc | 6 +-
cpp/velox/compute/VeloxPlanConverter.cc | 9 +-
cpp/velox/compute/VeloxPlanConverter.h | 14 +-
cpp/velox/compute/VeloxRuntime.cc | 30 +++-
cpp/velox/compute/VeloxRuntime.h | 2 +
cpp/velox/compute/WholeStageResultIterator.cc | 35 ++++-
cpp/velox/compute/WholeStageResultIterator.h | 25 ++--
cpp/velox/cudf/CudfPlanValidator.cc | 4 +-
cpp/velox/jni/JniFileSystem.cc | 1 +
cpp/velox/jni/VeloxJniWrapper.cc | 2 +
.../{RowVectorStream.cc => CudfVectorStream.cc} | 10 +-
cpp/velox/operators/plannodes/CudfVectorStream.h | 75 +++++++++-
cpp/velox/operators/plannodes/IteratorSplit.h | 47 ++++++
cpp/velox/operators/plannodes/RowVectorStream.cc | 83 ++++++++++-
cpp/velox/operators/plannodes/RowVectorStream.h | 162 +++++++++++++--------
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 106 ++++++++++----
cpp/velox/substrait/SubstraitToVeloxPlan.h | 18 ++-
.../substrait/SubstraitToVeloxPlanValidator.h | 4 +-
cpp/velox/tests/FunctionTest.cc | 2 +-
cpp/velox/tests/RuntimeTest.cc | 6 +
.../tests/Substrait2VeloxPlanConversionTest.cc | 2 +-
.../Substrait2VeloxValuesNodeConversionTest.cc | 2 +-
cpp/velox/tests/VeloxSubstraitRoundTripTest.cc | 4 +-
.../vectorized/ColumnarBatchOutIterator.java | 33 +++++
.../gluten/vectorized/NativePlanEvaluator.java | 6 +-
.../gluten/vectorized/PlanEvaluatorJniWrapper.java | 5 +-
34 files changed, 681 insertions(+), 177 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index c6ad7d5029..668e60b205 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -213,11 +213,12 @@ class VeloxIteratorApi extends IteratorApi with Logging {
val resIter: ColumnarBatchOutIterator =
transKernel.createKernelWithBatchIterator(
inputPartition.plan,
- splitInfoByteArray,
- columnarNativeIterators.asJava,
+ if (splitInfoByteArray.nonEmpty) splitInfoByteArray else null,
+ if (columnarNativeIterators.nonEmpty) columnarNativeIterators.toArray
else null,
partitionIndex,
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
)
+ resIter.noMoreSplits()
val itrMetrics = IteratorMetricsJniWrapper.create()
Iterators
@@ -261,12 +262,12 @@ class VeloxIteratorApi extends IteratorApi with Logging {
val nativeResultIterator =
transKernel.createKernelWithBatchIterator(
rootNode.toProtobuf.toByteArray,
- // Final iterator does not contain scan split, so pass empty split info to native here.
- new Array[Array[Byte]](0),
- columnarNativeIterator.asJava,
+ null,
+ if (columnarNativeIterator.nonEmpty) columnarNativeIterator.toArray
else null,
partitionIndex,
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
)
+ nativeResultIterator.noMoreSplits()
val itrMetrics = IteratorMetricsJniWrapper.create()
Iterators
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 0ea6bde3d8..e1a0fd98ee 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -869,7 +869,7 @@ class MiscOperatorSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
}
assert(wholeStageTransformers.size == 3)
val nativePlanString = wholeStageTransformers.head.nativePlanString()
- assert(nativePlanString.contains("Aggregation[1][SINGLE"))
+
assert(nativePlanString.matches("[\\s\\S]*Aggregation\\[\\d+]\\[SINGLE[\\s\\S]*"))
assert(nativePlanString.contains("ValueStream"))
assert(wholeStageTransformers(1).nativePlanString().contains("ValueStream"))
assert(wholeStageTransformers.last.nativePlanString().contains("TableScan"))
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index c1f82a0c34..a15cdc83d3 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -23,6 +23,7 @@
#include "compute/ResultIterator.h"
#include "memory/ColumnarBatch.h"
#include "memory/MemoryManager.h"
+#include "memory/SplitAwareColumnarBatchIterator.h"
#include "operators/c2r/ColumnarToRow.h"
#include "operators/r2c/RowToColumnar.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
@@ -105,6 +106,10 @@ class Runtime : public
std::enable_shared_from_this<Runtime> {
throw GlutenException("Not implemented");
}
+ virtual void noMoreSplits(ResultIterator* iter) {
+ throw GlutenException("Not implemented");
+ }
+
virtual std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t
numRows) {
throw GlutenException("Not implemented");
}
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index cb400e2db3..1044edb6d7 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -233,6 +233,7 @@ class SafeNativeArray {
public:
virtual ~SafeNativeArray() {
PrimitiveArray::release(env_, javaArray_, nativeArray_);
+ env_->DeleteLocalRef(javaArray_);
}
SafeNativeArray(const SafeNativeArray&) = delete;
@@ -255,7 +256,7 @@ class SafeNativeArray {
private:
SafeNativeArray(JNIEnv* env, JavaArrayType javaArray, JniNativeArrayType
nativeArray)
- : env_(env), javaArray_(javaArray), nativeArray_(nativeArray){};
+ : env_(env),
javaArray_(static_cast<JavaArrayType>(env_->NewLocalRef(javaArray))),
nativeArray_(nativeArray){};
JNIEnv* env_;
JavaArrayType javaArray_;
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index adada15f91..e9d797e1db 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -29,6 +29,7 @@
#include <optional>
#include <string>
#include "memory/AllocationListener.h"
+#include "memory/SplitAwareColumnarBatchIterator.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/Partitioning.h"
@@ -455,7 +456,7 @@
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
jobject wrapper,
jbyteArray planArr,
jobjectArray splitInfosArr,
- jobjectArray iterArr,
+ jobjectArray batchItrArray,
jint stageId,
jint partitionId,
jlong taskId,
@@ -478,24 +479,28 @@
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
ctx->parsePlan(safePlanArray.elems(), planSize);
- for (jsize i = 0, splitInfoArraySize = env->GetArrayLength(splitInfosArr); i
< splitInfoArraySize; i++) {
- jbyteArray splitInfoArray =
static_cast<jbyteArray>(env->GetObjectArrayElement(splitInfosArr, i));
- jsize splitInfoSize = env->GetArrayLength(splitInfoArray);
- auto safeSplitArray = getByteArrayElementsSafe(env, splitInfoArray);
- auto splitInfoData = safeSplitArray.elems();
+ if (splitInfosArr != nullptr) {
+ for (jsize i = 0, splitInfoArraySize = env->GetArrayLength(splitInfosArr);
i < splitInfoArraySize; i++) {
+ jbyteArray splitInfoArray =
static_cast<jbyteArray>(env->GetObjectArrayElement(splitInfosArr, i));
+ jsize splitInfoSize = env->GetArrayLength(splitInfoArray);
+ auto safeSplitArray = getByteArrayElementsSafe(env, splitInfoArray);
+ auto splitInfoData = safeSplitArray.elems();
- ctx->parseSplitInfo(splitInfoData, splitInfoSize, i);
+ ctx->parseSplitInfo(splitInfoData, splitInfoSize, i);
+ }
}
// Handle the Java iters
- jsize itersLen = env->GetArrayLength(iterArr);
std::vector<std::shared_ptr<ResultIterator>> inputIters;
- inputIters.reserve(itersLen);
- for (int idx = 0; idx < itersLen; idx++) {
- jobject iter = env->GetObjectArrayElement(iterArr, idx);
- auto arrayIter = std::make_unique<JniColumnarBatchIterator>(env, iter,
ctx, idx);
- auto resultIter = std::make_shared<ResultIterator>(std::move(arrayIter));
- inputIters.push_back(std::move(resultIter));
+ if (batchItrArray != nullptr) {
+ jsize itersLen = env->GetArrayLength(batchItrArray);
+ inputIters.reserve(itersLen);
+ for (int idx = 0; idx < itersLen; idx++) {
+ jobject iter = env->GetObjectArrayElement(batchItrArray, idx);
+ auto arrayIter = std::make_unique<JniColumnarBatchIterator>(env, iter,
ctx, idx);
+ auto resultIter = std::make_shared<ResultIterator>(std::move(arrayIter));
+ inputIters.push_back(std::move(resultIter));
+ }
}
return ctx->saveObject(ctx->createResultIterator(spillDirStr, inputIters));
@@ -630,6 +635,66 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_vectorized_ColumnarBatchOutIterato
JNI_METHOD_END()
}
+JNIEXPORT jboolean JNICALL
Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeAddIteratorSplits(
// NOLINT
+ JNIEnv* env,
+ jobject wrapper,
+ jlong iterHandle,
+ jobjectArray batchItrArray) {
+ JNI_METHOD_START
+ auto ctx = getRuntime(env, wrapper);
+ auto outIter = ObjectStore::retrieve<ResultIterator>(iterHandle);
+ if (outIter == nullptr) {
+ throw GlutenException("Invalid iterator handle for addSplits");
+ }
+
+ // Get the underlying split-aware iterator
+ auto* splitAwareIter =
dynamic_cast<gluten::SplitAwareColumnarBatchIterator*>(outIter->getInputIter());
+ if (splitAwareIter == nullptr) {
+ throw GlutenException("Iterator does not support split management");
+ }
+
+ GLUTEN_CHECK(batchItrArray != nullptr, "FATAL: Splits to add cannot be
null");
+
+ // Convert Java ColumnarBatchInIterator[] to native iterators and add as splits
+ jsize numIterators = env->GetArrayLength(batchItrArray);
+ std::vector<std::shared_ptr<ResultIterator>> inputIterators;
+ inputIterators.reserve(numIterators);
+
+ for (jsize idx = 0; idx < numIterators; idx++) {
+ jobject iter = env->GetObjectArrayElement(batchItrArray, idx);
+ if (iter == nullptr) {
+ inputIterators.push_back(nullptr);
+ } else {
+ auto arrayIter = std::make_unique<JniColumnarBatchIterator>(env, iter,
ctx, idx);
+ auto resultIter = std::make_shared<ResultIterator>(std::move(arrayIter));
+ inputIterators.push_back(std::move(resultIter));
+ }
+ env->DeleteLocalRef(iter);
+ }
+
+ // Add iterator splits via interface method
+ if (!inputIterators.empty()) {
+ splitAwareIter->addIteratorSplits(inputIterators);
+ }
+
+ return true;
+ JNI_METHOD_END(false)
+}
+
+JNIEXPORT void JNICALL
Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeNoMoreSplits(
// NOLINT
+ JNIEnv* env,
+ jobject wrapper,
+ jlong iterHandle) {
+ JNI_METHOD_START
+ auto ctx = getRuntime(env, wrapper);
+ auto iter = ObjectStore::retrieve<ResultIterator>(iterHandle);
+ if (iter == nullptr) {
+ throw GlutenException("Invalid iterator handle for noMoreSplits");
+ }
+ ctx->noMoreSplits(iter.get());
+ JNI_METHOD_END()
+}
+
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarToRowInit(
// NOLINT
JNIEnv* env,
diff --git a/cpp/core/memory/SplitAwareColumnarBatchIterator.h
b/cpp/core/memory/SplitAwareColumnarBatchIterator.h
new file mode 100644
index 0000000000..e12f38afad
--- /dev/null
+++ b/cpp/core/memory/SplitAwareColumnarBatchIterator.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+#include "ColumnarBatchIterator.h"
+
+// Forward declarations
+namespace substrait {
+class ReadRel_LocalFiles;
+}
+
+namespace gluten {
+
+// Forward declaration
+class ResultIterator;
+
+/// Abstract base class for iterators that support dynamic split management.
+/// Provides APIs for adding splits after iterator creation and signaling completion.
+class SplitAwareColumnarBatchIterator : public ColumnarBatchIterator {
+ public:
+ SplitAwareColumnarBatchIterator() = default;
+ virtual ~SplitAwareColumnarBatchIterator() = default;
+
+ /// Add iterator-based splits from input iterators.
+ virtual void addIteratorSplits(const
std::vector<std::shared_ptr<ResultIterator>>& inputIterators) = 0;
+
+ /// Signal that no more splits will be added to this iterator.
+ /// This must be called after all splits have been added to ensure proper task completion.
+ virtual void noMoreSplits() = 0;
+};
+
+} // namespace gluten
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 56fab701ee..eb0e11e04b 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -205,6 +205,7 @@ if(ENABLE_GPU)
VELOX_SRCS
cudf/CudfPlanValidator.cc
cudf/GpuLock.cc
+ operators/plannodes/CudfVectorStream.cc
shuffle/VeloxGpuShuffleReader.cc
shuffle/VeloxGpuShuffleWriter.cc
operators/serializer/VeloxGpuColumnarBatchSerializer.cc
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc
b/cpp/velox/benchmarks/GenericBenchmark.cc
index 2121254137..616ba9bcfb 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -452,6 +452,7 @@ auto BM_Generic = [](::benchmark::State& state,
}
auto resultIter = runtime->createResultIterator(veloxSpillDir,
std::move(inputIters));
+ runtime->noMoreSplits(resultIter.get());
listenerPtr->setIterator(resultIter.get());
if (FLAGS_with_shuffle) {
diff --git a/cpp/velox/compute/VeloxBackend.cc
b/cpp/velox/compute/VeloxBackend.cc
index 75fd84b104..45a64908e1 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -47,6 +47,7 @@
#include "velox/connectors/hive/BufferedInputBuilder.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSource.h"
+#include "operators/plannodes/RowVectorStream.h"
#include
"velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" //
@manual
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h"
// @manual
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
@@ -204,7 +205,6 @@ void VeloxBackend::init(
// RSS shuffle serde.
facebook::velox::serializer::presto::PrestoVectorSerde::registerNamedVectorSerde();
}
-
velox::exec::Operator::registerOperator(std::make_unique<RowVectorStreamOperatorTranslator>());
initUdf();
@@ -317,6 +317,10 @@ void VeloxBackend::initConnector(const
std::shared_ptr<velox::config::ConfigBase
}
velox::connector::registerConnector(
std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId,
hiveConf, ioExecutor_.get()));
+
+ // Register value-stream connector for runtime iterator-based inputs
+
velox::connector::registerConnector(std::make_shared<ValueStreamConnector>(kIteratorConnectorId,
hiveConf));
+
#ifdef GLUTEN_ENABLE_GPU
if (backendConf_->get<bool>(kCudfEnableTableScan,
kCudfEnableTableScanDefault) &&
backendConf_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc
b/cpp/velox/compute/VeloxPlanConverter.cc
index 05e78eb1ba..4764fad382 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -18,27 +18,25 @@
#include "VeloxPlanConverter.h"
#include <filesystem>
-#include "compute/ResultIterator.h"
#include "config/GlutenConfig.h"
#include "iceberg/IcebergPlanConverter.h"
-#include "velox/common/file/FileSystems.h"
+#include "operators/plannodes/IteratorSplit.h"
namespace gluten {
using namespace facebook;
VeloxPlanConverter::VeloxPlanConverter(
- const std::vector<std::shared_ptr<ResultIterator>>& inputIters,
velox::memory::MemoryPool* veloxPool,
const facebook::velox::config::ConfigBase* veloxCfg,
+ const std::vector<std::shared_ptr<ResultIterator>>& rowVectors,
const std::optional<std::string> writeFilesTempPath,
const std::optional<std::string> writeFileName,
bool validationMode)
: validationMode_(validationMode),
veloxCfg_(veloxCfg),
- substraitVeloxPlanConverter_(veloxPool, veloxCfg, writeFilesTempPath,
writeFileName, validationMode) {
+ substraitVeloxPlanConverter_(veloxPool, veloxCfg, rowVectors,
writeFilesTempPath, writeFileName, validationMode) {
VELOX_USER_CHECK_NOT_NULL(veloxCfg_);
- substraitVeloxPlanConverter_.setInputIters(std::move(inputIters));
}
namespace {
@@ -136,7 +134,6 @@ void parseLocalFileNodes(
splitInfos.reserve(localFiles.size());
for (const auto& localFile : localFiles) {
const auto& fileList = localFile.items();
-
splitInfos.push_back(parseScanSplitInfo(veloxCfg, fileList));
}
diff --git a/cpp/velox/compute/VeloxPlanConverter.h
b/cpp/velox/compute/VeloxPlanConverter.h
index 4678dccea7..0b597a91f9 100644
--- a/cpp/velox/compute/VeloxPlanConverter.h
+++ b/cpp/velox/compute/VeloxPlanConverter.h
@@ -18,11 +18,11 @@
#pragma once
#include <velox/common/memory/MemoryPool.h>
-#include "compute/ResultIterator.h"
-#include "memory/VeloxMemoryManager.h"
+#include <velox/core/PlanNode.h>
+#include <velox/exec/Split.h>
+
#include "substrait/SubstraitToVeloxPlan.h"
#include "substrait/plan.pb.h"
-#include "velox/core/PlanNode.h"
namespace gluten {
@@ -30,9 +30,9 @@ namespace gluten {
class VeloxPlanConverter {
public:
explicit VeloxPlanConverter(
- const std::vector<std::shared_ptr<ResultIterator>>& inputIters,
facebook::velox::memory::MemoryPool* veloxPool,
const facebook::velox::config::ConfigBase* veloxCfg,
+ const std::vector<std::shared_ptr<ResultIterator>>& rowVectors,
const std::optional<std::string> writeFilesTempPath = std::nullopt,
const std::optional<std::string> writeFileName = std::nullopt,
bool validationMode = false);
@@ -45,6 +45,12 @@ class VeloxPlanConverter {
return substraitVeloxPlanConverter_.splitInfos();
}
+ /// The input iterators not inlined to VeloxPlan. They should be then manually added to the Velox task
+ /// via WholeStageResultIterator#addIteratorSplits. Empty if no input iterators remaining.
+ const std::vector<std::shared_ptr<ResultIterator>>&
remainingInputIterators() const {
+ return substraitVeloxPlanConverter_.remainingInputIterators();
+ }
+
private:
bool validationMode_;
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index af0fde37ce..69d21b4a57 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -17,6 +17,8 @@
#include "VeloxRuntime.h"
+#include <operators/plannodes/RowVectorStream.h>
+
#include <algorithm>
#include <filesystem>
@@ -124,14 +126,19 @@ void VeloxRuntime::getInfoAndIds(
std::vector<std::shared_ptr<SplitInfo>>& scanInfos,
std::vector<velox::core::PlanNodeId>& scanIds,
std::vector<velox::core::PlanNodeId>& streamIds) {
+ int32_t streamIdx = 0;
for (const auto& leafPlanNodeId : leafPlanNodeIds) {
auto it = splitInfoMap.find(leafPlanNodeId);
if (it == splitInfoMap.end()) {
throw std::runtime_error("Could not find leafPlanNodeId.");
}
auto splitInfo = it->second;
+ // Based on the current code, indexing of streams and files follow different orders:
+ // 1. Streams follow "iterator:<idx>" in the substrait plan;
+ // 2. Files follow the traversal order in the plan node tree.
+ // FIXME: Why we didn't have a unified design?
if (splitInfo->isStream) {
- streamIds.emplace_back(leafPlanNodeId);
+
streamIds.emplace_back(ValueStreamConnectorFactory::nodeIdOf(streamIdx++));
} else {
scanInfos.emplace_back(splitInfo);
scanIds.emplace_back(leafPlanNodeId);
@@ -140,10 +147,8 @@ void VeloxRuntime::getInfoAndIds(
}
std::string VeloxRuntime::planString(bool details, const
std::unordered_map<std::string, std::string>& sessionConf) {
- std::vector<std::shared_ptr<ResultIterator>> inputs;
auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool();
- VeloxPlanConverter veloxPlanConverter(
- inputs, veloxMemoryPool.get(), veloxCfg_.get(), std::nullopt,
std::nullopt, true);
+ VeloxPlanConverter veloxPlanConverter(veloxMemoryPool.get(),
veloxCfg_.get(), {}, std::nullopt, std::nullopt, true);
auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_, localFiles_);
return veloxPlan->toString(details, true);
}
@@ -160,9 +165,9 @@ std::shared_ptr<ResultIterator>
VeloxRuntime::createResultIterator(
LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" <<
printConfig(confMap_);
VeloxPlanConverter veloxPlanConverter(
- inputs,
memoryManager()->getLeafMemoryPool().get(),
veloxCfg_.get(),
+ inputs,
*localWriteFilesTempPath(),
*localWriteFileName());
veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_,
std::move(localFiles_));
@@ -187,9 +192,24 @@ std::shared_ptr<ResultIterator>
VeloxRuntime::createResultIterator(
spillDir,
veloxCfg_,
taskInfo_.has_value() ? taskInfo_.value() : SparkTaskInfo{});
+
+ auto remainingInputIterators = veloxPlanConverter.remainingInputIterators();
+ if (!remainingInputIterators.empty()) {
+ // Converts remaining input iterators to splits and add them to the task.
+ wholeStageIter->addIteratorSplits(remainingInputIterators);
+ }
+
return std::make_shared<ResultIterator>(std::move(wholeStageIter), this);
}
+void VeloxRuntime::noMoreSplits(ResultIterator* iter){
+ auto* splitAwareIter =
dynamic_cast<gluten::SplitAwareColumnarBatchIterator*>(iter->getInputIter());
+ if (splitAwareIter == nullptr) {
+ throw GlutenException("Iterator does not support split management");
+ }
+ splitAwareIter->noMoreSplits();
+}
+
std::shared_ptr<ColumnarToRowConverter>
VeloxRuntime::createColumnar2RowConverter(int64_t column2RowMemThreshold) {
auto veloxPool = memoryManager()->getLeafMemoryPool();
return std::make_shared<VeloxColumnarToRowConverter>(veloxPool,
column2RowMemThreshold);
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index b39733e839..a3c3da0c5a 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -59,6 +59,8 @@ class VeloxRuntime final : public Runtime {
const std::string& spillDir,
const std::vector<std::shared_ptr<ResultIterator>>& inputs = {})
override;
+ void noMoreSplits(ResultIterator* iter) override;
+
std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t
column2RowMemThreshold) override;
std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows)
override;
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index e91e2ad69d..ac5773e439 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -16,6 +16,7 @@
*/
#include "WholeStageResultIterator.h"
#include "VeloxBackend.h"
+#include "VeloxPlanConverter.h"
#include "VeloxRuntime.h"
#include "config/VeloxConfig.h"
#include "utils/ConfigExtractor.h"
@@ -29,6 +30,8 @@
#include "velox/experimental/cudf/exec/ToCudf.h"
#include "cudf/GpuLock.h"
#endif
+#include "operators/plannodes/RowVectorStream.h"
+
using namespace facebook;
@@ -227,7 +230,6 @@ std::shared_ptr<velox::core::QueryCtx>
WholeStageResultIterator::createNewVeloxQ
}
std::shared_ptr<ColumnarBatch> WholeStageResultIterator::next() {
- tryAddSplitsToTask();
if (task_->isFinished()) {
return nullptr;
}
@@ -355,17 +357,40 @@ void WholeStageResultIterator::constructPartitionColumns(
}
}
-void WholeStageResultIterator::tryAddSplitsToTask() {
- if (noMoreSplits_) {
+void WholeStageResultIterator::addIteratorSplits(const
std::vector<std::shared_ptr<ResultIterator>>& inputIterators) {
+ GLUTEN_CHECK(!allSplitsAdded, "Method addIteratorSplits should not be called
since all splits has been added to the Velox task.");
+ // Create IteratorConnectorSplit for each iterator
+ for (size_t i = 0; i < streamIds_.size() && i < inputIterators.size(); ++i) {
+ if (inputIterators[i] == nullptr) {
+ continue;
+ }
+ auto connectorSplit = std::make_shared<IteratorConnectorSplit>(
+ kIteratorConnectorId, inputIterators[i]);
+ exec::Split split(folly::copy(connectorSplit), -1);
+ task_->addSplit(streamIds_[i], std::move(split));
+ }
+}
+
+void WholeStageResultIterator::noMoreSplits() {
+ if (allSplitsAdded) {
return;
}
+ // Mark no more splits for all scan nodes
for (int idx = 0; idx < scanNodeIds_.size(); idx++) {
for (auto& split : splits_[idx]) {
task_->addSplit(scanNodeIds_[idx], std::move(split));
}
- task_->noMoreSplits(scanNodeIds_[idx]);
}
- noMoreSplits_ = true;
+
+ for (const auto& scanNodeId : scanNodeIds_) {
+ task_->noMoreSplits(scanNodeId);
+ }
+
+ // Mark no more splits for all stream nodes
+ for (const auto& streamId : streamIds_) {
+ task_->noMoreSplits(streamId);
+ }
+ allSplitsAdded = true;
}
void WholeStageResultIterator::collectMetrics() {
diff --git a/cpp/velox/compute/WholeStageResultIterator.h
b/cpp/velox/compute/WholeStageResultIterator.h
index 358b153c7a..401cff06da 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -18,7 +18,7 @@
#include "compute/Runtime.h"
#include "iceberg/IcebergPlanConverter.h"
-#include "memory/ColumnarBatchIterator.h"
+#include "memory/SplitAwareColumnarBatchIterator.h"
#include "memory/VeloxColumnarBatch.h"
#include "substrait/SubstraitToVeloxPlan.h"
#include "substrait/plan.pb.h"
@@ -33,7 +33,7 @@
namespace gluten {
-class WholeStageResultIterator : public ColumnarBatchIterator {
+class WholeStageResultIterator : public SplitAwareColumnarBatchIterator {
public:
WholeStageResultIterator(
VeloxMemoryManager* memoryManager,
@@ -69,14 +69,22 @@ class WholeStageResultIterator : public
ColumnarBatchIterator {
return metrics_.get();
}
- const facebook::velox::exec::Task* task() const {
- return task_.get();
- }
-
const facebook::velox::core::PlanNode* veloxPlan() const {
return veloxPlan_.get();
}
+ /// Get the underlying Velox task for direct manipulation
+ facebook::velox::exec::Task* task() {
+ return task_.get();
+ }
+
+ /// Add iterator-based splits from input iterators
+ void addIteratorSplits(const std::vector<std::shared_ptr<ResultIterator>>&
inputIterators) override;
+
+ /// Signal that no more splits will be added.
+ /// This is required for proper task completion and enables future barrier support.
+ void noMoreSplits() override;
+
private:
/// Get the Spark confs to Velox query context.
std::unordered_map<std::string, std::string> getQueryContextConf();
@@ -94,9 +102,6 @@ class WholeStageResultIterator : public
ColumnarBatchIterator {
std::unordered_map<std::string, std::optional<std::string>>&,
const std::unordered_map<std::string, std::string>&);
- /// Add splits to task. Skip if already added.
- void tryAddSplitsToTask();
-
/// Collect Velox metrics.
void collectMetrics();
@@ -134,7 +139,7 @@ class WholeStageResultIterator : public
ColumnarBatchIterator {
std::vector<std::shared_ptr<SplitInfo>> scanInfos_;
std::vector<facebook::velox::core::PlanNodeId> streamIds_;
std::vector<std::vector<facebook::velox::exec::Split>> splits_;
- bool noMoreSplits_ = false;
+ bool allSplitsAdded = false;
int64_t loadLazyVectorTime_ = 0;
};
diff --git a/cpp/velox/cudf/CudfPlanValidator.cc
b/cpp/velox/cudf/CudfPlanValidator.cc
index b2d6e82c22..a5d350e579 100644
--- a/cpp/velox/cudf/CudfPlanValidator.cc
+++ b/cpp/velox/cudf/CudfPlanValidator.cc
@@ -46,8 +46,8 @@ bool CudfPlanValidator::validate(const ::substrait::Plan&
substraitPlan) {
std::vector<std::shared_ptr<ResultIterator>> inputs;
std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg =
std::make_shared<facebook::velox::config::ConfigBase>(
std::unordered_map<std::string, std::string>{{kCudfEnabled, "true"}});
- VeloxPlanConverter veloxPlanConverter(
- inputs, veloxMemoryPool.get(), veloxCfg.get(), std::nullopt,
std::nullopt, true);
+ VeloxPlanConverter veloxPlanConverter(veloxMemoryPool.get(), veloxCfg.get(),
+ inputs, std::nullopt, std::nullopt, true);
auto planNode = veloxPlanConverter.toVeloxPlan(substraitPlan, localFiles);
std::unordered_set<velox::core::PlanNodeId> emptySet;
velox::core::PlanFragment planFragment{planNode,
velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
diff --git a/cpp/velox/jni/JniFileSystem.cc b/cpp/velox/jni/JniFileSystem.cc
index 51a7821f01..06d831a117 100644
--- a/cpp/velox/jni/JniFileSystem.cc
+++ b/cpp/velox/jni/JniFileSystem.cc
@@ -352,6 +352,7 @@ class JniFileSystem : public
facebook::velox::filesystems::FileSystem {
jstring element =
static_cast<jstring>(env->GetObjectArrayElement(jarray, i));
std::string cElement = jStringToCString(env, element);
out.push_back(cElement);
+ env->DeleteLocalRef(element);
}
return out;
}
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 74adb1dff5..ad6f8947eb 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -219,6 +219,7 @@
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeValidateExpressi
auto id = sFmap.function_anchor();
auto name = sFmap.name();
functionMappings.emplace(id, name);
+ env->DeleteLocalRef(mapping);
}
auto pool = defaultLeafVeloxMemoryPool().get();
@@ -475,6 +476,7 @@
Java_org_apache_gluten_utils_VeloxFileSystemValidationJniWrapper_allSupportedByR
if (!velox::filesystems::isPathSupportedByRegisteredFileSystems(path)) {
return false;
}
+ env->DeleteLocalRef(string);
}
return true;
JNI_METHOD_END(false)
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.cc
b/cpp/velox/operators/plannodes/CudfVectorStream.cc
similarity index 93%
copy from cpp/velox/operators/plannodes/RowVectorStream.cc
copy to cpp/velox/operators/plannodes/CudfVectorStream.cc
index 7121fe93ed..85cd2c021d 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.cc
+++ b/cpp/velox/operators/plannodes/CudfVectorStream.cc
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "RowVectorStream.h"
+#include "CudfVectorStream.h"
#include "memory/VeloxColumnarBatch.h"
#include "velox/exec/Driver.h"
#include "velox/exec/Operator.h"
@@ -45,7 +45,7 @@ class SuspendedSection {
} // namespace
namespace gluten {
-bool RowVectorStream::hasNext() {
+bool CudfVectorStreamBase::hasNext() {
if (finished_) {
return false;
}
@@ -70,7 +70,7 @@ bool RowVectorStream::hasNext() {
return hasNext;
}
-std::shared_ptr<ColumnarBatch> RowVectorStream::nextInternal() {
+std::shared_ptr<ColumnarBatch> CudfVectorStreamBase::nextInternal() {
if (finished_) {
return nullptr;
}
@@ -84,7 +84,7 @@ std::shared_ptr<ColumnarBatch>
RowVectorStream::nextInternal() {
return cb;
}
-facebook::velox::RowVectorPtr RowVectorStream::next() {
+facebook::velox::RowVectorPtr CudfVectorStreamBase::next() {
auto cb = nextInternal();
const std::shared_ptr<VeloxColumnarBatch>& vb =
VeloxColumnarBatch::from(pool_, cb);
auto vp = vb->getRowVector();
@@ -92,4 +92,4 @@ facebook::velox::RowVectorPtr RowVectorStream::next() {
return std::make_shared<facebook::velox::RowVector>(
vp->pool(), outputType_, facebook::velox::BufferPtr(0), vp->size(),
vp->children());
}
-} // namespace gluten
+} // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/operators/plannodes/CudfVectorStream.h
b/cpp/velox/operators/plannodes/CudfVectorStream.h
index 9758d1c35e..7a663d252e 100644
--- a/cpp/velox/operators/plannodes/CudfVectorStream.h
+++ b/cpp/velox/operators/plannodes/CudfVectorStream.h
@@ -18,6 +18,11 @@
#pragma once
#include "CudfVectorStream.h"
+#include "compute/ResultIterator.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "velox/exec/Driver.h"
+#include "velox/exec/Operator.h"
+#include "velox/exec/Task.h"
#include "velox/experimental/cudf/exec/CudfOperator.h"
#include "velox/experimental/cudf/exec/Utilities.h"
#include "velox/experimental/cudf/exec/VeloxCudfInterop.h"
@@ -25,14 +30,78 @@
namespace gluten {
-class CudfVectorStream : public RowVectorStream {
+class CudfVectorStreamBase {
+ public:
+ virtual ~CudfVectorStreamBase() = default;
+
+ explicit CudfVectorStreamBase(
+ facebook::velox::exec::DriverCtx* driverCtx,
+ facebook::velox::memory::MemoryPool* pool,
+ ResultIterator* iterator,
+ const facebook::velox::RowTypePtr& outputType)
+ : driverCtx_(driverCtx), pool_(pool), outputType_(outputType),
iterator_(iterator) {}
+
+ bool hasNext();
+
+ // Convert the Arrow batch to a row vector and construct a new RowVector with
the new outputType.
+ virtual facebook::velox::RowVectorPtr next();
+
+ protected:
+ // Get the next batch from iterator_.
+ std::shared_ptr<ColumnarBatch> nextInternal();
+
+ facebook::velox::exec::DriverCtx* driverCtx_;
+ facebook::velox::memory::MemoryPool* pool_;
+ const facebook::velox::RowTypePtr outputType_;
+ ResultIterator* iterator_;
+
+ bool finished_{false};
+};
+
+class ValueStreamNode final : public facebook::velox::core::PlanNode {
+ public:
+ ValueStreamNode(
+ const facebook::velox::core::PlanNodeId& id,
+ const facebook::velox::RowTypePtr& outputType,
+ std::shared_ptr<ResultIterator> iterator)
+ : facebook::velox::core::PlanNode(id), outputType_(outputType),
iterator_(std::move(iterator)) {}
+
+ const facebook::velox::RowTypePtr& outputType() const override {
+ return outputType_;
+ }
+
+ const std::vector<facebook::velox::core::PlanNodePtr>& sources() const
override {
+ return kEmptySources_;
+ };
+
+ ResultIterator* iterator() const {
+ return iterator_.get();
+ }
+
+ std::string_view name() const override {
+ return "ValueStream";
+ }
+
+ folly::dynamic serialize() const override {
+ VELOX_UNSUPPORTED("ValueStream plan node is not serializable");
+ }
+
+ private:
+ void addDetails(std::stringstream& stream) const override{};
+
+ const facebook::velox::RowTypePtr outputType_;
+ std::shared_ptr<ResultIterator> iterator_;
+ const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources_;
+};
+
+class CudfVectorStream : public CudfVectorStreamBase {
public:
CudfVectorStream(
facebook::velox::exec::DriverCtx* driverCtx,
facebook::velox::memory::MemoryPool* pool,
ResultIterator* iterator,
const facebook::velox::RowTypePtr& outputType)
- : RowVectorStream(driverCtx, pool, iterator, outputType) {}
+ : CudfVectorStreamBase(driverCtx, pool, iterator, outputType) {}
// Convert arrow batch to row vector and use new output columns
facebook::velox::RowVectorPtr next() override {
@@ -133,7 +202,7 @@ class CudfValueStream : public
facebook::velox::exec::SourceOperator, public fac
private:
bool finished_ = false;
- std::unique_ptr<RowVectorStream> rvStream_;
+ std::unique_ptr<CudfVectorStream> rvStream_;
};
class CudfVectorStreamOperatorTranslator : public
facebook::velox::exec::Operator::PlanNodeTranslator {
diff --git a/cpp/velox/operators/plannodes/IteratorSplit.h
b/cpp/velox/operators/plannodes/IteratorSplit.h
new file mode 100644
index 0000000000..6e42d10e6f
--- /dev/null
+++ b/cpp/velox/operators/plannodes/IteratorSplit.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "compute/ResultIterator.h"
+#include "velox/connectors/Connector.h"
+
+namespace gluten {
+
+/// Custom connector ID for iterator-based splits
+constexpr const char* kIteratorConnectorId = "value-stream";
+
+/// A custom split type that wraps a ResultIterator
+/// This allows iterators to be treated as splits and added dynamically to
tasks
+class IteratorConnectorSplit : public
facebook::velox::connector::ConnectorSplit {
+ public:
+ explicit IteratorConnectorSplit(const std::string& connectorId,
std::shared_ptr<ResultIterator> iterator)
+ : ConnectorSplit(connectorId), iterator_(std::move(iterator)) {}
+
+ std::shared_ptr<ResultIterator> iterator() const {
+ return iterator_;
+ }
+
+ std::string toString() const override {
+ return fmt::format("IteratorSplit[{}]", connectorId);
+ }
+
+ private:
+ std::shared_ptr<ResultIterator> iterator_;
+};
+
+} // namespace gluten
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.cc
b/cpp/velox/operators/plannodes/RowVectorStream.cc
index 7121fe93ed..7c0b00979a 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.cc
+++ b/cpp/velox/operators/plannodes/RowVectorStream.cc
@@ -20,6 +20,7 @@
#include "velox/exec/Driver.h"
#include "velox/exec/Operator.h"
#include "velox/exec/Task.h"
+#include "velox/vector/arrow/Bridge.h"
namespace {
@@ -45,6 +46,7 @@ class SuspendedSection {
} // namespace
namespace gluten {
+
bool RowVectorStream::hasNext() {
if (finished_) {
return false;
@@ -61,7 +63,13 @@ bool RowVectorStream::hasNext() {
// As of now, non-zero running threads usually happens when:
// 1. Task A spills task B;
// 2. Task A tries to grow buffers created by task B, during which spill
is requested on task A again.
- SuspendedSection ss(driverCtx_->driver);
+ const facebook::velox::exec::DriverThreadContext* driverThreadCtx =
+ facebook::velox::exec::driverThreadContext();
+ VELOX_CHECK_NOT_NULL(
+ driverThreadCtx,
+ "ExternalStreamDataSource::next() is not called "
+ "from a driver thread");
+ SuspendedSection ss(driverThreadCtx->driverCtx()->driver);
hasNext = iterator_->hasNext();
}
if (!hasNext) {
@@ -78,7 +86,13 @@ std::shared_ptr<ColumnarBatch>
RowVectorStream::nextInternal() {
{
// We are leaving Velox task execution and are probably entering Spark
code through JNI. Suspend the current
// driver to make the current task open to spilling.
- SuspendedSection ss(driverCtx_->driver);
+ const facebook::velox::exec::DriverThreadContext* driverThreadCtx =
+ facebook::velox::exec::driverThreadContext();
+ VELOX_CHECK_NOT_NULL(
+ driverThreadCtx,
+ "ExternalStreamDataSource::next() is not called "
+ "from a driver thread");
+ SuspendedSection ss(driverThreadCtx->driverCtx()->driver);
cb = iterator_->next();
}
return cb;
@@ -92,4 +106,67 @@ facebook::velox::RowVectorPtr RowVectorStream::next() {
return std::make_shared<facebook::velox::RowVector>(
vp->pool(), outputType_, facebook::velox::BufferPtr(0), vp->size(),
vp->children());
}
-} // namespace gluten
+
+ValueStreamDataSource::ValueStreamDataSource(
+ const facebook::velox::RowTypePtr& outputType,
+ const facebook::velox::connector::ConnectorTableHandlePtr& tableHandle,
+ const facebook::velox::connector::ColumnHandleMap& columnHandles,
+ facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx)
+ : outputType_(outputType),
+ pool_(connectorQueryCtx->memoryPool()) {}
+
+void
ValueStreamDataSource::addSplit(std::shared_ptr<facebook::velox::connector::ConnectorSplit>
split) {
+ // Cast to IteratorConnectorSplit to extract the iterator
+ auto iteratorSplit = std::dynamic_pointer_cast<const
IteratorConnectorSplit>(split);
+ if (!iteratorSplit) {
+ throw std::runtime_error("Split is not an IteratorConnectorSplit");
+ }
+
+ auto iterator = iteratorSplit->iterator();
+ if (!iterator) {
+ throw std::runtime_error("IteratorConnectorSplit contains null iterator");
+ }
+
+ // Create RowVectorStream wrapper and add to pending queue
+ auto rowVectorStream = std::make_shared<RowVectorStream>(pool_, iterator,
outputType_);
+ pendingIterators_.push_back(rowVectorStream);
+}
+
+std::optional<facebook::velox::RowVectorPtr> ValueStreamDataSource::next(
+ uint64_t size,
+ facebook::velox::ContinueFuture& future) {
+ // Try to get current iterator if we don't have one
+ while (!currentIterator_) {
+ if (pendingIterators_.empty()) {
+ // No more iterators to process
+ return nullptr;
+ }
+
+ // Get next RowVectorStream from queue
+ currentIterator_ = pendingIterators_.front();
+ pendingIterators_.erase(pendingIterators_.begin());
+ }
+
+ // Check if current stream has more data
+ if (!currentIterator_->hasNext()) {
+ // Current stream exhausted, try next one
+ currentIterator_ = nullptr;
+ return next(size, future); // Recursively try next stream
+ }
+
+ // Get next batch from current stream (RowVectorStream handles conversion)
+ auto rowVector = currentIterator_->next();
+
+ if (!rowVector) {
+ currentIterator_ = nullptr;
+ return next(size, future); // Recursively try next stream
+ }
+
+ // Update metrics
+ completedRows_ += rowVector->size();
+ completedBytes_ += rowVector->estimateFlatSize();
+
+ return rowVector;
+}
+
+} // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h
b/cpp/velox/operators/plannodes/RowVectorStream.h
index d6b8d37bcd..6e6ccd1527 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -19,6 +19,8 @@
#include "compute/ResultIterator.h"
#include "memory/VeloxColumnarBatch.h"
+#include "operators/plannodes/IteratorSplit.h"
+#include "velox/connectors/Connector.h"
#include "velox/exec/Driver.h"
#include "velox/exec/Operator.h"
#include "velox/exec/Task.h"
@@ -30,11 +32,10 @@ class RowVectorStream {
virtual ~RowVectorStream() = default;
explicit RowVectorStream(
- facebook::velox::exec::DriverCtx* driverCtx,
facebook::velox::memory::MemoryPool* pool,
- ResultIterator* iterator,
+ std::shared_ptr<ResultIterator> iterator,
const facebook::velox::RowTypePtr& outputType)
- : driverCtx_(driverCtx), pool_(pool), outputType_(outputType),
iterator_(iterator) {}
+ : pool_(pool), outputType_(outputType), iterator_(iterator) {}
bool hasNext();
@@ -45,100 +46,133 @@ class RowVectorStream {
// Get the next batch from iterator_.
std::shared_ptr<ColumnarBatch> nextInternal();
- facebook::velox::exec::DriverCtx* driverCtx_;
facebook::velox::memory::MemoryPool* pool_;
const facebook::velox::RowTypePtr outputType_;
- ResultIterator* iterator_;
+ std::shared_ptr<ResultIterator> iterator_;
bool finished_{false};
};
-class ValueStreamNode final : public facebook::velox::core::PlanNode {
+/// DataSource implementation that reads from ResultIterator instances.
+/// This allows iterator-based data to be consumed via Velox's standard
+/// connector/split mechanism, enabling proper integration with
Task::addSplit().
+class ValueStreamDataSource : public facebook::velox::connector::DataSource {
public:
- ValueStreamNode(
- const facebook::velox::core::PlanNodeId& id,
+ ValueStreamDataSource(
const facebook::velox::RowTypePtr& outputType,
- std::shared_ptr<ResultIterator> iterator)
- : facebook::velox::core::PlanNode(id), outputType_(outputType),
iterator_(std::move(iterator)) {}
+ const facebook::velox::connector::ConnectorTableHandlePtr& tableHandle,
+ const facebook::velox::connector::ColumnHandleMap& columnHandles,
+ facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx);
- const facebook::velox::RowTypePtr& outputType() const override {
- return outputType_;
- }
+ void addSplit(std::shared_ptr<facebook::velox::connector::ConnectorSplit>
split) override;
- const std::vector<facebook::velox::core::PlanNodePtr>& sources() const
override {
- return kEmptySources_;
- };
+ std::optional<facebook::velox::RowVectorPtr> next(uint64_t size,
facebook::velox::ContinueFuture& future) override;
- ResultIterator* iterator() const {
- return iterator_.get();
+ void addDynamicFilter(
+ facebook::velox::column_index_t outputChannel,
+ const std::shared_ptr<facebook::velox::common::Filter>& filter) override
{
+ // Iterator-based sources don't support dynamic filtering
}
- std::string_view name() const override {
- return "ValueStream";
+ uint64_t getCompletedBytes() override {
+ return completedBytes_;
}
- folly::dynamic serialize() const override {
- VELOX_UNSUPPORTED("ValueStream plan node is not serializable");
+ uint64_t getCompletedRows() override {
+ return completedRows_;
}
- private:
- void addDetails(std::stringstream& stream) const override{};
+ std::unordered_map<std::string, facebook::velox::RuntimeMetric>
getRuntimeStats() override {
+ return {};
+ }
+ private:
const facebook::velox::RowTypePtr outputType_;
- std::shared_ptr<ResultIterator> iterator_;
- const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources_;
+ facebook::velox::memory::MemoryPool* pool_;
+
+ std::vector<std::shared_ptr<RowVectorStream>> pendingIterators_;
+ std::shared_ptr<RowVectorStream> currentIterator_{nullptr};
+ uint64_t completedBytes_{0};
+ uint64_t completedRows_{0};
};
-class ValueStream : public facebook::velox::exec::SourceOperator {
+/// Table handle for iterator-based scans
+class ValueStreamTableHandle : public
facebook::velox::connector::ConnectorTableHandle {
public:
- ValueStream(
- int32_t operatorId,
- facebook::velox::exec::DriverCtx* driverCtx,
- std::shared_ptr<const ValueStreamNode> valueStreamNode)
- : facebook::velox::exec::SourceOperator(
- driverCtx,
- valueStreamNode->outputType(),
- operatorId,
- valueStreamNode->id(),
- valueStreamNode->name().data()) {
- ResultIterator* itr = valueStreamNode->iterator();
- rvStream_ = std::make_unique<RowVectorStream>(driverCtx, pool(), itr,
outputType_);
+ explicit ValueStreamTableHandle(std::string connectorId) :
ConnectorTableHandle(connectorId) {}
+
+ const std::string& name() const override {
+ static const std::string kName = "ValueStreamTableHandle";
+ return kName;
}
- facebook::velox::RowVectorPtr getOutput() override {
- if (finished_) {
- return nullptr;
- }
- if (rvStream_->hasNext()) {
- return rvStream_->next();
- } else {
- finished_ = true;
- return nullptr;
- }
+ folly::dynamic serialize() const override {
+ VELOX_NYI();
}
+};
+
+/// Column handle for iterator-based scans
+class ValueStreamColumnHandle : public
facebook::velox::connector::ColumnHandle {
+ public:
+ ValueStreamColumnHandle(std::string name, facebook::velox::TypePtr type)
+ : name_(std::move(name)), type_(std::move(type)) {}
- facebook::velox::exec::BlockingReason
isBlocked(facebook::velox::ContinueFuture* /* unused */) override {
- return facebook::velox::exec::BlockingReason::kNotBlocked;
+ const std::string& name() const {
+ return name_;
}
- bool isFinished() override {
- return finished_;
+ const facebook::velox::TypePtr& type() const {
+ return type_;
}
private:
- bool finished_ = false;
- std::unique_ptr<RowVectorStream> rvStream_;
+ std::string name_;
+ facebook::velox::TypePtr type_;
};
-class RowVectorStreamOperatorTranslator : public
facebook::velox::exec::Operator::PlanNodeTranslator {
- std::unique_ptr<facebook::velox::exec::Operator> toOperator(
- facebook::velox::exec::DriverCtx* ctx,
- int32_t id,
- const facebook::velox::core::PlanNodePtr& node) override {
- if (auto valueStreamNode = std::dynamic_pointer_cast<const
ValueStreamNode>(node)) {
- return std::make_unique<ValueStream>(id, ctx, valueStreamNode);
- }
- return nullptr;
+/// Connector implementation for iterator-based data sources
+class ValueStreamConnector : public facebook::velox::connector::Connector {
+ public:
+ explicit ValueStreamConnector(
+ const std::string& id,
+ std::shared_ptr<const facebook::velox::config::ConfigBase> config)
+ : Connector(id, config) {}
+
+ std::unique_ptr<facebook::velox::connector::DataSource> createDataSource(
+ const facebook::velox::RowTypePtr& outputType,
+ const facebook::velox::connector::ConnectorTableHandlePtr& tableHandle,
+ const facebook::velox::connector::ColumnHandleMap& columnHandles,
+ facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx)
override {
+ return std::make_unique<ValueStreamDataSource>(outputType, tableHandle,
columnHandles, connectorQueryCtx);
+ }
+
+ std::unique_ptr<facebook::velox::connector::DataSink> createDataSink(
+ facebook::velox::RowTypePtr inputType,
+ facebook::velox::connector::ConnectorInsertTableHandlePtr
connectorInsertTableHandle,
+ facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx,
+ facebook::velox::connector::CommitStrategy commitStrategy) override {
+ VELOX_UNSUPPORTED("ValueStreamConnector does not support data sinks");
+ }
+};
+
+/// Factory for creating ValueStreamConnector instances
+class ValueStreamConnectorFactory : public
facebook::velox::connector::ConnectorFactory {
+ public:
+ static constexpr const char* kValueStreamConnectorName = "value-stream";
+
+ static std::string nodeIdOf(int32_t streamIdx) {
+ return fmt::format("{}:{}", kValueStreamConnectorName, streamIdx);
+ }
+
+ ValueStreamConnectorFactory() : ConnectorFactory(kValueStreamConnectorName)
{}
+
+ std::shared_ptr<facebook::velox::connector::Connector> newConnector(
+ const std::string& id,
+ std::shared_ptr<const facebook::velox::config::ConfigBase> config,
+ folly::Executor* ioExecutor = nullptr,
+ folly::Executor* cpuExecutor = nullptr) override {
+ return std::make_shared<ValueStreamConnector>(id, config);
}
};
+
} // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index b20b4a3d09..0085c553ba 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -899,15 +899,15 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
SubstraitParser::configSetInOptimization(generateRel.advanced_extension(),
"injectedProject=");
if (injectedProject) {
- // Child should be either ProjectNode or ValueStreamNode in case of
project fallback.
+ // Child should be a ProjectNode, or a TableScanNode / CudfValueStreamNode
(GPU) in case of project fallback.
VELOX_CHECK(
(std::dynamic_pointer_cast<const core::ProjectNode>(childNode) !=
nullptr ||
- std::dynamic_pointer_cast<const ValueStreamNode>(childNode) !=
nullptr)
+ std::dynamic_pointer_cast<const core::TableScanNode>(childNode) !=
nullptr
#ifdef GLUTEN_ENABLE_GPU
|| std::dynamic_pointer_cast<const CudfValueStreamNode>(childNode)
!= nullptr
#endif
- && childNode->outputType()->size() >
requiredChildOutput.size(),
- "injectedProject is true, but the ProjectNode or ValueStreamNode (in
case of projection fallback)"
+ ) && childNode->outputType()->size() > requiredChildOutput.size(),
+ "injectedProject is true, but the ProjectNode or TableScanNode or
CudfValueStreamNode (in case of projection fallback)"
" is missing or does not have the corresponding projection field");
bool isStack = generateRel.has_advanced_extension() &&
@@ -1281,8 +1281,56 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
nextPlanNodeId(), sortingKeys, sortingOrders,
static_cast<int32_t>(topNRel.n()), false /*isPartial*/, childNode);
}
-template <typename T>
core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
+ const ::substrait::ReadRel& readRel, int32_t streamIdx) {
+ // Use TableScanNode with iterator connector for runtime iterator inputs
+ // Get output schema from ReadRel
+ uint64_t colNum = 0;
+ std::vector<TypePtr> veloxTypeList;
+ if (readRel.has_base_schema()) {
+ const auto& baseSchema = readRel.base_schema();
+ colNum = baseSchema.names().size();
+ veloxTypeList = SubstraitParser::parseNamedStruct(baseSchema);
+ }
+
+ auto nodeId = ValueStreamConnectorFactory::nodeIdOf(streamIdx);
+ std::vector<std::string> outNames;
+ outNames.reserve(colNum);
+ for (int idx = 0; idx < colNum; idx++) {
+ // TODO: We should use the designated names in readRel rather than assigning
new names.
+ auto colName = fmt::format("node_{}_{}", nodeId, idx);
+ outNames.emplace_back(colName);
+ }
+ auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));
+
+ // Create TableHandle
+ auto tableHandle =
std::make_shared<ValueStreamTableHandle>(kIteratorConnectorId);
+
+ // Create column assignments
+ connector::ColumnHandleMap assignments;
+ for (int idx = 0; idx < outputType->size(); idx++) {
+ auto name = outputType->nameOf(idx);
+ auto type = outputType->childAt(idx);
+ assignments[name] = std::make_shared<ValueStreamColumnHandle>(name, type);
+ }
+
+ // Create TableScanNode
+ auto tableScanNode = std::make_shared<core::TableScanNode>(
+ nodeId,
+ outputType,
+ tableHandle,
+ assignments);
+
+ // Mark this as a stream-based split
+ auto splitInfo = std::make_shared<SplitInfo>();
+ splitInfo->isStream = true;
+ splitInfoMap_[tableScanNode->id()] = splitInfo;
+
+ return tableScanNode;
+}
+
+#ifdef GLUTEN_ENABLE_GPU
+core::PlanNodePtr SubstraitToVeloxPlanConverter::constructCudfValueStreamNode(
const ::substrait::ReadRel& readRel,
int32_t streamIdx) {
// Get the input schema of this iterator.
@@ -1307,28 +1355,31 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::constructValueStreamNode(
std::shared_ptr<ResultIterator> iterator;
if (!validationMode_) {
VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index
{} in input iterator list.", streamIdx);
- iterator = inputIters_[streamIdx];
+ iterator = std::move(inputIters_[streamIdx]);
}
- auto node = std::make_shared<T>(nextPlanNodeId(), outputType,
std::move(iterator));
+ auto node = std::make_shared<CudfValueStreamNode>(nextPlanNodeId(),
outputType, std::move(iterator));
auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
splitInfoMap_[node->id()] = splitInfo;
return node;
}
+#endif
core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValuesNode(
const ::substrait::ReadRel& readRel,
int32_t streamIdx) {
- std::vector<RowVectorPtr> values;
+ // ValuesNode is only used for validation/benchmarking with query trace
+ // It loads all data from the iterator at plan construction time
VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index
{} in input iterator list.", streamIdx);
- const auto iterator = inputIters_[streamIdx];
- while (iterator->hasNext()) {
- auto cb = VeloxColumnarBatch::from(defaultLeafVeloxMemoryPool().get(),
iterator->next());
- values.emplace_back(cb->getRowVector());
- }
- auto node =
std::make_shared<facebook::velox::core::ValuesNode>(nextPlanNodeId(),
std::move(values));
-
+ const auto iter = std::move(inputIters_[streamIdx]);
+ std::vector<RowVectorPtr> rowVectors;
+ while (iter->hasNext()) {
+ auto batch = iter->next();
+ auto veloxBatch =
VeloxColumnarBatch::from(defaultLeafVeloxMemoryPool().get(), batch);
+ rowVectors.emplace_back(veloxBatch->getRowVector());
+ }
+ auto node =
std::make_shared<facebook::velox::core::ValuesNode>(nextPlanNodeId(),
std::move(rowVectors));
auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
splitInfoMap_[node->id()] = splitInfo;
@@ -1343,19 +1394,20 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
!readRel.common().has_emit(), "Emit not supported for ValuesNode and
TableScanNode related Substrait plans.");
}
- // Check if the ReadRel specifies an input of stream. If yes, build
ValueStreamNode as the data source.
auto streamIdx = getStreamIndex(readRel);
if (streamIdx >= 0) {
+ // The ReadRel specifies a stream input. Unless query trace or cuDF is enabled
(both handled below), build a TableScanNode with the iterator connector.
+ const bool isQueryTraceEnabled = veloxCfg_->get<bool>(kQueryTraceEnabled,
false);
+ if (isQueryTraceEnabled) {
+ // Only used by benchmarks with query trace enabled: replace the value stream
node with a ValuesNode to support serialization.
+ return constructValuesNode(readRel, streamIdx);
+ }
#ifdef GLUTEN_ENABLE_GPU
if (veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
- return constructValueStreamNode<CudfValueStreamNode>(readRel, streamIdx);
+ return constructCudfValueStreamNode(readRel, streamIdx);
}
#endif
- if (!veloxCfg_->get<bool>(kQueryTraceEnabled, false)) {
- return constructValueStreamNode<ValueStreamNode>(readRel, streamIdx);
- }
- // Only used in benchmark enable query trace, replace ValueStreamNode to
ValuesNode to support serialization.
- return constructValuesNode(readRel, streamIdx);
+ return constructValueStreamNode(readRel, streamIdx);
}
// Otherwise, will create TableScan node for ReadRel.
@@ -1651,14 +1703,4 @@ bool
SubstraitToVeloxPlanConverter::checkTypeExtension(const ::substrait::Plan&
return true;
}
-#ifdef GLUTEN_ENABLE_GPU
-template core::PlanNodePtr
SubstraitToVeloxPlanConverter::constructValueStreamNode<CudfValueStreamNode>(
- const ::substrait::ReadRel& sRead,
- int32_t streamIdx);
-#endif
-
-template core::PlanNodePtr
SubstraitToVeloxPlanConverter::constructValueStreamNode<ValueStreamNode>(
- const ::substrait::ReadRel& sRead,
- int32_t streamIdx);
-
} // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h
b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 48e5709ea8..0e00764a66 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -70,11 +70,13 @@ class SubstraitToVeloxPlanConverter {
explicit SubstraitToVeloxPlanConverter(
memory::MemoryPool* pool,
const facebook::velox::config::ConfigBase* veloxCfg,
+ const std::vector<std::shared_ptr<ResultIterator>>& inputIters,
const std::optional<std::string> writeFilesTempPath = std::nullopt,
const std::optional<std::string> writeFileName = std::nullopt,
bool validationMode = false)
: pool_(pool),
veloxCfg_(veloxCfg),
+ inputIters_(inputIters),
writeFilesTempPath_(writeFilesTempPath),
writeFileName_(writeFileName),
validationMode_(validationMode) {
@@ -133,9 +135,12 @@ class SubstraitToVeloxPlanConverter {
/// FileProperties: the file sizes and modification times of the files to be
scanned.
core::PlanNodePtr toVeloxPlan(const ::substrait::ReadRel& sRead);
- template <typename T>
+ // Construct a table scan node accepting value streams as input.
core::PlanNodePtr constructValueStreamNode(const ::substrait::ReadRel&
sRead, int32_t streamIdx);
+ // Construct a cuDF value stream node.
+ core::PlanNodePtr constructCudfValueStreamNode(const ::substrait::ReadRel&
sRead, int32_t streamIdx);
+
// This is only used in benchmark and enable query trace, which will load
all the data to ValuesNode.
core::PlanNodePtr constructValuesNode(const ::substrait::ReadRel& sRead,
int32_t streamIdx);
@@ -181,8 +186,10 @@ class SubstraitToVeloxPlanConverter {
splitInfos_ = splitInfos;
}
- void setInputIters(std::vector<std::shared_ptr<ResultIterator>> inputIters) {
- inputIters_ = std::move(inputIters);
+ /// The input iterators that were not inlined into the Velox plan. They should
then be added manually to the Velox task
+ /// via WholeStageResultIterator#addIteratorSplits. Empty if no input
iterators remain.
+ const std::vector<std::shared_ptr<ResultIterator>>&
remainingInputIterators() const {
+ return inputIters_;
}
/// Used to check if ReadRel specifies an input of stream.
@@ -271,8 +278,6 @@ class SubstraitToVeloxPlanConverter {
/// The map storing the split stats for each PlanNode.
std::unordered_map<core::PlanNodeId, std::shared_ptr<SplitInfo>>
splitInfoMap_;
- std::vector<std::shared_ptr<ResultIterator>> inputIters_;
-
/// The map storing the pre-built plan nodes which can be accessed through
/// index. This map is only used when the computation of a Substrait plan
/// depends on other input nodes.
@@ -291,6 +296,9 @@ class SubstraitToVeloxPlanConverter {
/// A map of custom configs.
const facebook::velox::config::ConfigBase* veloxCfg_;
+ /// Input iterators supplied at construction. Query trace mode (ValuesNode)
and cuDF ValueStream support move entries out of this vector during conversion.
+ std::vector<std::shared_ptr<ResultIterator>> inputIters_;
+
/// The temporary path used to write files.
std::optional<std::string> writeFilesTempPath_;
std::optional<std::string> writeFileName_;
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
index 9005604f81..8afc7c5bf8 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
@@ -33,8 +33,8 @@ class SubstraitToVeloxPlanValidator {
std::unordered_map<std::string, std::string> configs{
{velox::core::QueryConfig::kSparkPartitionId, "0"},
{velox::core::QueryConfig::kSessionTimezone, "GMT"}};
veloxCfg_ =
std::make_shared<facebook::velox::config::ConfigBase>(std::move(configs));
- planConverter_ =
- std::make_unique<SubstraitToVeloxPlanConverter>(pool, veloxCfg_.get(),
std::nullopt, std::nullopt, true);
+ planConverter_ = std::make_unique<SubstraitToVeloxPlanConverter>(
+ pool, veloxCfg_.get(), std::vector<std::shared_ptr<ResultIterator>>{},
std::nullopt, std::nullopt, true);
queryCtx_ = velox::core::QueryCtx::create(nullptr,
velox::core::QueryConfig(veloxCfg_->rawConfigs()));
// An execution context used for function validation.
execCtx_ = std::make_unique<velox::core::ExecCtx>(pool, queryCtx_.get());
diff --git a/cpp/velox/tests/FunctionTest.cc b/cpp/velox/tests/FunctionTest.cc
index fc7892003f..17645aa1a9 100644
--- a/cpp/velox/tests/FunctionTest.cc
+++ b/cpp/velox/tests/FunctionTest.cc
@@ -44,7 +44,7 @@ class FunctionTest : public ::testing::Test, public
test::VectorTestBase {
std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg_ =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>());
std::shared_ptr<gluten::SubstraitToVeloxPlanConverter> planConverter_ =
- std::make_shared<gluten::SubstraitToVeloxPlanConverter>(pool(),
veloxCfg_.get());
+ std::make_shared<gluten::SubstraitToVeloxPlanConverter>(pool(),
veloxCfg_.get(), std::vector<std::shared_ptr<ResultIterator>>());
};
TEST_F(FunctionTest, makeNames) {
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 8847fed685..5487e7fd2e 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -65,6 +65,11 @@ class DummyRuntime final : public Runtime {
auto iter = std::make_shared<ResultIterator>(std::move(resIter));
return iter;
}
+
+ void noMoreSplits(ResultIterator* iter) override {
+ // Do nothing.
+ }
+
MemoryManager* memoryManager() override {
throw GlutenException("Not yet implemented");
}
@@ -150,6 +155,7 @@ TEST(TestRuntime, GetResultIterator) {
DummyMemoryManager mm(kDummyBackendKind);
auto runtime = std::make_shared<DummyRuntime>(kDummyBackendKind, &mm,
std::unordered_map<std::string, std::string>());
auto iter = runtime->createResultIterator("/tmp/test-spill", {});
+ runtime->noMoreSplits(iter.get());
ASSERT_TRUE(iter->hasNext());
auto next = iter->next();
ASSERT_NE(next, nullptr);
diff --git a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
index f1f6ef3865..8222f74caa 100644
--- a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
+++ b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
@@ -72,7 +72,7 @@ class Substrait2VeloxPlanConversionTest : public
exec::test::HiveConnectorTestBa
std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg_ =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>());
std::shared_ptr<VeloxPlanConverter> planConverter_ =
-
std::make_shared<VeloxPlanConverter>(std::vector<std::shared_ptr<ResultIterator>>(),
pool(), veloxCfg_.get());
+ std::make_shared<VeloxPlanConverter>(pool(), veloxCfg_.get(),
std::vector<std::shared_ptr<ResultIterator>>());
};
// This test will firstly generate mock TPC-H lineitem ORC file. Then, Velox's
diff --git a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
index 0a2b409526..d041bc359a 100644
--- a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
+++ b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
@@ -43,7 +43,7 @@ TEST_F(Substrait2VeloxValuesNodeConversionTest, valuesNode) {
JsonToProtoConverter::readFromFile(planPath, substraitPlan);
auto veloxCfg =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>());
std::shared_ptr<SubstraitToVeloxPlanConverter> planConverter_ =
- std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(),
veloxCfg.get(), std::nullopt, std::nullopt, true);
+ std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(),
veloxCfg.get(), std::vector<std::shared_ptr<ResultIterator>>(), std::nullopt,
std::nullopt, true);
auto veloxPlan = planConverter_->toVeloxPlan(substraitPlan);
RowVectorPtr expectedData = makeRowVector(
diff --git a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
index 7b8e8336ea..ae150bb98a 100644
--- a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
+++ b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
@@ -72,7 +72,7 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase {
auto veloxCfg =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>());
std::shared_ptr<SubstraitToVeloxPlanConverter> substraitConverter_ =
- std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(),
veloxCfg.get(), std::nullopt, std::nullopt, true);
+ std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(),
veloxCfg.get(), std::vector<std::shared_ptr<ResultIterator>>(), std::nullopt,
std::nullopt, true);
// Convert Substrait Plan to the same Velox Plan.
auto samePlan = substraitConverter_->toVeloxPlan(substraitPlan);
@@ -94,7 +94,7 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase {
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>());
std::shared_ptr<SubstraitToVeloxPlanConverter> substraitConverter_ =
std::make_shared<SubstraitToVeloxPlanConverter>(
- pool_.get(), veloxCfg.get(), std::nullopt, std::nullopt, true);
+ pool_.get(), veloxCfg.get(),
std::vector<std::shared_ptr<ResultIterator>>(), std::nullopt, std::nullopt,
true);
// Convert Substrait Plan to the same Velox Plan.
auto samePlan = substraitConverter_->toVeloxPlan(substraitPlan);
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
index f4d2c8e7d1..ad200dd46c 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
@@ -53,6 +53,11 @@ public class ColumnarBatchOutIterator extends
ClosableIterator<ColumnarBatch>
private native void nativeClose(long iterHandle);
+ private native boolean nativeAddIteratorSplits(
+ long iterHandle, ColumnarBatchInIterator[] batchItr);
+
+ private native void nativeNoMoreSplits(long iterHandle);
+
@Override
public boolean hasNext0() throws IOException {
return nativeHasNext(iterHandle);
@@ -75,6 +80,34 @@ public class ColumnarBatchOutIterator extends
ClosableIterator<ColumnarBatch>
}
}
+ /**
+ * Add new iterator splits to the iterator as new inputs for processing.
Note: File-based splits
+ * are not supported.
+ *
+ * @param batchItr Array of iterators to add as splits
+ * @return true if splits were added successfully, false otherwise
+ * @throws IllegalStateException if the iterator is closed
+ */
+ public boolean addIteratorSplits(ColumnarBatchInIterator[] batchItr) {
+ if (closed.get()) {
+ throw new IllegalStateException("Cannot add splits to a closed
iterator");
+ }
+ return nativeAddIteratorSplits(iterHandle, batchItr);
+ }
+
+ /**
+ * Signal that no more splits will be added to the iterator. This is
required for proper task
+ * completion and is a prerequisite for barrier support.
+ *
+ * @throws IllegalStateException if the iterator is closed
+ */
+ public void noMoreSplits() {
+ if (closed.get()) {
+ throw new IllegalStateException("Cannot call noMoreSplits on a closed
iterator");
+ }
+ nativeNoMoreSplits(iterHandle);
+ }
+
@Override
public void close0() {
// To make sure the outputted batches are still accessible after the
iterator is closed.
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index 813578566b..6d2c90896b 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -71,10 +70,11 @@ public class NativePlanEvaluator {
// Used by WholeStageTransform to create the native computing pipeline and
// return a columnar result iterator.
+ // Supports both creation-time splits (splitInfo, iterList) and runtime
splits (via ColumnarBatchOutIterator#addIteratorSplits()).
public ColumnarBatchOutIterator createKernelWithBatchIterator(
byte[] wsPlan,
byte[][] splitInfo,
- List<ColumnarBatchInIterator> iterList,
+ ColumnarBatchInIterator[] iterList,
int partitionIndex,
String spillDirPath)
throws RuntimeException {
@@ -82,7 +82,7 @@ public class NativePlanEvaluator {
jniWrapper.nativeCreateKernelWithIterator(
wsPlan,
splitInfo,
- iterList.toArray(new ColumnarBatchInIterator[0]),
+ iterList,
TaskContext.get().stageId(),
partitionIndex, // TaskContext.getPartitionId(),
TaskContext.get().taskAttemptId(),
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
index b8de4d63b5..a808290679 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
@@ -62,8 +62,11 @@ public class PlanEvaluatorJniWrapper implements RuntimeAware
{
public native String nativePlanString(byte[] substraitPlan, Boolean details);
/**
- * Create a native compute kernel and return a columnar result iterator.
+ * Create a native compute kernel and return a columnar result iterator.
Supports both
+ * creation-time splits (splitInfo, batchItr) and runtime splits (via
ColumnarBatchOutIterator#addIteratorSplits).
*
+ * @param splitInfo optional file-based splits to add at creation time (can
be null)
+ * @param batchItr optional iterator-based splits to add at creation time
(can be null)
* @return iterator instance id
*/
public native long nativeCreateKernelWithIterator(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]