This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 531e82a5b [VL] Reduce memory waste in sort based shuffle (#6727)
531e82a5b is described below
commit 531e82a5bb7c93603661f4e0c5d98317d58cf109
Author: Rong Ma <[email protected]>
AuthorDate: Thu Aug 8 09:27:17 2024 +0800
[VL] Reduce memory waste in sort based shuffle (#6727)
---
cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 52 ++++++++++++++++++-----------
cpp/velox/shuffle/VeloxSortShuffleWriter.h | 4 +--
2 files changed, 35 insertions(+), 21 deletions(-)
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index c0d9b467d..d15280c0a 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -105,7 +105,7 @@ arrow::Status VeloxSortShuffleWriter::init() {
ARROW_RETURN_IF(
options_.partitioning == Partitioning::kSingle,
arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single
partition."));
- initArray();
+ allocateMinimalArray();
sortedBuffer_ =
facebook::velox::AlignedBuffer::allocate<char>(kSortedBufferSize,
veloxPool_.get());
rawBuffer_ = sortedBuffer_->asMutable<uint8_t>();
return arrow::Status::OK();
@@ -260,7 +260,7 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
pageCursor_ = 0;
// Reset and reallocate array_ to minimal size. Allocate array_ can
trigger spill.
- initArray();
+ allocateMinimalArray();
}
return arrow::Status::OK();
}
@@ -316,21 +316,34 @@ uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows)
}
void VeloxSortShuffleWriter::acquireNewBuffer(uint64_t memLimit, uint64_t
minSizeRequired) {
- auto size = std::max(std::min<uint64_t>(memLimit >> 2, 64UL * 1024 * 1024),
minSizeRequired);
+ DLOG_IF(INFO, !pages_.empty()) << "Acquire new buffer. current capacity: "
<< pages_.back()->capacity()
+ << ", size: " << pages_.back()->size() << ",
pageCursor: " << pageCursor_
+ << ", unused: " << pages_.back()->capacity()
- pageCursor_;
+ auto size = std::max(
+ std::min<uint64_t>(
+ std::max<uint64_t>(memLimit >> 2,
facebook::velox::AlignedBuffer::kPaddedSize), 64UL * 1024 * 1024) -
+ facebook::velox::AlignedBuffer::kPaddedSize,
+ minSizeRequired);
// Allocating new buffer can trigger spill.
- auto newBuffer = facebook::velox::AlignedBuffer::allocate<char>(size,
veloxPool_.get(), 0);
+ auto newBuffer = facebook::velox::AlignedBuffer::allocate<char>(size,
veloxPool_.get());
+ DLOG(INFO) << "Allocated new buffer. capacity: " << newBuffer->capacity() <<
", size: " << newBuffer->size();
+ auto newBufferSize = newBuffer->capacity();
+ newBuffer->setSize(newBufferSize);
+
+ currentPage_ = newBuffer->asMutable<char>();
+ currenPageSize_ = newBufferSize;
+ memset(currentPage_, 0, newBufferSize);
+
// If spill triggered, clear pages_.
if (offset_ == 0 && pages_.size() > 0) {
pageAddresses_.clear();
pages_.clear();
}
- currentPage_ = newBuffer->asMutable<char>();
pageAddresses_.emplace_back(currentPage_);
pages_.emplace_back(std::move(newBuffer));
pageCursor_ = 0;
pageNumber_ = pages_.size() - 1;
- currenPageSize_ = pages_.back()->size();
}
void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) {
@@ -339,16 +352,12 @@ void VeloxSortShuffleWriter::growArrayIfNecessary(uint32_t rows) {
// May trigger spill.
auto newSizeBytes = newSize * sizeof(uint64_t);
auto newArray =
facebook::velox::AlignedBuffer::allocate<char>(newSizeBytes, veloxPool_.get());
- // Check if already satisfies.
+ // Check if already satisfies (spill has been triggered).
if (newArraySize(rows) > arraySize_) {
- auto newPtr = newArray->asMutable<uint64_t>();
if (offset_ > 0) {
- gluten::fastCopy(newPtr, arrayPtr_, offset_ * sizeof(uint64_t));
+ gluten::fastCopy(newArray->asMutable<void>(), arrayPtr_, offset_ *
sizeof(uint64_t));
}
- arraySize_ = newSize;
- arrayPtr_ = newPtr;
- array_.reset();
- array_.swap(newArray);
+ setUpArray(std::move(newArray));
}
}
}
@@ -363,9 +372,13 @@ uint32_t VeloxSortShuffleWriter::newArraySize(uint32_t rows) {
return newSize;
}
-void VeloxSortShuffleWriter::initArray() {
- arraySize_ = options_.sortBufferInitialSize;
- array_ = facebook::velox::AlignedBuffer::allocate<char>(arraySize_ *
sizeof(uint64_t), veloxPool_.get());
+void VeloxSortShuffleWriter::setUpArray(facebook::velox::BufferPtr&& array) {
+ array_.reset();
+ array_ = std::move(array);
+ // Capacity is a multiple of 8 (bytes).
+ auto capacity = array_->capacity() & 0xfffffff8;
+ array_->setSize(capacity);
+ arraySize_ = capacity >> 3;
arrayPtr_ = array_->asMutable<uint64_t>();
}
@@ -381,8 +394,9 @@ int64_t VeloxSortShuffleWriter::totalC2RTime() const {
return c2rTime_;
}
-int VeloxSortShuffleWriter::compare(const void* a, const void* b) {
- // No same values.
- return *(uint64_t*)a > *(uint64_t*)b ? 1 : -1;
+void VeloxSortShuffleWriter::allocateMinimalArray() {
+ auto array = facebook::velox::AlignedBuffer::allocate<char>(
+ options_.sortBufferInitialSize * sizeof(uint64_t), veloxPool_.get());
+ setUpArray(std::move(array));
}
} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 69b8b2503..2925b85f8 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -85,9 +85,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
uint32_t newArraySize(uint32_t rows);
- void initArray();
+ void setUpArray(facebook::velox::BufferPtr&& array);
- static int compare(const void* a, const void* b);
+ void allocateMinimalArray();
// Stores compact row id -> row
facebook::velox::BufferPtr array_;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]