This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 68651e723 [GLUTEN-6822][VL] Fix wrong maxRowsToInsert and sort time metrics (#6832)
68651e723 is described below
commit 68651e723440a092760b64e144736317d3c15e2c
Author: Rong Ma <[email protected]>
AuthorDate: Fri Aug 16 13:14:54 2024 +0800
[GLUTEN-6822][VL] Fix wrong maxRowsToInsert and sort time metrics (#6832)
---
cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 36 ++++++++++++---------
cpp/velox/shuffle/VeloxSortShuffleWriter.h | 4 ++-
cpp/velox/tests/VeloxShuffleWriterTest.cc | 37 +++++++++++++++-------
cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h | 9 ++++--
4 files changed, 56 insertions(+), 30 deletions(-)
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 7f3e9201f..2bfc4908d 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -266,9 +266,10 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions()
{
}
arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId,
size_t begin, size_t end) {
- ScopedTimer timer(&sortTime_);
+ // Count copy row time into sortTime_.
+ Timer sortTime{};
// Serialize [begin, end)
- uint64_t offset = 0;
+ int64_t offset = 0;
char* addr;
uint32_t size;
@@ -278,14 +279,9 @@ arrow::Status
VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
addr = pageAddresses_[pageIndex.first] + pageIndex.second;
size = *(RowSizeType*)addr;
if (offset + size > kSortedBufferSize) {
- VELOX_CHECK(offset > 0);
- auto payload = std::make_unique<InMemoryPayload>(
- index - begin,
- nullptr,
-
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_,
offset)});
- updateSpillMetrics(payload);
- RETURN_NOT_OK(
- partitionWriter_->evict(partitionId, std::move(payload),
Evict::type::kSortSpill, false, false, stopped_));
+ sortTime.stop();
+ RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset));
+ sortTime.start();
begin = index;
offset = 0;
}
@@ -293,27 +289,37 @@ arrow::Status
VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
offset += size;
index++;
}
+ sortTime.stop();
+ RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset));
+
+ sortTime_ += sortTime.realTimeUsed();
+ return arrow::Status::OK();
+}
+
+arrow::Status VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId,
uint32_t numRows, int64_t rawLength) {
+ VELOX_CHECK(rawLength > 0);
auto payload = std::make_unique<InMemoryPayload>(
- end - begin,
+ numRows,
nullptr,
-
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_,
offset)});
+
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_,
rawLength)});
updateSpillMetrics(payload);
RETURN_NOT_OK(
partitionWriter_->evict(partitionId, std::move(payload),
Evict::type::kSortSpill, false, false, stopped_));
return arrow::Status::OK();
}
-uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t
rows) {
+uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t
remainingRows) {
// Check how many rows can be handled.
if (pages_.empty()) {
return 0;
}
auto remainingBytes = pages_.back()->size() - pageCursor_;
if (fixedRowSize_) {
- return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())),
rows);
+ return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())),
remainingRows);
}
auto beginIter = rowSizePrefixSum_.begin() + 1 + offset;
- auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(),
remainingBytes);
+ auto bytesWritten = rowSizePrefixSum_[offset];
+ auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(),
remainingBytes + bytesWritten);
return iter - beginIter;
}
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 34fbfd243..1626573a7 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -77,7 +77,9 @@ class VeloxSortShuffleWriter final : public
VeloxShuffleWriter {
arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end);
- uint32_t maxRowsToInsert(uint32_t offset, uint32_t rows);
+ arrow::Status evictPartition0(uint32_t partitionId, uint32_t numRows,
int64_t rawLength);
+
+ uint32_t maxRowsToInsert(uint32_t offset, uint32_t remainingRows);
void acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired);
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index af9d5a58d..7cbfbcd79 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -407,7 +407,22 @@ TEST_P(RoundRobinPartitioningShuffleWriter,
spillVerifyResult) {
shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(),
{{blockPid1}, {blockPid2}});
}
-TEST_F(VeloxShuffleWriterMemoryTest, memoryLeak) {
+TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) {
+ if (GetParam().shuffleWriterType != kSortShuffle) {
+ return;
+ }
+ ASSERT_NOT_OK(initShuffleWriterOptions());
+ auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+
+ // Set memLimit to 0 to force allocate a new buffer for each row.
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_, 0));
+
+ auto blockPid1 = takeRows({inputVector1_}, {{0, 2, 4, 6, 8}});
+ auto blockPid2 = takeRows({inputVector1_}, {{1, 3, 5, 7, 9}});
+ shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(),
{{blockPid1}, {blockPid2}});
+}
+
+TEST_F(VeloxHashShuffleWriterMemoryTest, memoryLeak) {
ASSERT_NOT_OK(initShuffleWriterOptions());
std::shared_ptr<arrow::MemoryPool> pool =
std::make_shared<LimitedMemoryPool>();
shuffleWriterOptions_.bufferSize = 4;
@@ -425,7 +440,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, memoryLeak) {
ASSERT_TRUE(pool->bytes_allocated() == 0);
}
-TEST_F(VeloxShuffleWriterMemoryTest, spillFailWithOutOfMemory) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, spillFailWithOutOfMemory) {
ASSERT_NOT_OK(initShuffleWriterOptions());
std::shared_ptr<arrow::MemoryPool> pool =
std::make_shared<LimitedMemoryPool>(0);
shuffleWriterOptions_.bufferSize = 4;
@@ -438,7 +453,7 @@ TEST_F(VeloxShuffleWriterMemoryTest,
spillFailWithOutOfMemory) {
ASSERT_TRUE(status.IsOutOfMemory());
}
-TEST_F(VeloxShuffleWriterMemoryTest, kInit) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kInit) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
@@ -508,7 +523,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kInit) {
ASSERT_NOT_OK(shuffleWriter->stop());
}
-TEST_F(VeloxShuffleWriterMemoryTest, kInitSingle) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kInitSingle) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.partitioning = Partitioning::kSingle;
shuffleWriterOptions_.bufferSize = 4;
@@ -530,7 +545,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kInitSingle) {
ASSERT_NOT_OK(shuffleWriter->stop());
}
-TEST_F(VeloxShuffleWriterMemoryTest, kSplit) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kSplit) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
@@ -552,7 +567,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kSplit) {
ASSERT_NOT_OK(shuffleWriter->stop());
}
-TEST_F(VeloxShuffleWriterMemoryTest, kSplitSingle) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kSplitSingle) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.partitioning = Partitioning::kSingle;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
@@ -570,7 +585,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kSplitSingle) {
ASSERT_NOT_OK(shuffleWriter->stop());
}
-TEST_F(VeloxShuffleWriterMemoryTest, kStop) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kStop) {
for (const auto partitioning : {Partitioning::kSingle,
Partitioning::kRoundRobin}) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.partitioning = partitioning;
@@ -592,7 +607,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) {
}
}
-TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kStopComplex) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
@@ -613,7 +628,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) {
ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] {
ASSERT_NOT_OK(shuffleWriter->stop()); }));
}
-TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, evictPartitionBuffers) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
@@ -635,7 +650,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers)
{
ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
}
-TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kUnevictableSingle) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.partitioning = Partitioning::kSingle;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get());
@@ -657,7 +672,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) {
ASSERT_EQ(evicted, 0);
}
-TEST_F(VeloxShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferReallocThreshold = 1;
auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
index d32e32721..102c73ca4 100644
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
@@ -196,9 +196,12 @@ class VeloxShuffleWriterTestBase : public
facebook::velox::test::VectorTestBase
inputVectorComplex_ = makeRowVector(childrenComplex_);
}
- arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter,
facebook::velox::RowVectorPtr vector) {
+ arrow::Status splitRowVector(
+ VeloxShuffleWriter& shuffleWriter,
+ facebook::velox::RowVectorPtr vector,
+ int64_t memLimit = ShuffleWriter::kMinMemLimit) {
std::shared_ptr<ColumnarBatch> cb =
std::make_shared<VeloxColumnarBatch>(vector);
- return shuffleWriter.write(cb, ShuffleWriter::kMinMemLimit);
+ return shuffleWriter.write(cb, memLimit);
}
// Create multiple local dirs and join with comma.
@@ -533,7 +536,7 @@ class RoundRobinPartitioningShuffleWriter : public
MultiplePartitioningShuffleWr
}
};
-class VeloxShuffleWriterMemoryTest : public VeloxShuffleWriterTestBase, public
testing::Test {
+class VeloxHashShuffleWriterMemoryTest : public VeloxShuffleWriterTestBase,
public testing::Test {
protected:
static void SetUpTestCase() {
facebook::velox::memory::MemoryManager::testingSetInstance({});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]