This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7467fe427 [VL] Minor refactor for ValueStream node construction and usage (#6382)
7467fe427 is described below
commit 7467fe427159bd757d2eb911b0b6ea1da4928c19
Author: Yang Zhang <[email protected]>
AuthorDate: Wed Jul 10 15:32:37 2024 +0800
[VL] Minor refactor for ValueStream node construction and usage (#6382)
---
cpp/velox/compute/VeloxPlanConverter.cc | 13 +---------
cpp/velox/operators/plannodes/RowVectorStream.h | 32 +++++++++++++++++--------
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 9 ++++++-
cpp/velox/substrait/SubstraitToVeloxPlan.h | 6 +++++
4 files changed, 37 insertions(+), 23 deletions(-)
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc
index 315ff2da6..ed2545c78 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -36,18 +36,7 @@ VeloxPlanConverter::VeloxPlanConverter(
bool validationMode)
: validationMode_(validationMode),
substraitVeloxPlanConverter_(veloxPool, confMap, writeFilesTempPath,
validationMode) {
- // avoid include RowVectorStream.h in SubstraitToVeloxPlan.cpp, it may cause
redefinition of array abi.h.
- auto factory = [inputIters = std::move(inputIters), validationMode =
validationMode](
- std::string nodeId, memory::MemoryPool* pool, int32_t
streamIdx, RowTypePtr outputType) {
- std::shared_ptr<ResultIterator> iterator;
- if (!validationMode) {
- VELOX_CHECK_LT(streamIdx, inputIters.size(), "Could not find stream
index {} in input iterator list.", streamIdx);
- iterator = inputIters[streamIdx];
- }
- auto valueStream = std::make_shared<RowVectorStream>(pool, iterator,
outputType);
- return std::make_shared<ValueStreamNode>(nodeId, outputType,
std::move(valueStream));
- };
- substraitVeloxPlanConverter_.setValueStreamNodeFactory(std::move(factory));
+ substraitVeloxPlanConverter_.setInputIters(std::move(inputIters));
}
namespace {
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h b/cpp/velox/operators/plannodes/RowVectorStream.h
index e02b288c4..c72e9137f 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -32,11 +32,17 @@ class RowVectorStream {
: iterator_(std::move(iterator)), outputType_(outputType), pool_(pool) {}
bool hasNext() {
- return iterator_->hasNext();
+ if (!finished_) {
+ finished_ = !iterator_->hasNext();
+ }
+ return !finished_;
}
// Convert arrow batch to rowvector and use new output columns
facebook::velox::RowVectorPtr next() {
+ if (finished_) {
+ return nullptr;
+ }
const std::shared_ptr<VeloxColumnarBatch>& vb =
VeloxColumnarBatch::from(pool_, iterator_->next());
auto vp = vb->getRowVector();
VELOX_DCHECK(vp != nullptr);
@@ -45,17 +51,18 @@ class RowVectorStream {
}
private:
+ bool finished_{false};
std::shared_ptr<ResultIterator> iterator_;
const facebook::velox::RowTypePtr outputType_;
facebook::velox::memory::MemoryPool* pool_;
};
-class ValueStreamNode : public facebook::velox::core::PlanNode {
+class ValueStreamNode final : public facebook::velox::core::PlanNode {
public:
ValueStreamNode(
const facebook::velox::core::PlanNodeId& id,
const facebook::velox::RowTypePtr& outputType,
- std::shared_ptr<RowVectorStream> valueStream)
+ std::unique_ptr<RowVectorStream> valueStream)
: facebook::velox::core::PlanNode(id), outputType_(outputType),
valueStream_(std::move(valueStream)) {
VELOX_CHECK_NOT_NULL(valueStream_);
}
@@ -68,8 +75,8 @@ class ValueStreamNode : public facebook::velox::core::PlanNode {
return kEmptySources;
};
- const std::shared_ptr<RowVectorStream>& rowVectorStream() const {
- return valueStream_;
+ RowVectorStream* rowVectorStream() const {
+ return valueStream_.get();
}
std::string_view name() const override {
@@ -84,7 +91,7 @@ class ValueStreamNode : public facebook::velox::core::PlanNode {
void addDetails(std::stringstream& stream) const override{};
const facebook::velox::RowTypePtr outputType_;
- std::shared_ptr<RowVectorStream> valueStream_;
+ std::unique_ptr<RowVectorStream> valueStream_;
const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources;
};
@@ -99,11 +106,14 @@ class ValueStream : public facebook::velox::exec::SourceOperator {
valueStreamNode->outputType(),
operatorId,
valueStreamNode->id(),
- "ValueStream") {
+ valueStreamNode->name().data()) {
valueStream_ = valueStreamNode->rowVectorStream();
}
facebook::velox::RowVectorPtr getOutput() override {
+ if (finished_) {
+ return nullptr;
+ }
if (valueStream_->hasNext()) {
return valueStream_->next();
} else {
@@ -122,12 +132,14 @@ class ValueStream : public facebook::velox::exec::SourceOperator {
private:
bool finished_ = false;
- std::shared_ptr<RowVectorStream> valueStream_;
+ RowVectorStream* valueStream_;
};
class RowVectorStreamOperatorTranslator : public
facebook::velox::exec::Operator::PlanNodeTranslator {
- std::unique_ptr<facebook::velox::exec::Operator>
- toOperator(facebook::velox::exec::DriverCtx* ctx, int32_t id, const
facebook::velox::core::PlanNodePtr& node) {
+ std::unique_ptr<facebook::velox::exec::Operator> toOperator(
+ facebook::velox::exec::DriverCtx* ctx,
+ int32_t id,
+ const facebook::velox::core::PlanNodePtr& node) override {
if (auto valueStreamNode = std::dynamic_pointer_cast<const
ValueStreamNode>(node)) {
return std::make_unique<ValueStream>(id, ctx, valueStreamNode);
}
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 34710c35a..7b41f7071 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -18,6 +18,7 @@
#include "SubstraitToVeloxPlan.h"
#include "TypeUtils.h"
#include "VariantToVectorConverter.h"
+#include "operators/plannodes/RowVectorStream.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/exec/TableWriter.h"
#include "velox/type/Filter.h"
@@ -1107,7 +1108,13 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
}
auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));
- auto node = valueStreamNodeFactory_(nextPlanNodeId(), pool_, streamIdx,
outputType);
+ std::shared_ptr<ResultIterator> iterator;
+ if (!validationMode_) {
+ VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index
{} in input iterator list.", streamIdx);
+ iterator = inputIters_[streamIdx];
+ }
+ auto valueStream = std::make_unique<RowVectorStream>(pool_, iterator,
outputType);
+ auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType,
std::move(valueStream));
auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 1f2f39f51..0e892469d 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -165,6 +165,10 @@ class SubstraitToVeloxPlanConverter {
valueStreamNodeFactory_ = std::move(factory);
}
+ void setInputIters(std::vector<std::shared_ptr<ResultIterator>> inputIters) {
+ inputIters_ = std::move(inputIters);
+ }
+
/// Used to check if ReadRel specifies an input of stream.
/// If yes, the index of input stream will be returned.
/// If not, -1 will be returned.
@@ -591,6 +595,8 @@ class SubstraitToVeloxPlanConverter {
std::function<core::PlanNodePtr(std::string, memory::MemoryPool*, int32_t,
RowTypePtr)> valueStreamNodeFactory_;
+ std::vector<std::shared_ptr<ResultIterator>> inputIters_;
+
/// The map storing the pre-built plan nodes which can be accessed through
/// index. This map is only used when the computation of a Substrait plan
/// depends on other input nodes.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]