This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 531e82a5b [VL] Reduce memory waste in sort based shuffle (#6727)
531e82a5b is described below

commit 531e82a5bb7c93603661f4e0c5d98317d58cf109
Author: Rong Ma <[email protected]>
AuthorDate: Thu Aug 8 09:27:17 2024 +0800

    [VL] Reduce memory waste in sort based shuffle (#6727)
---
 cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 52 ++++++++++++++++++-----------
 cpp/velox/shuffle/VeloxSortShuffleWriter.h  |  4 +--
 2 files changed, 35 insertions(+), 21 deletions(-)

diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index c0d9b467d..d15280c0a 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -105,7 +105,7 @@ arrow::Status VeloxSortShuffleWriter::init() {
   ARROW_RETURN_IF(
       options_.partitioning == Partitioning::kSingle,
       arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition."));
-  initArray();
+  allocateMinimalArray();
   sortedBuffer_ = facebook::velox::AlignedBuffer::allocate<char>(kSortedBufferSize, veloxPool_.get());
   rawBuffer_ = sortedBuffer_->asMutable<uint8_t>();
   return arrow::Status::OK();
@@ -260,7 +260,7 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
     pageCursor_ = 0;
 
     // Reset and reallocate array_ to minimal size. Allocate array_ can trigger spill.
-    initArray();
+    allocateMinimalArray();
   }
   return arrow::Status::OK();
 }
@@ -316,21 +316,34 @@ uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows)
 }
 
 void VeloxSortShuffleWriter::acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired) {
-  auto size = std::max(std::min<uint64_t>(memLimit >> 2, 64UL * 1024 * 1024), minSizeRequired);
+  DLOG_IF(INFO, !pages_.empty()) << "Acquire new buffer. current capacity: " << pages_.back()->capacity()
+                                 << ", size: " << pages_.back()->size() << ", pageCursor: " << pageCursor_
+                                 << ", unused: " << pages_.back()->capacity() - pageCursor_;
+  auto size = std::max(
+      std::min<uint64_t>(
+          std::max<uint64_t>(memLimit >> 2, facebook::velox::AlignedBuffer::kPaddedSize), 64UL * 1024 * 1024) -
+          facebook::velox::AlignedBuffer::kPaddedSize,
+      minSizeRequired);
   // Allocating new buffer can trigger spill.
-  auto newBuffer = facebook::velox::AlignedBuffer::allocate<char>(size, veloxPool_.get(), 0);
+  auto newBuffer = facebook::velox::AlignedBuffer::allocate<char>(size, veloxPool_.get());
+  DLOG(INFO) << "Allocated new buffer. capacity: " << newBuffer->capacity() << ", size: " << newBuffer->size();
+  auto newBufferSize = newBuffer->capacity();
+  newBuffer->setSize(newBufferSize);
+
+  currentPage_ = newBuffer->asMutable<char>();
+  currenPageSize_ = newBufferSize;
+  memset(currentPage_, 0, newBufferSize);
+
   // If spill triggered, clear pages_.
   if (offset_ == 0 && pages_.size() > 0) {
     pageAddresses_.clear();
     pages_.clear();
   }
-  currentPage_ = newBuffer->asMutable<char>();
   pageAddresses_.emplace_back(currentPage_);
   pages_.emplace_back(std::move(newBuffer));
 
   pageCursor_ = 0;
   pageNumber_ = pages_.size() - 1;
-  currenPageSize_ = pages_.back()->size();
 }
 
 void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) {
@@ -339,16 +352,12 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) {
     // May trigger spill.
     auto newSizeBytes = newSize * sizeof(uint64_t);
     auto newArray = facebook::velox::AlignedBuffer::allocate<char>(newSizeBytes, veloxPool_.get());
-    // Check if already satisfies.
+    // Check if already satisfies (spill has been triggered).
     if (newArraySize(rows) > arraySize_) {
-      auto newPtr = newArray->asMutable<uint64_t>();
       if (offset_ > 0) {
-        gluten::fastCopy(newPtr, arrayPtr_, offset_ * sizeof(uint64_t));
+        gluten::fastCopy(newArray->asMutable<void>(), arrayPtr_, offset_ * sizeof(uint64_t));
       }
-      arraySize_ = newSize;
-      arrayPtr_ = newPtr;
-      array_.reset();
-      array_.swap(newArray);
+      setUpArray(std::move(newArray));
     }
   }
 }
@@ -363,9 +372,13 @@ uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) {
   return newSize;
 }
 
-void VeloxSortShuffleWriter::initArray() {
-  arraySize_ = options_.sortBufferInitialSize;
-  array_ = facebook::velox::AlignedBuffer::allocate<char>(arraySize_ * sizeof(uint64_t), veloxPool_.get());
+void VeloxSortShuffleWriter::setUpArray(facebook::velox::BufferPtr&& array) {
+  array_.reset();
+  array_ = std::move(array);
+  // Capacity is a multiple of 8 (bytes).
+  auto capacity = array_->capacity() & 0xfffffff8;
+  array_->setSize(capacity);
+  arraySize_ = capacity >> 3;
   arrayPtr_ = array_->asMutable<uint64_t>();
 }
 
@@ -381,8 +394,9 @@ int64_t VeloxSortShuffleWriter::totalC2RTime() const {
   return c2rTime_;
 }
 
-int VeloxSortShuffleWriter::compare(const void* a, const void* b) {
-  // No same values.
-  return *(uint64_t*)a > *(uint64_t*)b ? 1 : -1;
+void VeloxSortShuffleWriter::allocateMinimalArray() {
+  auto array = facebook::velox::AlignedBuffer::allocate<char>(
+      options_.sortBufferInitialSize * sizeof(uint64_t), veloxPool_.get());
+  setUpArray(std::move(array));
 }
 } // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 69b8b2503..2925b85f8 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -85,9 +85,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
 
   uint32_t newArraySize(uint32_t rows);
 
-  void initArray();
+  void setUpArray(facebook::velox::BufferPtr&& array);
 
-  static int compare(const void* a, const void* b);
+  void allocateMinimalArray();
 
   // Stores compact row id -> row
   facebook::velox::BufferPtr array_;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to