This is an automated email from the ASF dual-hosted git repository.

weitingchen pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/branch-1.2 by this push:
     new 773c9b474b [VL][1.2] Port Fix shuffle spill triggered by evicting 
buffers during stop (#6422) (#7991)
773c9b474b is described below

commit 773c9b474b53f5054e107249192f6a6ba9c45d9c
Author: zhaokuo <[email protected]>
AuthorDate: Fri Nov 22 18:45:44 2024 +0800

    [VL][1.2] Port Fix shuffle spill triggered by evicting buffers during stop 
(#6422) (#7991)
    
    In VeloxHashBasedShuffleWriter::stop, evicting partition buffers can 
allocate extra memory and trigger a spill. In this case the target partition 
buffer should not get shrunk.
    
    Added UT for this case.
    
    Co-authored-by: Rong Ma <[email protected]>
---
 cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc   | 16 ++++--
 cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h    | 19 ++++++-
 cpp/velox/tests/VeloxShuffleWriterTest.cc          | 66 ++++++++--------------
 cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h | 17 ++++++
 4 files changed, 70 insertions(+), 48 deletions(-)

diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
index 3bd1a2fbc6..f9cd2780c3 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
@@ -329,8 +329,10 @@ arrow::Status 
VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo
 }
 
 arrow::Status VeloxHashBasedShuffleWriter::stop() {
+  setSplitState(SplitState::kStopEvict);
   if (options_.partitioning != Partitioning::kSingle) {
     for (auto pid = 0; pid < numPartitions_; ++pid) {
+      PartitionBufferGuard guard(partitionBufferInUse_, pid);
       RETURN_NOT_OK(evictPartitionBuffers(pid, false));
     }
   }
@@ -970,10 +972,6 @@ arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> 
VeloxHashBasedShuffle
     bool reuseBuffers) {
   SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCreateRbFromBuffer]);
 
-  if (partitionBufferBase_[partitionId] == 0) {
-    return std::vector<std::shared_ptr<arrow::Buffer>>{};
-  }
-
   auto numRows = partitionBufferBase_[partitionId];
   auto fixedWidthIdx = 0;
   auto binaryIdx = 0;
@@ -1321,6 +1319,9 @@ arrow::Result<int64_t> 
VeloxHashBasedShuffleWriter::shrinkPartitionBuffersMinSiz
   // Sort partition buffers by (partitionBufferSize_ - partitionBufferBase_)
   std::vector<std::pair<uint32_t, uint32_t>> pidToSize;
   for (auto pid = 0; pid < numPartitions_; ++pid) {
+    if (partitionBufferInUse_.has_value() && *partitionBufferInUse_ == pid) {
+      continue;
+    }
     if (partitionBufferSize_[pid] > 0 && partitionBufferSize_[pid] > 
partitionBufferBase_[pid]) {
       pidToSize.emplace_back(pid, partitionBufferSize_[pid] - 
partitionBufferBase_[pid]);
     }
@@ -1348,6 +1349,7 @@ arrow::Result<int64_t> 
VeloxHashBasedShuffleWriter::shrinkPartitionBuffersMinSiz
 arrow::Result<int64_t> 
VeloxHashBasedShuffleWriter::evictPartitionBuffersMinSize(int64_t size) {
   // Evict partition buffers, only when splitState_ == SplitState::kInit, and 
space freed from
   // shrinking is not enough. In this case partitionBufferSize_ == 
partitionBufferBase_
+  VELOX_CHECK(!partitionBufferInUse_);
   int64_t beforeEvict = partitionBufferPool_->bytes_allocated();
   int64_t evicted = 0;
   std::vector<std::pair<uint32_t, uint32_t>> pidToSize;
@@ -1375,10 +1377,12 @@ arrow::Result<int64_t> 
VeloxHashBasedShuffleWriter::evictPartitionBuffersMinSize
 bool VeloxHashBasedShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
   // If OOM happens during SplitState::kSplit, it is triggered by binary 
buffers resize.
   // Or during SplitState::kInit, it is triggered by other operators.
+  // Or during SplitState::kStopEvict, it is triggered by assembleBuffers 
allocating extra memory. In this case we use
+  // PartitionBufferGuard to prevent the target partition from being shrunk.
   // The reclaim order is spill->shrink, because the partition buffers can be 
reused.
   // SinglePartitioning doesn't maintain partition buffers.
   return options_.partitioning != Partitioning::kSingle &&
-      (splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit);
+      (splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit 
|| splitState_ == SplitState::kStopEvict);
 }
 
 bool VeloxHashBasedShuffleWriter::evictPartitionBuffersAfterSpill() const {
@@ -1391,7 +1395,7 @@ arrow::Result<uint32_t> 
VeloxHashBasedShuffleWriter::partitionBufferSizeAfterShr
   if (splitState_ == SplitState::kSplit) {
     return partitionBufferBase_[partitionId] + 
partition2RowCount_[partitionId];
   }
-  if (splitState_ == kInit || splitState_ == SplitState::kStop) {
+  if (splitState_ == kInit || splitState_ == SplitState::kStopEvict) {
     return partitionBufferBase_[partitionId];
   }
   return arrow::Status::Invalid("Cannot shrink partition buffers in 
SplitState: " + std::to_string(splitState_));
diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h 
b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
index a11f84e952..f78211980a 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
@@ -87,7 +87,7 @@ namespace gluten {
 
 #endif // end of VELOX_SHUFFLE_WRITER_PRINT
 
-enum SplitState { kInit, kPreAlloc, kSplit, kStop };
+enum SplitState { kInit, kPreAlloc, kSplit, kStopEvict, kStop };
 
 struct BinaryArrayResizeState {
   bool inResize;
@@ -303,6 +303,21 @@ class VeloxHashBasedShuffleWriter : public 
VeloxShuffleWriter {
 
   arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, 
int64_t memLimit);
 
+  class PartitionBufferGuard {
+   public:
+    PartitionBufferGuard(std::optional<uint32_t>& partitionInUse, uint32_t 
partitionId)
+        : partitionBufferInUse_(partitionInUse) {
+      partitionBufferInUse_ = partitionId;
+    }
+
+    ~PartitionBufferGuard() {
+      partitionBufferInUse_ = std::nullopt;
+    }
+
+   private:
+    std::optional<uint32_t>& partitionBufferInUse_;
+  };
+
   BinaryArrayResizeState binaryArrayResizeState_{};
 
   bool hasComplexType_ = false;
@@ -401,6 +416,8 @@ class VeloxHashBasedShuffleWriter : public 
VeloxShuffleWriter {
   facebook::velox::serializer::presto::PrestoVectorSerde serde_;
 
   SplitState splitState_{kInit};
+
+  std::optional<uint32_t> partitionBufferInUse_{std::nullopt};
 }; // class VeloxHashBasedShuffleWriter
 
 } // namespace gluten
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc 
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 1c1be6fc1b..957657eeb5 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -207,50 +207,13 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
 TEST_P(HashPartitioningShuffleWriter, hashPart1VectorComplexType) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
   auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
-  std::vector<VectorPtr> children = {
-      makeNullableFlatVector<int32_t>({std::nullopt, 1}),
-      makeRowVector({
-          makeFlatVector<int32_t>({1, 3}),
-          makeNullableFlatVector<velox::StringView>({std::nullopt, "de"}),
-      }),
-      makeNullableFlatVector<StringView>({std::nullopt, "10 I'm not inline 
string"}),
-      makeArrayVector<int64_t>({
-          {1, 2, 3, 4, 5},
-          {1, 2, 3},
-      }),
-      makeMapVector<int32_t, StringView>({{{1, "str1000"}, {2, "str2000"}}, 
{{3, "str3000"}, {4, "str4000"}}}),
-  };
-  auto dataVector = makeRowVector(children);
+  auto children = childrenComplex_;
   children.insert((children.begin()), makeFlatVector<int32_t>({1, 2}));
   auto vector = makeRowVector(children);
+  auto firstBlock = takeRows({inputVectorComplex_}, {{1}});
+  auto secondBlock = takeRows({inputVectorComplex_}, {{0}});
 
-  auto firstBlock = makeRowVector({
-      makeConstant<int32_t>(1, 1),
-      makeRowVector({
-          makeConstant<int32_t>(3, 1),
-          makeFlatVector<velox::StringView>({"de"}),
-      }),
-      makeFlatVector<StringView>({"10 I'm not inline string"}),
-      makeArrayVector<int64_t>({
-          {1, 2, 3},
-      }),
-      makeMapVector<int32_t, StringView>({{{3, "str3000"}, {4, "str4000"}}}),
-  });
-
-  auto secondBlock = makeRowVector({
-      makeNullConstant(TypeKind::INTEGER, 1),
-      makeRowVector({
-          makeConstant<int32_t>(1, 1),
-          makeNullableFlatVector<velox::StringView>({std::nullopt}),
-      }),
-      makeNullableFlatVector<StringView>({std::nullopt}),
-      makeArrayVector<int64_t>({
-          {1, 2, 3, 4, 5},
-      }),
-      makeMapVector<int32_t, StringView>({{{1, "str1000"}, {2, "str2000"}}}),
-  });
-
-  testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2, dataVector->type(), 
{{firstBlock}, {secondBlock}});
+  testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2, 
inputVectorComplex_->type(), {{firstBlock}, {secondBlock}});
 }
 
 TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
@@ -631,6 +594,27 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) {
   }
 }
 
+TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) {
+  ASSERT_NOT_OK(initShuffleWriterOptions());
+  shuffleWriterOptions_.bufferSize = 4;
+  auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
+  auto shuffleWriter = createShuffleWriter(&pool);
+
+  pool.setEvictable(shuffleWriter.get());
+  for (int i = 0; i < 3; ++i) {
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorComplex_));
+  }
+
+  // Reclaim from PartitionWriter to free cached bytes.
+  auto payloadSize = shuffleWriter->cachedPayloadSize();
+  int64_t evicted;
+  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
+  ASSERT_EQ(evicted, payloadSize);
+
+  // When evicting partitioning buffers in stop, spill will be triggered by 
complex types allocating extra memory.
+  ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { 
ASSERT_NOT_OK(shuffleWriter->stop()); }));
+}
+
 TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
   shuffleWriterOptions_.bufferSize = 4;
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h 
b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
index fd3ae3d547..41b7f31e02 100644
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
@@ -172,12 +172,27 @@ class VeloxShuffleWriterTestBase : public 
facebook::velox::test::VectorTestBase
         makeNullableFlatVector<facebook::velox::StringView>(
             std::vector<std::optional<facebook::velox::StringView>>(2048, 
std::nullopt)),
     };
+    childrenComplex_ = {
+        makeNullableFlatVector<int32_t>({std::nullopt, 1}),
+        makeRowVector({
+            makeFlatVector<int32_t>({1, 3}),
+            makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, 
"de"}),
+        }),
+        makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, "10 
I'm not inline string"}),
+        makeArrayVector<int64_t>({
+            {1, 2, 3, 4, 5},
+            {1, 2, 3},
+        }),
+        makeMapVector<int32_t, facebook::velox::StringView>(
+            {{{1, "str1000"}, {2, "str2000"}}, {{3, "str3000"}, {4, 
"str4000"}}}),
+    };
 
     inputVector1_ = makeRowVector(children1_);
     inputVector2_ = makeRowVector(children2_);
     inputVectorNoNull_ = makeRowVector(childrenNoNull_);
     inputVectorLargeBinary1_ = makeRowVector(childrenLargeBinary1_);
     inputVectorLargeBinary2_ = makeRowVector(childrenLargeBinary2_);
+    inputVectorComplex_ = makeRowVector(childrenComplex_);
   }
 
   arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, 
facebook::velox::RowVectorPtr vector) {
@@ -217,6 +232,7 @@ class VeloxShuffleWriterTestBase : public 
facebook::velox::test::VectorTestBase
   std::vector<facebook::velox::VectorPtr> childrenNoNull_;
   std::vector<facebook::velox::VectorPtr> childrenLargeBinary1_;
   std::vector<facebook::velox::VectorPtr> childrenLargeBinary2_;
+  std::vector<facebook::velox::VectorPtr> childrenComplex_;
 
   facebook::velox::RowVectorPtr inputVector1_;
   facebook::velox::RowVectorPtr inputVector2_;
@@ -225,6 +241,7 @@ class VeloxShuffleWriterTestBase : public 
facebook::velox::test::VectorTestBase
   std::string largeString2_;
   facebook::velox::RowVectorPtr inputVectorLargeBinary1_;
   facebook::velox::RowVectorPtr inputVectorLargeBinary2_;
+  facebook::velox::RowVectorPtr inputVectorComplex_;
 };
 
 class VeloxShuffleWriterTest : public 
::testing::TestWithParam<ShuffleTestParams>, public VeloxShuffleWriterTestBase {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to