This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ec3e92ec8 [VL] Optimize the performance of hash based shuffle by accumulating batches
ec3e92ec8 is described below
commit ec3e92ec841ef71490c8c41c64647c60888e3885
Author: YunDa <[email protected]>
AuthorDate: Tue Jun 11 09:51:09 2024 +0800
[VL] Optimize the performance of hash based shuffle by accumulating batches
---
cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc | 16 +++++++++++++++-
cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h | 9 +++++++++
cpp/velox/shuffle/VeloxShuffleWriter.h | 4 ++++
3 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
index 741ca8ab9..cc648cf7f 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
@@ -303,7 +303,17 @@ arrow::Status VeloxHashBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch>
numRows -= length;
} while (numRows);
} else {
- RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), memLimit));
+ if (accumulateRows_ + rv->size() < 8192) {
+ accumulateRows_ += rv->size();
+ initAccumulateDataset(rv);
+ accumulateDataset_->append(rv.get());
+ } else {
+ initAccumulateDataset(rv);
+ accumulateDataset_->append(rv.get());
RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), memLimit));
+ accumulateDataset_ = nullptr;
+ accumulateRows_ = 0;
+ }
}
}
return arrow::Status::OK();
@@ -329,6 +339,10 @@ arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo
}
arrow::Status VeloxHashBasedShuffleWriter::stop() {
+ if (accumulateDataset_ != nullptr) {
RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), kMinMemLimit));
+ accumulateRows_ = 0;
+ }
if (options_.partitioning != Partitioning::kSingle) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
RETURN_NOT_OK(evictPartitionBuffers(pid, false));
diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
index a11f84e95..142c7978b 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
@@ -303,6 +303,15 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {
arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit);
+ void initAccumulateDataset(facebook::velox::RowVectorPtr& rv) {
+ if (accumulateDataset_) {
+ return;
+ }
std::vector<facebook::velox::VectorPtr> children(rv->children().size(), nullptr);
accumulateDataset_ =
    std::make_shared<facebook::velox::RowVector>(veloxPool_.get(), rv->type(), nullptr, 0, std::move(children));
+ }
+
BinaryArrayResizeState binaryArrayResizeState_{};
bool hasComplexType_ = false;
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h
index 104b87616..2855831c5 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -124,6 +124,10 @@ class VeloxShuffleWriter : public ShuffleWriter {
int32_t maxBatchSize_{0};
+ uint32_t accumulateRows_{0};
+
+ facebook::velox::RowVectorPtr accumulateDataset_;
+
enum EvictState { kEvictable, kUnevictable };
// stat
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]