This is an automated email from the ASF dual-hosted git repository.

yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7467fe427 [VL] Minor refactor for ValueStream node construction and usage (#6382)
7467fe427 is described below

commit 7467fe427159bd757d2eb911b0b6ea1da4928c19
Author: Yang Zhang <[email protected]>
AuthorDate: Wed Jul 10 15:32:37 2024 +0800

    [VL] Minor refactor for ValueStream node construction and usage (#6382)
---
 cpp/velox/compute/VeloxPlanConverter.cc         | 13 +---------
 cpp/velox/operators/plannodes/RowVectorStream.h | 32 +++++++++++++++++--------
 cpp/velox/substrait/SubstraitToVeloxPlan.cc     |  9 ++++++-
 cpp/velox/substrait/SubstraitToVeloxPlan.h      |  6 +++++
 4 files changed, 37 insertions(+), 23 deletions(-)

diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc
index 315ff2da6..ed2545c78 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -36,18 +36,7 @@ VeloxPlanConverter::VeloxPlanConverter(
     bool validationMode)
     : validationMode_(validationMode),
       substraitVeloxPlanConverter_(veloxPool, confMap, writeFilesTempPath, 
validationMode) {
-  // avoid include RowVectorStream.h in SubstraitToVeloxPlan.cpp, it may cause 
redefinition of array abi.h.
-  auto factory = [inputIters = std::move(inputIters), validationMode = 
validationMode](
-                     std::string nodeId, memory::MemoryPool* pool, int32_t 
streamIdx, RowTypePtr outputType) {
-    std::shared_ptr<ResultIterator> iterator;
-    if (!validationMode) {
-      VELOX_CHECK_LT(streamIdx, inputIters.size(), "Could not find stream 
index {} in input iterator list.", streamIdx);
-      iterator = inputIters[streamIdx];
-    }
-    auto valueStream = std::make_shared<RowVectorStream>(pool, iterator, 
outputType);
-    return std::make_shared<ValueStreamNode>(nodeId, outputType, 
std::move(valueStream));
-  };
-  substraitVeloxPlanConverter_.setValueStreamNodeFactory(std::move(factory));
+  substraitVeloxPlanConverter_.setInputIters(std::move(inputIters));
 }
 
 namespace {
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h b/cpp/velox/operators/plannodes/RowVectorStream.h
index e02b288c4..c72e9137f 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -32,11 +32,17 @@ class RowVectorStream {
       : iterator_(std::move(iterator)), outputType_(outputType), pool_(pool) {}
 
   bool hasNext() {
-    return iterator_->hasNext();
+    if (!finished_) {
+      finished_ = !iterator_->hasNext();
+    }
+    return !finished_;
   }
 
   // Convert arrow batch to rowvector and use new output columns
   facebook::velox::RowVectorPtr next() {
+    if (finished_) {
+      return nullptr;
+    }
     const std::shared_ptr<VeloxColumnarBatch>& vb = 
VeloxColumnarBatch::from(pool_, iterator_->next());
     auto vp = vb->getRowVector();
     VELOX_DCHECK(vp != nullptr);
@@ -45,17 +51,18 @@ class RowVectorStream {
   }
 
  private:
+  bool finished_{false};
   std::shared_ptr<ResultIterator> iterator_;
   const facebook::velox::RowTypePtr outputType_;
   facebook::velox::memory::MemoryPool* pool_;
 };
 
-class ValueStreamNode : public facebook::velox::core::PlanNode {
+class ValueStreamNode final : public facebook::velox::core::PlanNode {
  public:
   ValueStreamNode(
       const facebook::velox::core::PlanNodeId& id,
       const facebook::velox::RowTypePtr& outputType,
-      std::shared_ptr<RowVectorStream> valueStream)
+      std::unique_ptr<RowVectorStream> valueStream)
       : facebook::velox::core::PlanNode(id), outputType_(outputType), 
valueStream_(std::move(valueStream)) {
     VELOX_CHECK_NOT_NULL(valueStream_);
   }
@@ -68,8 +75,8 @@ class ValueStreamNode : public facebook::velox::core::PlanNode {
     return kEmptySources;
   };
 
-  const std::shared_ptr<RowVectorStream>& rowVectorStream() const {
-    return valueStream_;
+  RowVectorStream* rowVectorStream() const {
+    return valueStream_.get();
   }
 
   std::string_view name() const override {
@@ -84,7 +91,7 @@ class ValueStreamNode : public facebook::velox::core::PlanNode {
   void addDetails(std::stringstream& stream) const override{};
 
   const facebook::velox::RowTypePtr outputType_;
-  std::shared_ptr<RowVectorStream> valueStream_;
+  std::unique_ptr<RowVectorStream> valueStream_;
   const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources;
 };
 
@@ -99,11 +106,14 @@ class ValueStream : public facebook::velox::exec::SourceOperator {
             valueStreamNode->outputType(),
             operatorId,
             valueStreamNode->id(),
-            "ValueStream") {
+            valueStreamNode->name().data()) {
     valueStream_ = valueStreamNode->rowVectorStream();
   }
 
   facebook::velox::RowVectorPtr getOutput() override {
+    if (finished_) {
+      return nullptr;
+    }
     if (valueStream_->hasNext()) {
       return valueStream_->next();
     } else {
@@ -122,12 +132,14 @@ class ValueStream : public facebook::velox::exec::SourceOperator {
 
  private:
   bool finished_ = false;
-  std::shared_ptr<RowVectorStream> valueStream_;
+  RowVectorStream* valueStream_;
 };
 
 class RowVectorStreamOperatorTranslator : public 
facebook::velox::exec::Operator::PlanNodeTranslator {
-  std::unique_ptr<facebook::velox::exec::Operator>
-  toOperator(facebook::velox::exec::DriverCtx* ctx, int32_t id, const 
facebook::velox::core::PlanNodePtr& node) {
+  std::unique_ptr<facebook::velox::exec::Operator> toOperator(
+      facebook::velox::exec::DriverCtx* ctx,
+      int32_t id,
+      const facebook::velox::core::PlanNodePtr& node) override {
     if (auto valueStreamNode = std::dynamic_pointer_cast<const 
ValueStreamNode>(node)) {
       return std::make_unique<ValueStream>(id, ctx, valueStreamNode);
     }
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 34710c35a..7b41f7071 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -18,6 +18,7 @@
 #include "SubstraitToVeloxPlan.h"
 #include "TypeUtils.h"
 #include "VariantToVectorConverter.h"
+#include "operators/plannodes/RowVectorStream.h"
 #include "velox/connectors/hive/HiveDataSink.h"
 #include "velox/exec/TableWriter.h"
 #include "velox/type/Filter.h"
@@ -1107,7 +1108,13 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
   }
 
   auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));
-  auto node = valueStreamNodeFactory_(nextPlanNodeId(), pool_, streamIdx, 
outputType);
+  std::shared_ptr<ResultIterator> iterator;
+  if (!validationMode_) {
+    VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index 
{} in input iterator list.", streamIdx);
+    iterator = inputIters_[streamIdx];
+  }
+  auto valueStream = std::make_unique<RowVectorStream>(pool_, iterator, 
outputType);
+  auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType, 
std::move(valueStream));
 
   auto splitInfo = std::make_shared<SplitInfo>();
   splitInfo->isStream = true;
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 1f2f39f51..0e892469d 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -165,6 +165,10 @@ class SubstraitToVeloxPlanConverter {
     valueStreamNodeFactory_ = std::move(factory);
   }
 
+  void setInputIters(std::vector<std::shared_ptr<ResultIterator>> inputIters) {
+    inputIters_ = std::move(inputIters);
+  }
+
   /// Used to check if ReadRel specifies an input of stream.
   /// If yes, the index of input stream will be returned.
   /// If not, -1 will be returned.
@@ -591,6 +595,8 @@ class SubstraitToVeloxPlanConverter {
 
   std::function<core::PlanNodePtr(std::string, memory::MemoryPool*, int32_t, 
RowTypePtr)> valueStreamNodeFactory_;
 
+  std::vector<std::shared_ptr<ResultIterator>> inputIters_;
+
   /// The map storing the pre-built plan nodes which can be accessed through
   /// index. This map is only used when the computation of a Substrait plan
   /// depends on other input nodes.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to