This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 420b7046ad [GLUTEN-11514][VL] Refactor plan execution by adding 
`addIteratorSplits` and `noMoreSplits` methods to the plan execution API 
(#11527)
420b7046ad is described below

commit 420b7046ade81d69bf7d7d88a96076139db8b303
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Feb 4 13:38:23 2026 +0000

    [GLUTEN-11514][VL] Refactor plan execution by adding `addIteratorSplits` 
and `noMoreSplits` methods to the plan execution API (#11527)
---
 .../backendsapi/velox/VeloxIteratorApi.scala       |  11 +-
 .../gluten/execution/MiscOperatorSuite.scala       |   2 +-
 cpp/core/compute/Runtime.h                         |   5 +
 cpp/core/jni/JniCommon.h                           |   3 +-
 cpp/core/jni/JniWrapper.cc                         |  93 ++++++++++--
 cpp/core/memory/SplitAwareColumnarBatchIterator.h  |  49 +++++++
 cpp/velox/CMakeLists.txt                           |   1 +
 cpp/velox/benchmarks/GenericBenchmark.cc           |   1 +
 cpp/velox/compute/VeloxBackend.cc                  |   6 +-
 cpp/velox/compute/VeloxPlanConverter.cc            |   9 +-
 cpp/velox/compute/VeloxPlanConverter.h             |  14 +-
 cpp/velox/compute/VeloxRuntime.cc                  |  30 +++-
 cpp/velox/compute/VeloxRuntime.h                   |   2 +
 cpp/velox/compute/WholeStageResultIterator.cc      |  35 ++++-
 cpp/velox/compute/WholeStageResultIterator.h       |  25 ++--
 cpp/velox/cudf/CudfPlanValidator.cc                |   4 +-
 cpp/velox/jni/JniFileSystem.cc                     |   1 +
 cpp/velox/jni/VeloxJniWrapper.cc                   |   2 +
 .../{RowVectorStream.cc => CudfVectorStream.cc}    |  10 +-
 cpp/velox/operators/plannodes/CudfVectorStream.h   |  75 +++++++++-
 cpp/velox/operators/plannodes/IteratorSplit.h      |  47 ++++++
 cpp/velox/operators/plannodes/RowVectorStream.cc   |  83 ++++++++++-
 cpp/velox/operators/plannodes/RowVectorStream.h    | 162 +++++++++++++--------
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        | 106 ++++++++++----
 cpp/velox/substrait/SubstraitToVeloxPlan.h         |  18 ++-
 .../substrait/SubstraitToVeloxPlanValidator.h      |   4 +-
 cpp/velox/tests/FunctionTest.cc                    |   2 +-
 cpp/velox/tests/RuntimeTest.cc                     |   6 +
 .../tests/Substrait2VeloxPlanConversionTest.cc     |   2 +-
 .../Substrait2VeloxValuesNodeConversionTest.cc     |   2 +-
 cpp/velox/tests/VeloxSubstraitRoundTripTest.cc     |   4 +-
 .../vectorized/ColumnarBatchOutIterator.java       |  33 +++++
 .../gluten/vectorized/NativePlanEvaluator.java     |   6 +-
 .../gluten/vectorized/PlanEvaluatorJniWrapper.java |   5 +-
 34 files changed, 681 insertions(+), 177 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index c6ad7d5029..668e60b205 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -213,11 +213,12 @@ class VeloxIteratorApi extends IteratorApi with Logging {
     val resIter: ColumnarBatchOutIterator =
       transKernel.createKernelWithBatchIterator(
         inputPartition.plan,
-        splitInfoByteArray,
-        columnarNativeIterators.asJava,
+        if (splitInfoByteArray.nonEmpty) splitInfoByteArray else null,
+        if (columnarNativeIterators.nonEmpty) columnarNativeIterators.toArray 
else null,
         partitionIndex,
         
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
       )
+    resIter.noMoreSplits()
     val itrMetrics = IteratorMetricsJniWrapper.create()
 
     Iterators
@@ -261,12 +262,12 @@ class VeloxIteratorApi extends IteratorApi with Logging {
     val nativeResultIterator =
       transKernel.createKernelWithBatchIterator(
         rootNode.toProtobuf.toByteArray,
-        // Final iterator does not contain scan split, so pass empty split 
info to native here.
-        new Array[Array[Byte]](0),
-        columnarNativeIterator.asJava,
+        null,
+        if (columnarNativeIterator.nonEmpty) columnarNativeIterator.toArray 
else null,
         partitionIndex,
         
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
       )
+    nativeResultIterator.noMoreSplits()
     val itrMetrics = IteratorMetricsJniWrapper.create()
 
     Iterators
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 0ea6bde3d8..e1a0fd98ee 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -869,7 +869,7 @@ class MiscOperatorSuite extends 
VeloxWholeStageTransformerSuite with AdaptiveSpa
         }
         assert(wholeStageTransformers.size == 3)
         val nativePlanString = wholeStageTransformers.head.nativePlanString()
-        assert(nativePlanString.contains("Aggregation[1][SINGLE"))
+        
assert(nativePlanString.matches("[\\s\\S]*Aggregation\\[\\d+]\\[SINGLE[\\s\\S]*"))
         assert(nativePlanString.contains("ValueStream"))
         
assert(wholeStageTransformers(1).nativePlanString().contains("ValueStream"))
         
assert(wholeStageTransformers.last.nativePlanString().contains("TableScan"))
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index c1f82a0c34..a15cdc83d3 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -23,6 +23,7 @@
 #include "compute/ResultIterator.h"
 #include "memory/ColumnarBatch.h"
 #include "memory/MemoryManager.h"
+#include "memory/SplitAwareColumnarBatchIterator.h"
 #include "operators/c2r/ColumnarToRow.h"
 #include "operators/r2c/RowToColumnar.h"
 #include "operators/serializer/ColumnarBatchSerializer.h"
@@ -105,6 +106,10 @@ class Runtime : public 
std::enable_shared_from_this<Runtime> {
     throw GlutenException("Not implemented");
   }
 
+  virtual void noMoreSplits(ResultIterator* iter) {
+    throw GlutenException("Not implemented");
+  }
+
   virtual std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t 
numRows) {
     throw GlutenException("Not implemented");
   }
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index cb400e2db3..1044edb6d7 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -233,6 +233,7 @@ class SafeNativeArray {
  public:
   virtual ~SafeNativeArray() {
     PrimitiveArray::release(env_, javaArray_, nativeArray_);
+    env_->DeleteLocalRef(javaArray_);
   }
 
   SafeNativeArray(const SafeNativeArray&) = delete;
@@ -255,7 +256,7 @@ class SafeNativeArray {
 
  private:
   SafeNativeArray(JNIEnv* env, JavaArrayType javaArray, JniNativeArrayType 
nativeArray)
-      : env_(env), javaArray_(javaArray), nativeArray_(nativeArray){};
+      : env_(env), 
javaArray_(static_cast<JavaArrayType>(env_->NewLocalRef(javaArray))), 
nativeArray_(nativeArray){};
 
   JNIEnv* env_;
   JavaArrayType javaArray_;
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index adada15f91..e9d797e1db 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -29,6 +29,7 @@
 #include <optional>
 #include <string>
 #include "memory/AllocationListener.h"
+#include "memory/SplitAwareColumnarBatchIterator.h"
 #include "operators/serializer/ColumnarBatchSerializer.h"
 #include "shuffle/LocalPartitionWriter.h"
 #include "shuffle/Partitioning.h"
@@ -455,7 +456,7 @@ 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
     jobject wrapper,
     jbyteArray planArr,
     jobjectArray splitInfosArr,
-    jobjectArray iterArr,
+    jobjectArray batchItrArray,
     jint stageId,
     jint partitionId,
     jlong taskId,
@@ -478,24 +479,28 @@ 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
 
   ctx->parsePlan(safePlanArray.elems(), planSize);
 
-  for (jsize i = 0, splitInfoArraySize = env->GetArrayLength(splitInfosArr); i 
< splitInfoArraySize; i++) {
-    jbyteArray splitInfoArray = 
static_cast<jbyteArray>(env->GetObjectArrayElement(splitInfosArr, i));
-    jsize splitInfoSize = env->GetArrayLength(splitInfoArray);
-    auto safeSplitArray = getByteArrayElementsSafe(env, splitInfoArray);
-    auto splitInfoData = safeSplitArray.elems();
+  if (splitInfosArr != nullptr) {
+    for (jsize i = 0, splitInfoArraySize = env->GetArrayLength(splitInfosArr); 
i < splitInfoArraySize; i++) {
+      jbyteArray splitInfoArray = 
static_cast<jbyteArray>(env->GetObjectArrayElement(splitInfosArr, i));
+      jsize splitInfoSize = env->GetArrayLength(splitInfoArray);
+      auto safeSplitArray = getByteArrayElementsSafe(env, splitInfoArray);
+      auto splitInfoData = safeSplitArray.elems();
 
-    ctx->parseSplitInfo(splitInfoData, splitInfoSize, i);
+      ctx->parseSplitInfo(splitInfoData, splitInfoSize, i);
+    }
   }
 
   // Handle the Java iters
-  jsize itersLen = env->GetArrayLength(iterArr);
   std::vector<std::shared_ptr<ResultIterator>> inputIters;
-  inputIters.reserve(itersLen);
-  for (int idx = 0; idx < itersLen; idx++) {
-    jobject iter = env->GetObjectArrayElement(iterArr, idx);
-    auto arrayIter = std::make_unique<JniColumnarBatchIterator>(env, iter, 
ctx, idx);
-    auto resultIter = std::make_shared<ResultIterator>(std::move(arrayIter));
-    inputIters.push_back(std::move(resultIter));
+  if (batchItrArray != nullptr) {
+    jsize itersLen = env->GetArrayLength(batchItrArray);
+    inputIters.reserve(itersLen);
+    for (int idx = 0; idx < itersLen; idx++) {
+      jobject iter = env->GetObjectArrayElement(batchItrArray, idx);
+      auto arrayIter = std::make_unique<JniColumnarBatchIterator>(env, iter, 
ctx, idx);
+      auto resultIter = std::make_shared<ResultIterator>(std::move(arrayIter));
+      inputIters.push_back(std::move(resultIter));
+    }
   }
 
   return ctx->saveObject(ctx->createResultIterator(spillDirStr, inputIters));
@@ -630,6 +635,66 @@ JNIEXPORT void JNICALL 
Java_org_apache_gluten_vectorized_ColumnarBatchOutIterato
   JNI_METHOD_END()
 }
 
+JNIEXPORT jboolean JNICALL 
Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeAddIteratorSplits(
 // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong iterHandle,
+    jobjectArray batchItrArray) {
+  JNI_METHOD_START
+  auto ctx = getRuntime(env, wrapper);
+  auto outIter = ObjectStore::retrieve<ResultIterator>(iterHandle);
+  if (outIter == nullptr) {
+    throw GlutenException("Invalid iterator handle for addSplits");
+  }
+  
+  // Get the underlying split-aware iterator
+  auto* splitAwareIter = 
dynamic_cast<gluten::SplitAwareColumnarBatchIterator*>(outIter->getInputIter());
+  if (splitAwareIter == nullptr) {
+    throw GlutenException("Iterator does not support split management");
+  }
+
+  GLUTEN_CHECK(batchItrArray != nullptr, "FATAL: Splits to add cannot be 
null");
+
+  // Convert Java ColumnarBatchInIterator[] to native iterators and add as 
splits
+  jsize numIterators = env->GetArrayLength(batchItrArray);
+  std::vector<std::shared_ptr<ResultIterator>> inputIterators;
+  inputIterators.reserve(numIterators);
+
+  for (jsize idx = 0; idx < numIterators; idx++) {
+    jobject iter = env->GetObjectArrayElement(batchItrArray, idx);
+    if (iter == nullptr) {
+      inputIterators.push_back(nullptr);
+    } else {
+      auto arrayIter = std::make_unique<JniColumnarBatchIterator>(env, iter, 
ctx, idx);
+      auto resultIter = std::make_shared<ResultIterator>(std::move(arrayIter));
+      inputIterators.push_back(std::move(resultIter));
+    }
+    env->DeleteLocalRef(iter);
+  }
+
+  // Add iterator splits via interface method
+  if (!inputIterators.empty()) {
+    splitAwareIter->addIteratorSplits(inputIterators);
+  }
+
+  return true;
+  JNI_METHOD_END(false)
+}
+
+JNIEXPORT void JNICALL 
Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeNoMoreSplits( 
// NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong iterHandle) {
+  JNI_METHOD_START
+  auto ctx = getRuntime(env, wrapper);
+  auto iter = ObjectStore::retrieve<ResultIterator>(iterHandle);
+  if (iter == nullptr) {
+    throw GlutenException("Invalid iterator handle for noMoreSplits");
+  }
+  ctx->noMoreSplits(iter.get());
+  JNI_METHOD_END()
+}
+
 JNIEXPORT jlong JNICALL
 
Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarToRowInit(
 // NOLINT
     JNIEnv* env,
diff --git a/cpp/core/memory/SplitAwareColumnarBatchIterator.h 
b/cpp/core/memory/SplitAwareColumnarBatchIterator.h
new file mode 100644
index 0000000000..e12f38afad
--- /dev/null
+++ b/cpp/core/memory/SplitAwareColumnarBatchIterator.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+#include "ColumnarBatchIterator.h"
+
+// Forward declarations
+namespace substrait {
+class ReadRel_LocalFiles;
+}
+
+namespace gluten {
+
+// Forward declaration
+class ResultIterator;
+
+/// Abstract base class for iterators that support dynamic split management.
+/// Provides APIs for adding splits after iterator creation and signaling 
completion.
+class SplitAwareColumnarBatchIterator : public ColumnarBatchIterator {
+ public:
+  SplitAwareColumnarBatchIterator() = default;
+  virtual ~SplitAwareColumnarBatchIterator() = default;
+
+  /// Add iterator-based splits from input iterators.
+  virtual void addIteratorSplits(const 
std::vector<std::shared_ptr<ResultIterator>>& inputIterators) = 0;
+
+  /// Signal that no more splits will be added to this iterator.
+  /// This must be called after all splits have been added to ensure proper 
task completion.
+  virtual void noMoreSplits() = 0;
+};
+
+} // namespace gluten
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 56fab701ee..eb0e11e04b 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -205,6 +205,7 @@ if(ENABLE_GPU)
     VELOX_SRCS
     cudf/CudfPlanValidator.cc
     cudf/GpuLock.cc
+    operators/plannodes/CudfVectorStream.cc
     shuffle/VeloxGpuShuffleReader.cc
     shuffle/VeloxGpuShuffleWriter.cc
     operators/serializer/VeloxGpuColumnarBatchSerializer.cc
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc 
b/cpp/velox/benchmarks/GenericBenchmark.cc
index 2121254137..616ba9bcfb 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -452,6 +452,7 @@ auto BM_Generic = [](::benchmark::State& state,
       }
 
       auto resultIter = runtime->createResultIterator(veloxSpillDir, 
std::move(inputIters));
+      runtime->noMoreSplits(resultIter.get());
       listenerPtr->setIterator(resultIter.get());
 
       if (FLAGS_with_shuffle) {
diff --git a/cpp/velox/compute/VeloxBackend.cc 
b/cpp/velox/compute/VeloxBackend.cc
index 75fd84b104..45a64908e1 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -47,6 +47,7 @@
 #include "velox/connectors/hive/BufferedInputBuilder.h"
 #include "velox/connectors/hive/HiveConnector.h"
 #include "velox/connectors/hive/HiveDataSource.h"
+#include "operators/plannodes/RowVectorStream.h"
 #include 
"velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // 
@manual
 #include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h" 
// @manual
 #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
@@ -204,7 +205,6 @@ void VeloxBackend::init(
     // RSS shuffle serde.
     
facebook::velox::serializer::presto::PrestoVectorSerde::registerNamedVectorSerde();
   }
-  
velox::exec::Operator::registerOperator(std::make_unique<RowVectorStreamOperatorTranslator>());
 
   initUdf();
 
@@ -317,6 +317,10 @@ void VeloxBackend::initConnector(const 
std::shared_ptr<velox::config::ConfigBase
   }
   velox::connector::registerConnector(
       
std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId, 
hiveConf, ioExecutor_.get()));
+  
+  // Register value-stream connector for runtime iterator-based inputs
+  
velox::connector::registerConnector(std::make_shared<ValueStreamConnector>(kIteratorConnectorId,
 hiveConf));
+  
 #ifdef GLUTEN_ENABLE_GPU
   if (backendConf_->get<bool>(kCudfEnableTableScan, 
kCudfEnableTableScanDefault) &&
       backendConf_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc 
b/cpp/velox/compute/VeloxPlanConverter.cc
index 05e78eb1ba..4764fad382 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -18,27 +18,25 @@
 #include "VeloxPlanConverter.h"
 #include <filesystem>
 
-#include "compute/ResultIterator.h"
 #include "config/GlutenConfig.h"
 #include "iceberg/IcebergPlanConverter.h"
-#include "velox/common/file/FileSystems.h"
+#include "operators/plannodes/IteratorSplit.h"
 
 namespace gluten {
 
 using namespace facebook;
 
 VeloxPlanConverter::VeloxPlanConverter(
-    const std::vector<std::shared_ptr<ResultIterator>>& inputIters,
     velox::memory::MemoryPool* veloxPool,
     const facebook::velox::config::ConfigBase* veloxCfg,
+    const std::vector<std::shared_ptr<ResultIterator>>& rowVectors,
     const std::optional<std::string> writeFilesTempPath,
     const std::optional<std::string> writeFileName,
     bool validationMode)
     : validationMode_(validationMode),
       veloxCfg_(veloxCfg),
-      substraitVeloxPlanConverter_(veloxPool, veloxCfg, writeFilesTempPath, 
writeFileName, validationMode) {
+      substraitVeloxPlanConverter_(veloxPool, veloxCfg, rowVectors, 
writeFilesTempPath, writeFileName, validationMode) {
   VELOX_USER_CHECK_NOT_NULL(veloxCfg_);
-  substraitVeloxPlanConverter_.setInputIters(std::move(inputIters));
 }
 
 namespace {
@@ -136,7 +134,6 @@ void parseLocalFileNodes(
   splitInfos.reserve(localFiles.size());
   for (const auto& localFile : localFiles) {
     const auto& fileList = localFile.items();
-
     splitInfos.push_back(parseScanSplitInfo(veloxCfg, fileList));
   }
 
diff --git a/cpp/velox/compute/VeloxPlanConverter.h 
b/cpp/velox/compute/VeloxPlanConverter.h
index 4678dccea7..0b597a91f9 100644
--- a/cpp/velox/compute/VeloxPlanConverter.h
+++ b/cpp/velox/compute/VeloxPlanConverter.h
@@ -18,11 +18,11 @@
 #pragma once
 
 #include <velox/common/memory/MemoryPool.h>
-#include "compute/ResultIterator.h"
-#include "memory/VeloxMemoryManager.h"
+#include <velox/core/PlanNode.h>
+#include <velox/exec/Split.h>
+
 #include "substrait/SubstraitToVeloxPlan.h"
 #include "substrait/plan.pb.h"
-#include "velox/core/PlanNode.h"
 
 namespace gluten {
 
@@ -30,9 +30,9 @@ namespace gluten {
 class VeloxPlanConverter {
  public:
   explicit VeloxPlanConverter(
-      const std::vector<std::shared_ptr<ResultIterator>>& inputIters,
       facebook::velox::memory::MemoryPool* veloxPool,
       const facebook::velox::config::ConfigBase* veloxCfg,
+      const std::vector<std::shared_ptr<ResultIterator>>& rowVectors,
       const std::optional<std::string> writeFilesTempPath = std::nullopt,
       const std::optional<std::string> writeFileName = std::nullopt,
       bool validationMode = false);
@@ -45,6 +45,12 @@ class VeloxPlanConverter {
     return substraitVeloxPlanConverter_.splitInfos();
   }
 
+  /// The input iterators that were not inlined into the Velox plan. They 
should then be added manually to the Velox task
+  /// via WholeStageResultIterator#addIteratorSplits. Empty if no input 
iterators remain.
+  const std::vector<std::shared_ptr<ResultIterator>>& 
remainingInputIterators() const {
+    return substraitVeloxPlanConverter_.remainingInputIterators();
+  }
+
  private:
   bool validationMode_;
 
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index af0fde37ce..69d21b4a57 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -17,6 +17,8 @@
 
 #include "VeloxRuntime.h"
 
+#include <operators/plannodes/RowVectorStream.h>
+
 #include <algorithm>
 #include <filesystem>
 
@@ -124,14 +126,19 @@ void VeloxRuntime::getInfoAndIds(
     std::vector<std::shared_ptr<SplitInfo>>& scanInfos,
     std::vector<velox::core::PlanNodeId>& scanIds,
     std::vector<velox::core::PlanNodeId>& streamIds) {
+  int32_t streamIdx = 0;
   for (const auto& leafPlanNodeId : leafPlanNodeIds) {
     auto it = splitInfoMap.find(leafPlanNodeId);
     if (it == splitInfoMap.end()) {
       throw std::runtime_error("Could not find leafPlanNodeId.");
     }
     auto splitInfo = it->second;
+    // Based on the current code, streams and files are indexed in 
different orders:
+    // 1. Streams follow "iterator:<idx>" in the substrait plan;
+    // 2. Files follow the traversal order in the plan node tree.
+    // FIXME: Why don't we have a unified design?
     if (splitInfo->isStream) {
-      streamIds.emplace_back(leafPlanNodeId);
+      
streamIds.emplace_back(ValueStreamConnectorFactory::nodeIdOf(streamIdx++));
     } else {
       scanInfos.emplace_back(splitInfo);
       scanIds.emplace_back(leafPlanNodeId);
@@ -140,10 +147,8 @@ void VeloxRuntime::getInfoAndIds(
 }
 
 std::string VeloxRuntime::planString(bool details, const 
std::unordered_map<std::string, std::string>& sessionConf) {
-  std::vector<std::shared_ptr<ResultIterator>> inputs;
   auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool();
-  VeloxPlanConverter veloxPlanConverter(
-      inputs, veloxMemoryPool.get(), veloxCfg_.get(), std::nullopt, 
std::nullopt, true);
+  VeloxPlanConverter veloxPlanConverter(veloxMemoryPool.get(), 
veloxCfg_.get(), {}, std::nullopt, std::nullopt, true);
   auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_, localFiles_);
   return veloxPlan->toString(details, true);
 }
@@ -160,9 +165,9 @@ std::shared_ptr<ResultIterator> 
VeloxRuntime::createResultIterator(
   LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" << 
printConfig(confMap_);
 
   VeloxPlanConverter veloxPlanConverter(
-      inputs,
       memoryManager()->getLeafMemoryPool().get(),
       veloxCfg_.get(),
+      inputs,
       *localWriteFilesTempPath(),
       *localWriteFileName());
   veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, 
std::move(localFiles_));
@@ -187,9 +192,24 @@ std::shared_ptr<ResultIterator> 
VeloxRuntime::createResultIterator(
       spillDir,
       veloxCfg_,
       taskInfo_.has_value() ? taskInfo_.value() : SparkTaskInfo{});
+
+  auto remainingInputIterators = veloxPlanConverter.remainingInputIterators();
+  if (!remainingInputIterators.empty()) {
+    // Convert the remaining input iterators to splits and add them to the task.
+    wholeStageIter->addIteratorSplits(remainingInputIterators);
+  }
+
   return std::make_shared<ResultIterator>(std::move(wholeStageIter), this);
 }
 
+void VeloxRuntime::noMoreSplits(ResultIterator* iter){
+    auto* splitAwareIter = 
dynamic_cast<gluten::SplitAwareColumnarBatchIterator*>(iter->getInputIter());
+    if (splitAwareIter == nullptr) {
+      throw GlutenException("Iterator does not support split management");
+    }
+    splitAwareIter->noMoreSplits();
+}
+
 std::shared_ptr<ColumnarToRowConverter> 
VeloxRuntime::createColumnar2RowConverter(int64_t column2RowMemThreshold) {
   auto veloxPool = memoryManager()->getLeafMemoryPool();
   return std::make_shared<VeloxColumnarToRowConverter>(veloxPool, 
column2RowMemThreshold);
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index b39733e839..a3c3da0c5a 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -59,6 +59,8 @@ class VeloxRuntime final : public Runtime {
       const std::string& spillDir,
       const std::vector<std::shared_ptr<ResultIterator>>& inputs = {}) 
override;
 
+  void noMoreSplits(ResultIterator* iter) override;
+
   std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t 
column2RowMemThreshold) override;
 
   std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows) 
override;
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index e91e2ad69d..ac5773e439 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -16,6 +16,7 @@
  */
 #include "WholeStageResultIterator.h"
 #include "VeloxBackend.h"
+#include "VeloxPlanConverter.h"
 #include "VeloxRuntime.h"
 #include "config/VeloxConfig.h"
 #include "utils/ConfigExtractor.h"
@@ -29,6 +30,8 @@
 #include "velox/experimental/cudf/exec/ToCudf.h"
 #include "cudf/GpuLock.h"
 #endif
+#include "operators/plannodes/RowVectorStream.h"
+
 
 using namespace facebook;
 
@@ -227,7 +230,6 @@ std::shared_ptr<velox::core::QueryCtx> 
WholeStageResultIterator::createNewVeloxQ
 }
 
 std::shared_ptr<ColumnarBatch> WholeStageResultIterator::next() {
-  tryAddSplitsToTask();
   if (task_->isFinished()) {
     return nullptr;
   }
@@ -355,17 +357,40 @@ void WholeStageResultIterator::constructPartitionColumns(
   }
 }
 
-void WholeStageResultIterator::tryAddSplitsToTask() {
-  if (noMoreSplits_) {
+void WholeStageResultIterator::addIteratorSplits(const 
std::vector<std::shared_ptr<ResultIterator>>& inputIterators) {
+  GLUTEN_CHECK(!allSplitsAdded, "Method addIteratorSplits should not be called 
since all splits has been added to the Velox task.");
+  // Create IteratorConnectorSplit for each iterator
+  for (size_t i = 0; i < streamIds_.size() && i < inputIterators.size(); ++i) {
+    if (inputIterators[i] == nullptr) {
+      continue;
+    }
+    auto connectorSplit = std::make_shared<IteratorConnectorSplit>(
+        kIteratorConnectorId, inputIterators[i]);
+    exec::Split split(folly::copy(connectorSplit), -1);
+    task_->addSplit(streamIds_[i], std::move(split));
+  }
+}
+
+void WholeStageResultIterator::noMoreSplits() {
+  if (allSplitsAdded) {
     return;
   }
+  // Add all pending scan splits to the task before marking the scan nodes as finished
   for (int idx = 0; idx < scanNodeIds_.size(); idx++) {
     for (auto& split : splits_[idx]) {
       task_->addSplit(scanNodeIds_[idx], std::move(split));
     }
-    task_->noMoreSplits(scanNodeIds_[idx]);
   }
-  noMoreSplits_ = true;
+
+  for (const auto& scanNodeId : scanNodeIds_) {
+    task_->noMoreSplits(scanNodeId);
+  }
+  
+  // Mark no more splits for all stream nodes
+  for (const auto& streamId : streamIds_) {
+    task_->noMoreSplits(streamId);
+  }
+  allSplitsAdded = true;
 }
 
 void WholeStageResultIterator::collectMetrics() {
diff --git a/cpp/velox/compute/WholeStageResultIterator.h 
b/cpp/velox/compute/WholeStageResultIterator.h
index 358b153c7a..401cff06da 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -18,7 +18,7 @@
 
 #include "compute/Runtime.h"
 #include "iceberg/IcebergPlanConverter.h"
-#include "memory/ColumnarBatchIterator.h"
+#include "memory/SplitAwareColumnarBatchIterator.h"
 #include "memory/VeloxColumnarBatch.h"
 #include "substrait/SubstraitToVeloxPlan.h"
 #include "substrait/plan.pb.h"
@@ -33,7 +33,7 @@
 
 namespace gluten {
 
-class WholeStageResultIterator : public ColumnarBatchIterator {
+class WholeStageResultIterator : public SplitAwareColumnarBatchIterator {
  public:
   WholeStageResultIterator(
       VeloxMemoryManager* memoryManager,
@@ -69,14 +69,22 @@ class WholeStageResultIterator : public 
ColumnarBatchIterator {
     return metrics_.get();
   }
 
-  const facebook::velox::exec::Task* task() const {
-    return task_.get();
-  }
-
   const facebook::velox::core::PlanNode* veloxPlan() const {
     return veloxPlan_.get();
   }
 
+  /// Get the underlying Velox task for direct manipulation
+  facebook::velox::exec::Task* task() {
+    return task_.get();
+  }
+
+  /// Add iterator-based splits from input iterators
+  void addIteratorSplits(const std::vector<std::shared_ptr<ResultIterator>>& 
inputIterators) override;
+
+  /// Signal that no more splits will be added.
+  /// This is required for proper task completion and enables future barrier 
support.
+  void noMoreSplits() override;
+
  private:
   /// Get the Spark confs to Velox query context.
   std::unordered_map<std::string, std::string> getQueryContextConf();
@@ -94,9 +102,6 @@ class WholeStageResultIterator : public 
ColumnarBatchIterator {
       std::unordered_map<std::string, std::optional<std::string>>&,
       const std::unordered_map<std::string, std::string>&);
 
-  /// Add splits to task. Skip if already added.
-  void tryAddSplitsToTask();
-
   /// Collect Velox metrics.
   void collectMetrics();
 
@@ -134,7 +139,7 @@ class WholeStageResultIterator : public 
ColumnarBatchIterator {
   std::vector<std::shared_ptr<SplitInfo>> scanInfos_;
   std::vector<facebook::velox::core::PlanNodeId> streamIds_;
   std::vector<std::vector<facebook::velox::exec::Split>> splits_;
-  bool noMoreSplits_ = false;
+  bool allSplitsAdded = false;
 
   int64_t loadLazyVectorTime_ = 0;
 };
diff --git a/cpp/velox/cudf/CudfPlanValidator.cc 
b/cpp/velox/cudf/CudfPlanValidator.cc
index b2d6e82c22..a5d350e579 100644
--- a/cpp/velox/cudf/CudfPlanValidator.cc
+++ b/cpp/velox/cudf/CudfPlanValidator.cc
@@ -46,8 +46,8 @@ bool CudfPlanValidator::validate(const ::substrait::Plan& 
substraitPlan) {
   std::vector<std::shared_ptr<ResultIterator>> inputs;
   std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg = 
std::make_shared<facebook::velox::config::ConfigBase>(
       std::unordered_map<std::string, std::string>{{kCudfEnabled, "true"}});
-  VeloxPlanConverter veloxPlanConverter(
-      inputs, veloxMemoryPool.get(), veloxCfg.get(), std::nullopt, 
std::nullopt, true);
+  VeloxPlanConverter veloxPlanConverter(veloxMemoryPool.get(), veloxCfg.get(),
+      inputs, std::nullopt, std::nullopt, true);
   auto planNode = veloxPlanConverter.toVeloxPlan(substraitPlan, localFiles);
   std::unordered_set<velox::core::PlanNodeId> emptySet;
   velox::core::PlanFragment planFragment{planNode, 
velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
diff --git a/cpp/velox/jni/JniFileSystem.cc b/cpp/velox/jni/JniFileSystem.cc
index 51a7821f01..06d831a117 100644
--- a/cpp/velox/jni/JniFileSystem.cc
+++ b/cpp/velox/jni/JniFileSystem.cc
@@ -352,6 +352,7 @@ class JniFileSystem : public 
facebook::velox::filesystems::FileSystem {
       jstring element = 
static_cast<jstring>(env->GetObjectArrayElement(jarray, i));
       std::string cElement = jStringToCString(env, element);
       out.push_back(cElement);
+      env->DeleteLocalRef(element);
     }
     return out;
   }
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 74adb1dff5..ad6f8947eb 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -219,6 +219,7 @@ 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeValidateExpressi
     auto id = sFmap.function_anchor();
     auto name = sFmap.name();
     functionMappings.emplace(id, name);
+    env->DeleteLocalRef(mapping);
   }
 
   auto pool = defaultLeafVeloxMemoryPool().get();
@@ -475,6 +476,7 @@ 
Java_org_apache_gluten_utils_VeloxFileSystemValidationJniWrapper_allSupportedByR
     if (!velox::filesystems::isPathSupportedByRegisteredFileSystems(path)) {
       return false;
     }
+    env->DeleteLocalRef(string);
   }
   return true;
   JNI_METHOD_END(false)
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.cc 
b/cpp/velox/operators/plannodes/CudfVectorStream.cc
similarity index 93%
copy from cpp/velox/operators/plannodes/RowVectorStream.cc
copy to cpp/velox/operators/plannodes/CudfVectorStream.cc
index 7121fe93ed..85cd2c021d 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.cc
+++ b/cpp/velox/operators/plannodes/CudfVectorStream.cc
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-#include "RowVectorStream.h"
+#include "CudfVectorStream.h"
 #include "memory/VeloxColumnarBatch.h"
 #include "velox/exec/Driver.h"
 #include "velox/exec/Operator.h"
@@ -45,7 +45,7 @@ class SuspendedSection {
 } // namespace
 
 namespace gluten {
-bool RowVectorStream::hasNext() {
+bool CudfVectorStreamBase::hasNext() {
   if (finished_) {
     return false;
   }
@@ -70,7 +70,7 @@ bool RowVectorStream::hasNext() {
   return hasNext;
 }
 
-std::shared_ptr<ColumnarBatch> RowVectorStream::nextInternal() {
+std::shared_ptr<ColumnarBatch> CudfVectorStreamBase::nextInternal() {
   if (finished_) {
     return nullptr;
   }
@@ -84,7 +84,7 @@ std::shared_ptr<ColumnarBatch> 
RowVectorStream::nextInternal() {
   return cb;
 }
 
-facebook::velox::RowVectorPtr RowVectorStream::next() {
+facebook::velox::RowVectorPtr CudfVectorStreamBase::next() {
   auto cb = nextInternal();
   const std::shared_ptr<VeloxColumnarBatch>& vb = 
VeloxColumnarBatch::from(pool_, cb);
   auto vp = vb->getRowVector();
@@ -92,4 +92,4 @@ facebook::velox::RowVectorPtr RowVectorStream::next() {
   return std::make_shared<facebook::velox::RowVector>(
       vp->pool(), outputType_, facebook::velox::BufferPtr(0), vp->size(), 
vp->children());
 }
-} // namespace gluten
+} // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/operators/plannodes/CudfVectorStream.h 
b/cpp/velox/operators/plannodes/CudfVectorStream.h
index 9758d1c35e..7a663d252e 100644
--- a/cpp/velox/operators/plannodes/CudfVectorStream.h
+++ b/cpp/velox/operators/plannodes/CudfVectorStream.h
@@ -18,6 +18,11 @@
 #pragma once
 
 #include "CudfVectorStream.h"
+#include "compute/ResultIterator.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "velox/exec/Driver.h"
+#include "velox/exec/Operator.h"
+#include "velox/exec/Task.h"
 #include "velox/experimental/cudf/exec/CudfOperator.h"
 #include "velox/experimental/cudf/exec/Utilities.h"
 #include "velox/experimental/cudf/exec/VeloxCudfInterop.h"
@@ -25,14 +30,78 @@
 
 namespace gluten {
 
-class CudfVectorStream : public RowVectorStream {
+class CudfVectorStreamBase {
+ public:
+  virtual ~CudfVectorStreamBase() = default;
+
+  explicit CudfVectorStreamBase(
+      facebook::velox::exec::DriverCtx* driverCtx,
+      facebook::velox::memory::MemoryPool* pool,
+      ResultIterator* iterator,
+      const facebook::velox::RowTypePtr& outputType)
+      : driverCtx_(driverCtx), pool_(pool), outputType_(outputType), 
iterator_(iterator) {}
+
+  bool hasNext();
+
+  // Convert the arrow batch to a row vector, then construct a new RowVector with the new 
outputType.
+  virtual facebook::velox::RowVectorPtr next();
+
+ protected:
+  // Get the next batch from iterator_.
+  std::shared_ptr<ColumnarBatch> nextInternal();
+
+  facebook::velox::exec::DriverCtx* driverCtx_;
+  facebook::velox::memory::MemoryPool* pool_;
+  const facebook::velox::RowTypePtr outputType_;
+  ResultIterator* iterator_;
+
+  bool finished_{false};
+};
+
+class ValueStreamNode final : public facebook::velox::core::PlanNode {
+ public:
+  ValueStreamNode(
+      const facebook::velox::core::PlanNodeId& id,
+      const facebook::velox::RowTypePtr& outputType,
+      std::shared_ptr<ResultIterator> iterator)
+      : facebook::velox::core::PlanNode(id), outputType_(outputType), 
iterator_(std::move(iterator)) {}
+
+  const facebook::velox::RowTypePtr& outputType() const override {
+    return outputType_;
+  }
+
+  const std::vector<facebook::velox::core::PlanNodePtr>& sources() const 
override {
+    return kEmptySources_;
+  };
+
+  ResultIterator* iterator() const {
+    return iterator_.get();
+  }
+
+  std::string_view name() const override {
+    return "ValueStream";
+  }
+
+  folly::dynamic serialize() const override {
+    VELOX_UNSUPPORTED("ValueStream plan node is not serializable");
+  }
+
+ private:
+  void addDetails(std::stringstream& stream) const override{};
+
+  const facebook::velox::RowTypePtr outputType_;
+  std::shared_ptr<ResultIterator> iterator_;
+  const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources_;
+};
+
+class CudfVectorStream : public CudfVectorStreamBase {
  public:
   CudfVectorStream(
       facebook::velox::exec::DriverCtx* driverCtx,
       facebook::velox::memory::MemoryPool* pool,
       ResultIterator* iterator,
       const facebook::velox::RowTypePtr& outputType)
-      : RowVectorStream(driverCtx, pool, iterator, outputType) {}
+      : CudfVectorStreamBase(driverCtx, pool, iterator, outputType) {}
 
   // Convert arrow batch to row vector and use new output columns
   facebook::velox::RowVectorPtr next() override {
@@ -133,7 +202,7 @@ class CudfValueStream : public 
facebook::velox::exec::SourceOperator, public fac
 
  private:
   bool finished_ = false;
-  std::unique_ptr<RowVectorStream> rvStream_;
+  std::unique_ptr<CudfVectorStream> rvStream_;
 };
 
 class CudfVectorStreamOperatorTranslator : public 
facebook::velox::exec::Operator::PlanNodeTranslator {
diff --git a/cpp/velox/operators/plannodes/IteratorSplit.h 
b/cpp/velox/operators/plannodes/IteratorSplit.h
new file mode 100644
index 0000000000..6e42d10e6f
--- /dev/null
+++ b/cpp/velox/operators/plannodes/IteratorSplit.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "compute/ResultIterator.h"
+#include "velox/connectors/Connector.h"
+
+namespace gluten {
+
+/// Custom connector ID for iterator-based splits
+constexpr const char* kIteratorConnectorId = "value-stream";
+
+/// A custom split type that wraps a ResultIterator.
+/// This allows iterators to be treated as splits and added dynamically to 
tasks.
+class IteratorConnectorSplit : public 
facebook::velox::connector::ConnectorSplit {
+ public:
+  explicit IteratorConnectorSplit(const std::string& connectorId, 
std::shared_ptr<ResultIterator> iterator)
+      : ConnectorSplit(connectorId), iterator_(std::move(iterator)) {}
+
+  std::shared_ptr<ResultIterator> iterator() const {
+    return iterator_;
+  }
+
+  std::string toString() const override {
+    return fmt::format("IteratorSplit[{}]", connectorId);
+  }
+
+ private:
+  std::shared_ptr<ResultIterator> iterator_;
+};
+
+} // namespace gluten
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.cc 
b/cpp/velox/operators/plannodes/RowVectorStream.cc
index 7121fe93ed..7c0b00979a 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.cc
+++ b/cpp/velox/operators/plannodes/RowVectorStream.cc
@@ -20,6 +20,7 @@
 #include "velox/exec/Driver.h"
 #include "velox/exec/Operator.h"
 #include "velox/exec/Task.h"
+#include "velox/vector/arrow/Bridge.h"
 
 namespace {
 
@@ -45,6 +46,7 @@ class SuspendedSection {
 } // namespace
 
 namespace gluten {
+
 bool RowVectorStream::hasNext() {
   if (finished_) {
     return false;
@@ -61,7 +63,13 @@ bool RowVectorStream::hasNext() {
     // As of now, non-zero running threads usually happens when:
     // 1. Task A spills task B;
     // 2. Task A tries to grow buffers created by task B, during which spill 
is requested on task A again.
-    SuspendedSection ss(driverCtx_->driver);
+    const facebook::velox::exec::DriverThreadContext* driverThreadCtx =
+    facebook::velox::exec::driverThreadContext();
+    VELOX_CHECK_NOT_NULL(
+        driverThreadCtx,
+        "ExternalStreamDataSource::next() is not called "
+        "from a driver thread");
+    SuspendedSection ss(driverThreadCtx->driverCtx()->driver);
     hasNext = iterator_->hasNext();
   }
   if (!hasNext) {
@@ -78,7 +86,13 @@ std::shared_ptr<ColumnarBatch> 
RowVectorStream::nextInternal() {
   {
     // We are leaving Velox task execution and are probably entering Spark 
code through JNI. Suspend the current
     // driver to make the current task open to spilling.
-    SuspendedSection ss(driverCtx_->driver);
+    const facebook::velox::exec::DriverThreadContext* driverThreadCtx =
+    facebook::velox::exec::driverThreadContext();
+    VELOX_CHECK_NOT_NULL(
+        driverThreadCtx,
+        "ExternalStreamDataSource::next() is not called "
+        "from a driver thread");
+    SuspendedSection ss(driverThreadCtx->driverCtx()->driver);
     cb = iterator_->next();
   }
   return cb;
@@ -92,4 +106,67 @@ facebook::velox::RowVectorPtr RowVectorStream::next() {
   return std::make_shared<facebook::velox::RowVector>(
       vp->pool(), outputType_, facebook::velox::BufferPtr(0), vp->size(), 
vp->children());
 }
-} // namespace gluten
+
+ValueStreamDataSource::ValueStreamDataSource(
+    const facebook::velox::RowTypePtr& outputType,
+    const facebook::velox::connector::ConnectorTableHandlePtr& tableHandle,
+    const facebook::velox::connector::ColumnHandleMap& columnHandles,
+    facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx)
+    : outputType_(outputType),
+      pool_(connectorQueryCtx->memoryPool()) {}
+
+void 
ValueStreamDataSource::addSplit(std::shared_ptr<facebook::velox::connector::ConnectorSplit>
 split) {
+  // Cast to IteratorConnectorSplit to extract the iterator
+  auto iteratorSplit = std::dynamic_pointer_cast<const 
IteratorConnectorSplit>(split);
+  if (!iteratorSplit) {
+    throw std::runtime_error("Split is not an IteratorConnectorSplit");
+  }
+  
+  auto iterator = iteratorSplit->iterator();
+  if (!iterator) {
+    throw std::runtime_error("IteratorConnectorSplit contains null iterator");
+  }
+  
+  // Create RowVectorStream wrapper and add to pending queue
+  auto rowVectorStream = std::make_shared<RowVectorStream>(pool_, iterator, 
outputType_);
+  pendingIterators_.push_back(rowVectorStream);
+}
+
+std::optional<facebook::velox::RowVectorPtr> ValueStreamDataSource::next(
+    uint64_t size,
+    facebook::velox::ContinueFuture& future) {
+  // Try to get current iterator if we don't have one
+  while (!currentIterator_) {
+    if (pendingIterators_.empty()) {
+      // No more iterators to process
+      return nullptr;
+    }
+    
+    // Get next RowVectorStream from queue
+    currentIterator_ = pendingIterators_.front();
+    pendingIterators_.erase(pendingIterators_.begin());
+  }
+
+  // Check if current stream has more data
+  if (!currentIterator_->hasNext()) {
+    // Current stream exhausted, try next one
+    currentIterator_ = nullptr;
+    return next(size, future);  // Recursively try next stream
+  }
+
+  // Get next batch from current stream (RowVectorStream handles conversion)
+  auto rowVector = currentIterator_->next();
+  
+  if (!rowVector) {
+    currentIterator_ = nullptr;
+    return next(size, future);  // Recursively try next stream
+  }
+
+  // Update metrics
+  completedRows_ += rowVector->size();
+  completedBytes_ += rowVector->estimateFlatSize();
+
+  return rowVector;
+}
+
+} // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h 
b/cpp/velox/operators/plannodes/RowVectorStream.h
index d6b8d37bcd..6e6ccd1527 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -19,6 +19,8 @@
 
 #include "compute/ResultIterator.h"
 #include "memory/VeloxColumnarBatch.h"
+#include "operators/plannodes/IteratorSplit.h"
+#include "velox/connectors/Connector.h"
 #include "velox/exec/Driver.h"
 #include "velox/exec/Operator.h"
 #include "velox/exec/Task.h"
@@ -30,11 +32,10 @@ class RowVectorStream {
   virtual ~RowVectorStream() = default;
 
   explicit RowVectorStream(
-      facebook::velox::exec::DriverCtx* driverCtx,
       facebook::velox::memory::MemoryPool* pool,
-      ResultIterator* iterator,
+      std::shared_ptr<ResultIterator> iterator,
       const facebook::velox::RowTypePtr& outputType)
-      : driverCtx_(driverCtx), pool_(pool), outputType_(outputType), 
iterator_(iterator) {}
+      : pool_(pool), outputType_(outputType), iterator_(iterator) {}
 
   bool hasNext();
 
@@ -45,100 +46,133 @@ class RowVectorStream {
   // Get the next batch from iterator_.
   std::shared_ptr<ColumnarBatch> nextInternal();
 
-  facebook::velox::exec::DriverCtx* driverCtx_;
   facebook::velox::memory::MemoryPool* pool_;
   const facebook::velox::RowTypePtr outputType_;
-  ResultIterator* iterator_;
+  std::shared_ptr<ResultIterator> iterator_;
 
   bool finished_{false};
 };
 
-class ValueStreamNode final : public facebook::velox::core::PlanNode {
+/// DataSource implementation that reads from ResultIterator instances.
+/// This allows iterator-based data to be consumed via Velox's standard
+/// connector/split mechanism, enabling proper integration with 
Task::addSplit().
+class ValueStreamDataSource : public facebook::velox::connector::DataSource {
  public:
-  ValueStreamNode(
-      const facebook::velox::core::PlanNodeId& id,
+  ValueStreamDataSource(
       const facebook::velox::RowTypePtr& outputType,
-      std::shared_ptr<ResultIterator> iterator)
-      : facebook::velox::core::PlanNode(id), outputType_(outputType), 
iterator_(std::move(iterator)) {}
+      const facebook::velox::connector::ConnectorTableHandlePtr& tableHandle,
+      const facebook::velox::connector::ColumnHandleMap& columnHandles,
+      facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx);
 
-  const facebook::velox::RowTypePtr& outputType() const override {
-    return outputType_;
-  }
+  void addSplit(std::shared_ptr<facebook::velox::connector::ConnectorSplit> 
split) override;
 
-  const std::vector<facebook::velox::core::PlanNodePtr>& sources() const 
override {
-    return kEmptySources_;
-  };
+  std::optional<facebook::velox::RowVectorPtr> next(uint64_t size, 
facebook::velox::ContinueFuture& future) override;
 
-  ResultIterator* iterator() const {
-    return iterator_.get();
+  void addDynamicFilter(
+      facebook::velox::column_index_t outputChannel,
+      const std::shared_ptr<facebook::velox::common::Filter>& filter) override 
{
+    // Iterator-based sources don't support dynamic filtering
   }
 
-  std::string_view name() const override {
-    return "ValueStream";
+  uint64_t getCompletedBytes() override {
+    return completedBytes_;
   }
 
-  folly::dynamic serialize() const override {
-    VELOX_UNSUPPORTED("ValueStream plan node is not serializable");
+  uint64_t getCompletedRows() override {
+    return completedRows_;
   }
 
- private:
-  void addDetails(std::stringstream& stream) const override{};
+  std::unordered_map<std::string, facebook::velox::RuntimeMetric> 
getRuntimeStats() override {
+    return {};
+  }
 
+ private:
   const facebook::velox::RowTypePtr outputType_;
-  std::shared_ptr<ResultIterator> iterator_;
-  const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources_;
+  facebook::velox::memory::MemoryPool* pool_;
+
+  std::vector<std::shared_ptr<RowVectorStream>> pendingIterators_;
+  std::shared_ptr<RowVectorStream> currentIterator_{nullptr};
+  uint64_t completedBytes_{0};
+  uint64_t completedRows_{0};
 };
 
-class ValueStream : public facebook::velox::exec::SourceOperator {
+/// Table handle for iterator-based scans
+class ValueStreamTableHandle : public 
facebook::velox::connector::ConnectorTableHandle {
  public:
-  ValueStream(
-      int32_t operatorId,
-      facebook::velox::exec::DriverCtx* driverCtx,
-      std::shared_ptr<const ValueStreamNode> valueStreamNode)
-      : facebook::velox::exec::SourceOperator(
-            driverCtx,
-            valueStreamNode->outputType(),
-            operatorId,
-            valueStreamNode->id(),
-            valueStreamNode->name().data()) {
-    ResultIterator* itr = valueStreamNode->iterator();
-    rvStream_ = std::make_unique<RowVectorStream>(driverCtx, pool(), itr, 
outputType_);
+  explicit ValueStreamTableHandle(std::string connectorId) : 
ConnectorTableHandle(connectorId) {}
+
+  const std::string& name() const override {
+    static const std::string kName = "ValueStreamTableHandle";
+    return kName;
   }
 
-  facebook::velox::RowVectorPtr getOutput() override {
-    if (finished_) {
-      return nullptr;
-    }
-    if (rvStream_->hasNext()) {
-      return rvStream_->next();
-    } else {
-      finished_ = true;
-      return nullptr;
-    }
+  folly::dynamic serialize() const override {
+    VELOX_NYI();
   }
+};
+
+/// Column handle for iterator-based scans
+class ValueStreamColumnHandle : public 
facebook::velox::connector::ColumnHandle {
+ public:
+  ValueStreamColumnHandle(std::string name, facebook::velox::TypePtr type)
+      : name_(std::move(name)), type_(std::move(type)) {}
 
-  facebook::velox::exec::BlockingReason 
isBlocked(facebook::velox::ContinueFuture* /* unused */) override {
-    return facebook::velox::exec::BlockingReason::kNotBlocked;
+  const std::string& name() const {
+    return name_;
   }
 
-  bool isFinished() override {
-    return finished_;
+  const facebook::velox::TypePtr& type() const {
+    return type_;
   }
 
  private:
-  bool finished_ = false;
-  std::unique_ptr<RowVectorStream> rvStream_;
+  std::string name_;
+  facebook::velox::TypePtr type_;
 };
 
-class RowVectorStreamOperatorTranslator : public 
facebook::velox::exec::Operator::PlanNodeTranslator {
-  std::unique_ptr<facebook::velox::exec::Operator> toOperator(
-      facebook::velox::exec::DriverCtx* ctx,
-      int32_t id,
-      const facebook::velox::core::PlanNodePtr& node) override {
-    if (auto valueStreamNode = std::dynamic_pointer_cast<const 
ValueStreamNode>(node)) {
-      return std::make_unique<ValueStream>(id, ctx, valueStreamNode);
-    }
-    return nullptr;
+/// Connector implementation for iterator-based data sources
+class ValueStreamConnector : public facebook::velox::connector::Connector {
+ public:
+  explicit ValueStreamConnector(
+      const std::string& id,
+      std::shared_ptr<const facebook::velox::config::ConfigBase> config)
+      : Connector(id, config) {}
+
+  std::unique_ptr<facebook::velox::connector::DataSource> createDataSource(
+      const facebook::velox::RowTypePtr& outputType,
+      const facebook::velox::connector::ConnectorTableHandlePtr& tableHandle,
+      const facebook::velox::connector::ColumnHandleMap& columnHandles,
+      facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx) 
override {
+    return std::make_unique<ValueStreamDataSource>(outputType, tableHandle, 
columnHandles, connectorQueryCtx);
+  }
+
+  std::unique_ptr<facebook::velox::connector::DataSink> createDataSink(
+      facebook::velox::RowTypePtr inputType,
+      facebook::velox::connector::ConnectorInsertTableHandlePtr 
connectorInsertTableHandle,
+      facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx,
+      facebook::velox::connector::CommitStrategy commitStrategy) override {
+    VELOX_UNSUPPORTED("ValueStreamConnector does not support data sinks");
+  }
+};
+
+/// Factory for creating ValueStreamConnector instances
+class ValueStreamConnectorFactory : public 
facebook::velox::connector::ConnectorFactory {
+ public:
+  static constexpr const char* kValueStreamConnectorName = "value-stream";
+
+  static std::string nodeIdOf(int32_t streamIdx) {
+    return fmt::format("{}:{}", kValueStreamConnectorName, streamIdx);
+  }
+
+  ValueStreamConnectorFactory() : ConnectorFactory(kValueStreamConnectorName) 
{}
+
+  std::shared_ptr<facebook::velox::connector::Connector> newConnector(
+      const std::string& id,
+      std::shared_ptr<const facebook::velox::config::ConfigBase> config,
+      folly::Executor* ioExecutor = nullptr,
+      folly::Executor* cpuExecutor = nullptr) override {
+    return std::make_shared<ValueStreamConnector>(id, config);
   }
 };
+
 } // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index b20b4a3d09..0085c553ba 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -899,15 +899,15 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
       
SubstraitParser::configSetInOptimization(generateRel.advanced_extension(), 
"injectedProject=");
 
   if (injectedProject) {
-    // Child should be either ProjectNode or ValueStreamNode in case of 
project fallback.
+    // Child should be ProjectNode, or TableScanNode / CudfValueStreamNode (GPU) in case 
of project fallback.
     VELOX_CHECK(
         (std::dynamic_pointer_cast<const core::ProjectNode>(childNode) != 
nullptr ||
-         std::dynamic_pointer_cast<const ValueStreamNode>(childNode) != 
nullptr)
+        std::dynamic_pointer_cast<const core::TableScanNode>(childNode) != 
nullptr
 #ifdef GLUTEN_ENABLE_GPU
             || std::dynamic_pointer_cast<const CudfValueStreamNode>(childNode) 
!= nullptr
 #endif
-                && childNode->outputType()->size() > 
requiredChildOutput.size(),
-        "injectedProject is true, but the ProjectNode or ValueStreamNode (in 
case of projection fallback)"
+        ) && childNode->outputType()->size() > requiredChildOutput.size(),
+        "injectedProject is true, but the ProjectNode or TableScanNode or 
CudfValueStreamNode (in case of projection fallback)"
         " is missing or does not have the corresponding projection field");
 
     bool isStack = generateRel.has_advanced_extension() &&
@@ -1281,8 +1281,56 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
       nextPlanNodeId(), sortingKeys, sortingOrders, 
static_cast<int32_t>(topNRel.n()), false /*isPartial*/, childNode);
 }
 
-template <typename T>
 core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
+    const ::substrait::ReadRel& readRel, int32_t streamIdx) {
+  // Use TableScanNode with iterator connector for runtime iterator inputs
+  // Get output schema from ReadRel
+  uint64_t colNum = 0;
+  std::vector<TypePtr> veloxTypeList;
+  if (readRel.has_base_schema()) {
+    const auto& baseSchema = readRel.base_schema();
+    colNum = baseSchema.names().size();
+    veloxTypeList = SubstraitParser::parseNamedStruct(baseSchema);
+  }
+
+  auto nodeId = ValueStreamConnectorFactory::nodeIdOf(streamIdx);
+  std::vector<std::string> outNames;
+  outNames.reserve(colNum);
+  for (int idx = 0; idx < colNum; idx++) {
+    // TODO: We should use the designated names in readRel rather than assigning 
new names.
+    auto colName = fmt::format("node_{}_{}", nodeId, idx);
+    outNames.emplace_back(colName);
+  }
+  auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));
+
+  // Create TableHandle
+  auto tableHandle = 
std::make_shared<ValueStreamTableHandle>(kIteratorConnectorId);
+
+  // Create column assignments
+  connector::ColumnHandleMap assignments;
+  for (int idx = 0; idx < outputType->size(); idx++) {
+    auto name = outputType->nameOf(idx);
+    auto type = outputType->childAt(idx);
+    assignments[name] = std::make_shared<ValueStreamColumnHandle>(name, type);
+  }
+
+  // Create TableScanNode
+  auto tableScanNode = std::make_shared<core::TableScanNode>(
+      nodeId,
+      outputType,
+      tableHandle,
+      assignments);
+
+  // Mark this as a stream-based split
+  auto splitInfo = std::make_shared<SplitInfo>();
+  splitInfo->isStream = true;
+  splitInfoMap_[tableScanNode->id()] = splitInfo;
+
+  return tableScanNode;
+}
+
+#ifdef GLUTEN_ENABLE_GPU
+core::PlanNodePtr SubstraitToVeloxPlanConverter::constructCudfValueStreamNode(
     const ::substrait::ReadRel& readRel,
     int32_t streamIdx) {
   // Get the input schema of this iterator.
@@ -1307,28 +1355,31 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::constructValueStreamNode(
   std::shared_ptr<ResultIterator> iterator;
   if (!validationMode_) {
     VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index 
{} in input iterator list.", streamIdx);
-    iterator = inputIters_[streamIdx];
+    iterator = std::move(inputIters_[streamIdx]);
   }
-  auto node = std::make_shared<T>(nextPlanNodeId(), outputType, 
std::move(iterator));
+  auto node = std::make_shared<CudfValueStreamNode>(nextPlanNodeId(), 
outputType, std::move(iterator));
 
   auto splitInfo = std::make_shared<SplitInfo>();
   splitInfo->isStream = true;
   splitInfoMap_[node->id()] = splitInfo;
   return node;
 }
+#endif
 
 core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValuesNode(
     const ::substrait::ReadRel& readRel,
     int32_t streamIdx) {
-  std::vector<RowVectorPtr> values;
+  // ValuesNode is only used for validation/benchmarking with query trace.
+  // It loads all data from the iterator at plan construction time.
   VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index 
{} in input iterator list.", streamIdx);
-  const auto iterator = inputIters_[streamIdx];
-  while (iterator->hasNext()) {
-    auto cb = VeloxColumnarBatch::from(defaultLeafVeloxMemoryPool().get(), 
iterator->next());
-    values.emplace_back(cb->getRowVector());
-  }
-  auto node = 
std::make_shared<facebook::velox::core::ValuesNode>(nextPlanNodeId(), 
std::move(values));
-
+  const auto iter = std::move(inputIters_[streamIdx]);
+  std::vector<RowVectorPtr> rowVectors;
+  while (iter->hasNext()) {
+    auto batch = iter->next();
+    auto veloxBatch = 
VeloxColumnarBatch::from(defaultLeafVeloxMemoryPool().get(), batch);
+    rowVectors.emplace_back(veloxBatch->getRowVector());
+  }
+  auto node = 
std::make_shared<facebook::velox::core::ValuesNode>(nextPlanNodeId(), 
std::move(rowVectors));
   auto splitInfo = std::make_shared<SplitInfo>();
   splitInfo->isStream = true;
   splitInfoMap_[node->id()] = splitInfo;
@@ -1343,19 +1394,20 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
         !readRel.common().has_emit(), "Emit not supported for ValuesNode and 
TableScanNode related Substrait plans.");
   }
 
-  // Check if the ReadRel specifies an input of stream. If yes, build 
ValueStreamNode as the data source.
   auto streamIdx = getStreamIndex(readRel);
   if (streamIdx >= 0) {
+    // The ReadRel specifies a stream input: build a TableScanNode with the iterator connector 
(or a ValuesNode / cuDF value stream node in the special cases below).
+    const bool isQueryTraceEnabled = veloxCfg_->get<bool>(kQueryTraceEnabled, 
false);
+    if (isQueryTraceEnabled) {
+      // Only used by benchmarks with query trace enabled: build a ValuesNode instead of the 
stream-backed source node to support serialization.
+      return constructValuesNode(readRel, streamIdx);
+    }
 #ifdef GLUTEN_ENABLE_GPU
     if (veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
-      return constructValueStreamNode<CudfValueStreamNode>(readRel, streamIdx);
+      return constructCudfValueStreamNode(readRel, streamIdx);
     }
 #endif
-    if (!veloxCfg_->get<bool>(kQueryTraceEnabled, false)) {
-      return constructValueStreamNode<ValueStreamNode>(readRel, streamIdx);
-    }
-    // Only used in benchmark enable query trace, replace ValueStreamNode to 
ValuesNode to support serialization.
-    return constructValuesNode(readRel, streamIdx);
+    return constructValueStreamNode(readRel, streamIdx);
   }
 
   // Otherwise, will create TableScan node for ReadRel.
@@ -1651,14 +1703,4 @@ bool 
SubstraitToVeloxPlanConverter::checkTypeExtension(const ::substrait::Plan&
   return true;
 }
 
-#ifdef GLUTEN_ENABLE_GPU
-template core::PlanNodePtr 
SubstraitToVeloxPlanConverter::constructValueStreamNode<CudfValueStreamNode>(
-    const ::substrait::ReadRel& sRead,
-    int32_t streamIdx);
-#endif
-
-template core::PlanNodePtr 
SubstraitToVeloxPlanConverter::constructValueStreamNode<ValueStreamNode>(
-    const ::substrait::ReadRel& sRead,
-    int32_t streamIdx);
-
 } // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h 
b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 48e5709ea8..0e00764a66 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -70,11 +70,13 @@ class SubstraitToVeloxPlanConverter {
   explicit SubstraitToVeloxPlanConverter(
       memory::MemoryPool* pool,
       const facebook::velox::config::ConfigBase* veloxCfg,
+      const std::vector<std::shared_ptr<ResultIterator>>& inputIters,
       const std::optional<std::string> writeFilesTempPath = std::nullopt,
       const std::optional<std::string> writeFileName = std::nullopt,
       bool validationMode = false)
       : pool_(pool),
         veloxCfg_(veloxCfg),
+        inputIters_(inputIters),
         writeFilesTempPath_(writeFilesTempPath),
         writeFileName_(writeFileName),
         validationMode_(validationMode) {
@@ -133,9 +135,12 @@ class SubstraitToVeloxPlanConverter {
   /// FileProperties: the file sizes and modification times of the files to be 
scanned.
   core::PlanNodePtr toVeloxPlan(const ::substrait::ReadRel& sRead);
 
-  template <typename T>
+  // Construct a table scan node accepting value streams as input.
   core::PlanNodePtr constructValueStreamNode(const ::substrait::ReadRel& 
sRead, int32_t streamIdx);
 
+  // Construct a cuDF value stream node.
+  core::PlanNodePtr constructCudfValueStreamNode(const ::substrait::ReadRel& 
sRead, int32_t streamIdx);
+
   // This is only used in benchmark and enable query trace, which will load 
all the data to ValuesNode.
   core::PlanNodePtr constructValuesNode(const ::substrait::ReadRel& sRead, 
int32_t streamIdx);
 
@@ -181,8 +186,10 @@ class SubstraitToVeloxPlanConverter {
     splitInfos_ = splitInfos;
   }
 
-  void setInputIters(std::vector<std::shared_ptr<ResultIterator>> inputIters) {
-    inputIters_ = std::move(inputIters);
+  /// The input iterators not inlined into the Velox plan. They should then be 
manually added to the Velox task
+  /// via WholeStageResultIterator#addIteratorSplits. Empty if no input 
iterators remain.
+  const std::vector<std::shared_ptr<ResultIterator>>& 
remainingInputIterators() const {
+    return inputIters_;
   }
 
   /// Used to check if ReadRel specifies an input of stream.
@@ -271,8 +278,6 @@ class SubstraitToVeloxPlanConverter {
   /// The map storing the split stats for each PlanNode.
   std::unordered_map<core::PlanNodeId, std::shared_ptr<SplitInfo>> 
splitInfoMap_;
 
-  std::vector<std::shared_ptr<ResultIterator>> inputIters_;
-
   /// The map storing the pre-built plan nodes which can be accessed through
   /// index. This map is only used when the computation of a Substrait plan
   /// depends on other input nodes.
@@ -291,6 +296,9 @@ class SubstraitToVeloxPlanConverter {
   /// A map of custom configs.
   const facebook::velox::config::ConfigBase* veloxCfg_;
 
+  /// Input row-vectors for query trace mode (ValuesNode / cuDF ValueStream 
support)
+  std::vector<std::shared_ptr<ResultIterator>> inputIters_;
+
   /// The temporary path used to write files.
   std::optional<std::string> writeFilesTempPath_;
   std::optional<std::string> writeFileName_;
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h 
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
index 9005604f81..8afc7c5bf8 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
@@ -33,8 +33,8 @@ class SubstraitToVeloxPlanValidator {
     std::unordered_map<std::string, std::string> configs{
         {velox::core::QueryConfig::kSparkPartitionId, "0"}, 
{velox::core::QueryConfig::kSessionTimezone, "GMT"}};
     veloxCfg_ = 
std::make_shared<facebook::velox::config::ConfigBase>(std::move(configs));
-    planConverter_ =
-        std::make_unique<SubstraitToVeloxPlanConverter>(pool, veloxCfg_.get(), 
std::nullopt, std::nullopt, true);
+    planConverter_ = std::make_unique<SubstraitToVeloxPlanConverter>(
+        pool, veloxCfg_.get(), std::vector<std::shared_ptr<ResultIterator>>{}, 
std::nullopt, std::nullopt, true);
     queryCtx_ = velox::core::QueryCtx::create(nullptr, 
velox::core::QueryConfig(veloxCfg_->rawConfigs()));
     // An execution context used for function validation.
     execCtx_ = std::make_unique<velox::core::ExecCtx>(pool, queryCtx_.get());
diff --git a/cpp/velox/tests/FunctionTest.cc b/cpp/velox/tests/FunctionTest.cc
index fc7892003f..17645aa1a9 100644
--- a/cpp/velox/tests/FunctionTest.cc
+++ b/cpp/velox/tests/FunctionTest.cc
@@ -44,7 +44,7 @@ class FunctionTest : public ::testing::Test, public 
test::VectorTestBase {
   std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg_ =
       
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>());
   std::shared_ptr<gluten::SubstraitToVeloxPlanConverter> planConverter_ =
-      std::make_shared<gluten::SubstraitToVeloxPlanConverter>(pool(), 
veloxCfg_.get());
+      std::make_shared<gluten::SubstraitToVeloxPlanConverter>(pool(), 
veloxCfg_.get(), std::vector<std::shared_ptr<ResultIterator>>());
 };
 
 TEST_F(FunctionTest, makeNames) {
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 8847fed685..5487e7fd2e 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -65,6 +65,11 @@ class DummyRuntime final : public Runtime {
     auto iter = std::make_shared<ResultIterator>(std::move(resIter));
     return iter;
   }
+
+  void noMoreSplits(ResultIterator* iter) override {
+    // Do nothing.
+  }
+
   MemoryManager* memoryManager() override {
     throw GlutenException("Not yet implemented");
   }
@@ -150,6 +155,7 @@ TEST(TestRuntime, GetResultIterator) {
   DummyMemoryManager mm(kDummyBackendKind);
   auto runtime = std::make_shared<DummyRuntime>(kDummyBackendKind, &mm, 
std::unordered_map<std::string, std::string>());
   auto iter = runtime->createResultIterator("/tmp/test-spill", {});
+  runtime->noMoreSplits(iter.get());
   ASSERT_TRUE(iter->hasNext());
   auto next = iter->next();
   ASSERT_NE(next, nullptr);
diff --git a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc 
b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
index f1f6ef3865..8222f74caa 100644
--- a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
+++ b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
@@ -72,7 +72,7 @@ class Substrait2VeloxPlanConversionTest : public 
exec::test::HiveConnectorTestBa
   std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg_ =
       
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>());
   std::shared_ptr<VeloxPlanConverter> planConverter_ =
-      
std::make_shared<VeloxPlanConverter>(std::vector<std::shared_ptr<ResultIterator>>(),
 pool(), veloxCfg_.get());
+      std::make_shared<VeloxPlanConverter>(pool(), veloxCfg_.get(), 
std::vector<std::shared_ptr<ResultIterator>>());
 };
 
 // This test will firstly generate mock TPC-H lineitem ORC file. Then, Velox's
diff --git a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc 
b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
index 0a2b409526..d041bc359a 100644
--- a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
+++ b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
@@ -43,7 +43,7 @@ TEST_F(Substrait2VeloxValuesNodeConversionTest, valuesNode) {
   JsonToProtoConverter::readFromFile(planPath, substraitPlan);
   auto veloxCfg = 
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>());
   std::shared_ptr<SubstraitToVeloxPlanConverter> planConverter_ =
-      std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(), 
veloxCfg.get(), std::nullopt, std::nullopt, true);
+      std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(), 
veloxCfg.get(), std::vector<std::shared_ptr<ResultIterator>>(), std::nullopt, 
std::nullopt, true);
   auto veloxPlan = planConverter_->toVeloxPlan(substraitPlan);
 
   RowVectorPtr expectedData = makeRowVector(
diff --git a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc 
b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
index 7b8e8336ea..ae150bb98a 100644
--- a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
+++ b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
@@ -72,7 +72,7 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase {
     auto veloxCfg =
         
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>());
     std::shared_ptr<SubstraitToVeloxPlanConverter> substraitConverter_ =
-        std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(), 
veloxCfg.get(), std::nullopt, std::nullopt, true);
+        std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(), 
veloxCfg.get(), std::vector<std::shared_ptr<ResultIterator>>(), std::nullopt, 
std::nullopt, true);
 
     // Convert Substrait Plan to the same Velox Plan.
     auto samePlan = substraitConverter_->toVeloxPlan(substraitPlan);
@@ -94,7 +94,7 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase {
           
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>());
       std::shared_ptr<SubstraitToVeloxPlanConverter> substraitConverter_ =
           std::make_shared<SubstraitToVeloxPlanConverter>(
-              pool_.get(), veloxCfg.get(), std::nullopt, std::nullopt, true);
+              pool_.get(), veloxCfg.get(), 
std::vector<std::shared_ptr<ResultIterator>>(), std::nullopt, std::nullopt, 
true);
       // Convert Substrait Plan to the same Velox Plan.
       auto samePlan = substraitConverter_->toVeloxPlan(substraitPlan);
 
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
index f4d2c8e7d1..ad200dd46c 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
@@ -53,6 +53,11 @@ public class ColumnarBatchOutIterator extends 
ClosableIterator<ColumnarBatch>
 
   private native void nativeClose(long iterHandle);
 
+  private native boolean nativeAddIteratorSplits(
+      long iterHandle, ColumnarBatchInIterator[] batchItr);
+
+  private native void nativeNoMoreSplits(long iterHandle);
+
   @Override
   public boolean hasNext0() throws IOException {
     return nativeHasNext(iterHandle);
@@ -75,6 +80,34 @@ public class ColumnarBatchOutIterator extends 
ClosableIterator<ColumnarBatch>
     }
   }
 
+  /**
+   * Add new iterator splits to the iterator as new inputs for processing. 
Note: File-based splits
+   * are not supported.
+   *
+   * @param batchItr Array of iterators to add as splits
+   * @return true if splits were added successfully, false otherwise
+   * @throws IllegalStateException if the iterator is closed
+   */
+  public boolean addIteratorSplits(ColumnarBatchInIterator[] batchItr) {
+    if (closed.get()) {
+      throw new IllegalStateException("Cannot add splits to a closed 
iterator");
+    }
+    return nativeAddIteratorSplits(iterHandle, batchItr);
+  }
+
+  /**
+   * Signal that no more splits will be added to the iterator. This is 
required for proper task
+   * completion and is a prerequisite for barrier support.
+   *
+   * @throws IllegalStateException if the iterator is closed
+   */
+  public void noMoreSplits() {
+    if (closed.get()) {
+      throw new IllegalStateException("Cannot call noMoreSplits on a closed 
iterator");
+    }
+    nativeNoMoreSplits(iterHandle);
+  }
+
   @Override
   public void close0() {
     // To make sure the outputted batches are still accessible after the 
iterator is closed.
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index 813578566b..6d2c90896b 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -71,10 +70,11 @@ public class NativePlanEvaluator {
 
   // Used by WholeStageTransform to create the native computing pipeline and
   // return a columnar result iterator.
+  // Supports both creation-time splits (splitInfo, iterList) and runtime 
splits (via ColumnarBatchOutIterator#addIteratorSplits).
   public ColumnarBatchOutIterator createKernelWithBatchIterator(
       byte[] wsPlan,
       byte[][] splitInfo,
-      List<ColumnarBatchInIterator> iterList,
+      ColumnarBatchInIterator[] iterList,
       int partitionIndex,
       String spillDirPath)
       throws RuntimeException {
@@ -82,7 +82,7 @@ public class NativePlanEvaluator {
         jniWrapper.nativeCreateKernelWithIterator(
             wsPlan,
             splitInfo,
-            iterList.toArray(new ColumnarBatchInIterator[0]),
+            iterList,
             TaskContext.get().stageId(),
             partitionIndex, // TaskContext.getPartitionId(),
             TaskContext.get().taskAttemptId(),
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
index b8de4d63b5..a808290679 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
@@ -62,8 +62,11 @@ public class PlanEvaluatorJniWrapper implements RuntimeAware 
{
   public native String nativePlanString(byte[] substraitPlan, Boolean details);
 
   /**
-   * Create a native compute kernel and return a columnar result iterator.
+   * Create a native compute kernel and return a columnar result iterator. 
Supports both
+   * creation-time splits (splitInfo, batchItr) and runtime splits (via 
ColumnarBatchOutIterator#addIteratorSplits).
    *
+   * @param splitInfo optional file-based splits to add at creation time (can 
be null)
+   * @param batchItr optional iterator-based splits to add at creation time 
(can be null)
    * @return iterator instance id
    */
   public native long nativeCreateKernelWithIterator(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to