This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ec3e92ec8 [VL] Optimize the performance of hash based shuffle by accumulating batches
ec3e92ec8 is described below

commit ec3e92ec841ef71490c8c41c64647c60888e3885
Author: YunDa <[email protected]>
AuthorDate: Tue Jun 11 09:51:09 2024 +0800

    [VL] Optimize the performance of hash based shuffle by accumulating batches
---
 cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc | 16 +++++++++++++++-
 cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h  |  9 +++++++++
 cpp/velox/shuffle/VeloxShuffleWriter.h           |  4 ++++
 3 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
index 741ca8ab9..cc648cf7f 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
@@ -303,7 +303,17 @@ arrow::Status VeloxHashBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch>
         numRows -= length;
       } while (numRows);
     } else {
-      RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), memLimit));
+      if (accumulateRows_ + rv->size() < 8192) {
+        accumulateRows_ += rv->size();
+        initAccumulateDataset(rv);
+        accumulateDataset_->append(rv.get());
+      } else {
+        initAccumulateDataset(rv);
+        accumulateDataset_->append(rv.get());
+        RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), memLimit));
+        accumulateDataset_ = nullptr;
+        accumulateRows_ = 0;
+      }
     }
   }
   return arrow::Status::OK();
@@ -329,6 +339,10 @@ arrow::Status VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo
 }
 
 arrow::Status VeloxHashBasedShuffleWriter::stop() {
+  if (accumulateDataset_ != nullptr) {
+    RETURN_NOT_OK(partitioningAndDoSplit(std::move(accumulateDataset_), kMinMemLimit));
+    accumulateRows_ = 0;
+  }
   if (options_.partitioning != Partitioning::kSingle) {
     for (auto pid = 0; pid < numPartitions_; ++pid) {
       RETURN_NOT_OK(evictPartitionBuffers(pid, false));
diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
index a11f84e95..142c7978b 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
@@ -303,6 +303,15 @@ class VeloxHashBasedShuffleWriter : public VeloxShuffleWriter {
 
   arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit);
 
+  void initAccumulateDataset(facebook::velox::RowVectorPtr& rv) {
+    if (accumulateDataset_) {
+      return;
+    }
+    std::vector<facebook::velox::VectorPtr> children(rv->children().size(), nullptr);
+    accumulateDataset_ =
+        std::make_shared<facebook::velox::RowVector>(veloxPool_.get(), 
rv->type(), nullptr, 0, std::move(children));
+  }
+
   BinaryArrayResizeState binaryArrayResizeState_{};
 
   bool hasComplexType_ = false;
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h
index 104b87616..2855831c5 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -124,6 +124,10 @@ class VeloxShuffleWriter : public ShuffleWriter {
 
   int32_t maxBatchSize_{0};
 
+  uint32_t accumulateRows_{0};
+
+  facebook::velox::RowVectorPtr accumulateDataset_;
+
   enum EvictState { kEvictable, kUnevictable };
 
   // stat


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to