This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 68651e723 [GLUTEN-6822][VL] Fix wrong maxRowsToInsert and sort time metrics (#6832)
68651e723 is described below

commit 68651e723440a092760b64e144736317d3c15e2c
Author: Rong Ma <[email protected]>
AuthorDate: Fri Aug 16 13:14:54 2024 +0800

    [GLUTEN-6822][VL] Fix wrong maxRowsToInsert and sort time metrics (#6832)
---
 cpp/velox/shuffle/VeloxSortShuffleWriter.cc        | 36 ++++++++++++---------
 cpp/velox/shuffle/VeloxSortShuffleWriter.h         |  4 ++-
 cpp/velox/tests/VeloxShuffleWriterTest.cc          | 37 +++++++++++++++-------
 cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h |  9 ++++--
 4 files changed, 56 insertions(+), 30 deletions(-)

diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 7f3e9201f..2bfc4908d 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -266,9 +266,10 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
 }
 
 arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) {
-  ScopedTimer timer(&sortTime_);
+  // Count copy row time into sortTime_.
+  Timer sortTime{};
   // Serialize [begin, end)
-  uint64_t offset = 0;
+  int64_t offset = 0;
   char* addr;
   uint32_t size;
 
@@ -278,14 +279,9 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
     addr = pageAddresses_[pageIndex.first] + pageIndex.second;
     size = *(RowSizeType*)addr;
     if (offset + size > kSortedBufferSize) {
-      VELOX_CHECK(offset > 0);
-      auto payload = std::make_unique<InMemoryPayload>(
-          index - begin,
-          nullptr,
-          std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_, offset)});
-      updateSpillMetrics(payload);
-      RETURN_NOT_OK(
-          partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_));
+      sortTime.stop();
+      RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset));
+      sortTime.start();
       begin = index;
       offset = 0;
     }
@@ -293,27 +289,37 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
     offset += size;
     index++;
   }
+  sortTime.stop();
+  RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset));
+
+  sortTime_ += sortTime.realTimeUsed();
+  return arrow::Status::OK();
+}
+
+arrow::Status VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, uint32_t numRows, int64_t rawLength) {
+  VELOX_CHECK(rawLength > 0);
   auto payload = std::make_unique<InMemoryPayload>(
-      end - begin,
+      numRows,
       nullptr,
-      std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_, offset)});
+      std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_, rawLength)});
   updateSpillMetrics(payload);
   RETURN_NOT_OK(
       partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_));
   return arrow::Status::OK();
 }
 
-uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t rows) {
+uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t remainingRows) {
   // Check how many rows can be handled.
   if (pages_.empty()) {
     return 0;
   }
   auto remainingBytes = pages_.back()->size() - pageCursor_;
   if (fixedRowSize_) {
-    return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), rows);
+    return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), remainingRows);
   }
   auto beginIter = rowSizePrefixSum_.begin() + 1 + offset;
-  auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes);
+  auto bytesWritten = rowSizePrefixSum_[offset];
+  auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes + bytesWritten);
   return iter - beginIter;
 }
 
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 34fbfd243..1626573a7 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -77,7 +77,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
 
   arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end);
 
-  uint32_t maxRowsToInsert(uint32_t offset, uint32_t rows);
+  arrow::Status evictPartition0(uint32_t partitionId, uint32_t numRows, int64_t rawLength);
+
+  uint32_t maxRowsToInsert(uint32_t offset, uint32_t remainingRows);
 
   void acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired);
 
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index af9d5a58d..7cbfbcd79 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -407,7 +407,22 @@ TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) {
   shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(), {{blockPid1}, {blockPid2}});
 }
 
-TEST_F(VeloxShuffleWriterMemoryTest, memoryLeak) {
+TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) {
+  if (GetParam().shuffleWriterType != kSortShuffle) {
+    return;
+  }
+  ASSERT_NOT_OK(initShuffleWriterOptions());
+  auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+
+  // Set memLimit to 0 to force allocate a new buffer for each row.
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_, 0));
+
+  auto blockPid1 = takeRows({inputVector1_}, {{0, 2, 4, 6, 8}});
+  auto blockPid2 = takeRows({inputVector1_}, {{1, 3, 5, 7, 9}});
+  shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(), {{blockPid1}, {blockPid2}});
+}
+
+TEST_F(VeloxHashShuffleWriterMemoryTest, memoryLeak) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
   std::shared_ptr<arrow::MemoryPool> pool = std::make_shared<LimitedMemoryPool>();
   shuffleWriterOptions_.bufferSize = 4;
@@ -425,7 +440,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, memoryLeak) {
   ASSERT_TRUE(pool->bytes_allocated() == 0);
 }
 
-TEST_F(VeloxShuffleWriterMemoryTest, spillFailWithOutOfMemory) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, spillFailWithOutOfMemory) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
   std::shared_ptr<arrow::MemoryPool> pool = std::make_shared<LimitedMemoryPool>(0);
   shuffleWriterOptions_.bufferSize = 4;
@@ -438,7 +453,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, spillFailWithOutOfMemory) {
   ASSERT_TRUE(status.IsOutOfMemory());
 }
 
-TEST_F(VeloxShuffleWriterMemoryTest, kInit) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kInit) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
   shuffleWriterOptions_.bufferSize = 4;
   auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
@@ -508,7 +523,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kInit) {
   ASSERT_NOT_OK(shuffleWriter->stop());
 }
 
-TEST_F(VeloxShuffleWriterMemoryTest, kInitSingle) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kInitSingle) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
   shuffleWriterOptions_.partitioning = Partitioning::kSingle;
   shuffleWriterOptions_.bufferSize = 4;
@@ -530,7 +545,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kInitSingle) {
   ASSERT_NOT_OK(shuffleWriter->stop());
 }
 
-TEST_F(VeloxShuffleWriterMemoryTest, kSplit) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kSplit) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
   shuffleWriterOptions_.bufferSize = 4;
   auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
@@ -552,7 +567,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kSplit) {
   ASSERT_NOT_OK(shuffleWriter->stop());
 }
 
-TEST_F(VeloxShuffleWriterMemoryTest, kSplitSingle) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kSplitSingle) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
   shuffleWriterOptions_.partitioning = Partitioning::kSingle;
   auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
@@ -570,7 +585,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kSplitSingle) {
   ASSERT_NOT_OK(shuffleWriter->stop());
 }
 
-TEST_F(VeloxShuffleWriterMemoryTest, kStop) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kStop) {
   for (const auto partitioning : {Partitioning::kSingle, Partitioning::kRoundRobin}) {
     ASSERT_NOT_OK(initShuffleWriterOptions());
     shuffleWriterOptions_.partitioning = partitioning;
@@ -592,7 +607,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) {
   }
 }
 
-TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kStopComplex) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
   shuffleWriterOptions_.bufferSize = 4;
   auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
@@ -613,7 +628,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) {
   ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { ASSERT_NOT_OK(shuffleWriter->stop()); }));
 }
 
-TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, evictPartitionBuffers) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
   shuffleWriterOptions_.bufferSize = 4;
   auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
@@ -635,7 +650,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) {
   ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
 }
 
-TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, kUnevictableSingle) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
   shuffleWriterOptions_.partitioning = Partitioning::kSingle;
   auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get());
@@ -657,7 +672,7 @@ TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) {
   ASSERT_EQ(evicted, 0);
 }
 
-TEST_F(VeloxShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) {
+TEST_F(VeloxHashShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
   shuffleWriterOptions_.bufferReallocThreshold = 1;
   auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
index d32e32721..102c73ca4 100644
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
@@ -196,9 +196,12 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
     inputVectorComplex_ = makeRowVector(childrenComplex_);
   }
 
-  arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, facebook::velox::RowVectorPtr vector) {
+  arrow::Status splitRowVector(
+      VeloxShuffleWriter& shuffleWriter,
+      facebook::velox::RowVectorPtr vector,
+      int64_t memLimit = ShuffleWriter::kMinMemLimit) {
     std::shared_ptr<ColumnarBatch> cb = std::make_shared<VeloxColumnarBatch>(vector);
-    return shuffleWriter.write(cb, ShuffleWriter::kMinMemLimit);
+    return shuffleWriter.write(cb, memLimit);
   }
 
   // Create multiple local dirs and join with comma.
@@ -533,7 +536,7 @@ class RoundRobinPartitioningShuffleWriter : public MultiplePartitioningShuffleWr
   }
 };
 
-class VeloxShuffleWriterMemoryTest : public VeloxShuffleWriterTestBase, public testing::Test {
+class VeloxHashShuffleWriterMemoryTest : public VeloxShuffleWriterTestBase, public testing::Test {
  protected:
   static void SetUpTestCase() {
     facebook::velox::memory::MemoryManager::testingSetInstance({});


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to