This is an automated email from the ASF dual-hosted git repository.
weitingchen pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/branch-1.2 by this push:
new 773c9b474b [VL][1.2] Port Fix shuffle spill triggered by evicting
buffers during stop (#6422) (#7991)
773c9b474b is described below
commit 773c9b474b53f5054e107249192f6a6ba9c45d9c
Author: zhaokuo <[email protected]>
AuthorDate: Fri Nov 22 18:45:44 2024 +0800
[VL][1.2] Port Fix shuffle spill triggered by evicting buffers during stop
(#6422) (#7991)
In VeloxHashBasedShuffleWriter::stop, evicting partition buffers can
allocate extra memory and trigger a spill. In this case the target partition
buffer should not get shrunk.
Added a unit test (UT) for this case.
Co-authored-by: Rong Ma <[email protected]>
---
cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc | 16 ++++--
cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h | 19 ++++++-
cpp/velox/tests/VeloxShuffleWriterTest.cc | 66 ++++++++--------------
cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h | 17 ++++++
4 files changed, 70 insertions(+), 48 deletions(-)
diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
index 3bd1a2fbc6..f9cd2780c3 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.cc
@@ -329,8 +329,10 @@ arrow::Status
VeloxHashBasedShuffleWriter::partitioningAndDoSplit(facebook::velo
}
arrow::Status VeloxHashBasedShuffleWriter::stop() {
+ setSplitState(SplitState::kStopEvict);
if (options_.partitioning != Partitioning::kSingle) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
+ PartitionBufferGuard guard(partitionBufferInUse_, pid);
RETURN_NOT_OK(evictPartitionBuffers(pid, false));
}
}
@@ -970,10 +972,6 @@ arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>>
VeloxHashBasedShuffle
bool reuseBuffers) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCreateRbFromBuffer]);
- if (partitionBufferBase_[partitionId] == 0) {
- return std::vector<std::shared_ptr<arrow::Buffer>>{};
- }
-
auto numRows = partitionBufferBase_[partitionId];
auto fixedWidthIdx = 0;
auto binaryIdx = 0;
@@ -1321,6 +1319,9 @@ arrow::Result<int64_t>
VeloxHashBasedShuffleWriter::shrinkPartitionBuffersMinSiz
// Sort partition buffers by (partitionBufferSize_ - partitionBufferBase_)
std::vector<std::pair<uint32_t, uint32_t>> pidToSize;
for (auto pid = 0; pid < numPartitions_; ++pid) {
+ if (partitionBufferInUse_.has_value() && *partitionBufferInUse_ == pid) {
+ continue;
+ }
if (partitionBufferSize_[pid] > 0 && partitionBufferSize_[pid] >
partitionBufferBase_[pid]) {
pidToSize.emplace_back(pid, partitionBufferSize_[pid] -
partitionBufferBase_[pid]);
}
@@ -1348,6 +1349,7 @@ arrow::Result<int64_t>
VeloxHashBasedShuffleWriter::shrinkPartitionBuffersMinSiz
arrow::Result<int64_t>
VeloxHashBasedShuffleWriter::evictPartitionBuffersMinSize(int64_t size) {
// Evict partition buffers, only when splitState_ == SplitState::kInit, and
space freed from
// shrinking is not enough. In this case partitionBufferSize_ ==
partitionBufferBase_
+ VELOX_CHECK(!partitionBufferInUse_);
int64_t beforeEvict = partitionBufferPool_->bytes_allocated();
int64_t evicted = 0;
std::vector<std::pair<uint32_t, uint32_t>> pidToSize;
@@ -1375,10 +1377,12 @@ arrow::Result<int64_t>
VeloxHashBasedShuffleWriter::evictPartitionBuffersMinSize
bool VeloxHashBasedShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
// If OOM happens during SplitState::kSplit, it is triggered by binary
buffers resize.
// Or during SplitState::kInit, it is triggered by other operators.
+ // Or during SplitState::kStopEvict, it is triggered by assembleBuffers
allocating extra memory. In this case we use
+ // PartitionBufferGuard to prevent the target partition from being shrunk.
// The reclaim order is spill->shrink, because the partition buffers can be
reused.
// SinglePartitioning doesn't maintain partition buffers.
return options_.partitioning != Partitioning::kSingle &&
- (splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit);
+ (splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit
|| splitState_ == SplitState::kStopEvict);
}
bool VeloxHashBasedShuffleWriter::evictPartitionBuffersAfterSpill() const {
@@ -1391,7 +1395,7 @@ arrow::Result<uint32_t>
VeloxHashBasedShuffleWriter::partitionBufferSizeAfterShr
if (splitState_ == SplitState::kSplit) {
return partitionBufferBase_[partitionId] +
partition2RowCount_[partitionId];
}
- if (splitState_ == kInit || splitState_ == SplitState::kStop) {
+ if (splitState_ == kInit || splitState_ == SplitState::kStopEvict) {
return partitionBufferBase_[partitionId];
}
return arrow::Status::Invalid("Cannot shrink partition buffers in
SplitState: " + std::to_string(splitState_));
diff --git a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
index a11f84e952..f78211980a 100644
--- a/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashBasedShuffleWriter.h
@@ -87,7 +87,7 @@ namespace gluten {
#endif // end of VELOX_SHUFFLE_WRITER_PRINT
-enum SplitState { kInit, kPreAlloc, kSplit, kStop };
+enum SplitState { kInit, kPreAlloc, kSplit, kStopEvict, kStop };
struct BinaryArrayResizeState {
bool inResize;
@@ -303,6 +303,21 @@ class VeloxHashBasedShuffleWriter : public
VeloxShuffleWriter {
arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv,
int64_t memLimit);
+ class PartitionBufferGuard {
+ public:
+ PartitionBufferGuard(std::optional<uint32_t>& partitionInUse, uint32_t
partitionId)
+ : partitionBufferInUse_(partitionInUse) {
+ partitionBufferInUse_ = partitionId;
+ }
+
+ ~PartitionBufferGuard() {
+ partitionBufferInUse_ = std::nullopt;
+ }
+
+ private:
+ std::optional<uint32_t>& partitionBufferInUse_;
+ };
+
BinaryArrayResizeState binaryArrayResizeState_{};
bool hasComplexType_ = false;
@@ -401,6 +416,8 @@ class VeloxHashBasedShuffleWriter : public
VeloxShuffleWriter {
facebook::velox::serializer::presto::PrestoVectorSerde serde_;
SplitState splitState_{kInit};
+
+ std::optional<uint32_t> partitionBufferInUse_{std::nullopt};
}; // class VeloxHashBasedShuffleWriter
} // namespace gluten
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 1c1be6fc1b..957657eeb5 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -207,50 +207,13 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
TEST_P(HashPartitioningShuffleWriter, hashPart1VectorComplexType) {
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
- std::vector<VectorPtr> children = {
- makeNullableFlatVector<int32_t>({std::nullopt, 1}),
- makeRowVector({
- makeFlatVector<int32_t>({1, 3}),
- makeNullableFlatVector<velox::StringView>({std::nullopt, "de"}),
- }),
- makeNullableFlatVector<StringView>({std::nullopt, "10 I'm not inline
string"}),
- makeArrayVector<int64_t>({
- {1, 2, 3, 4, 5},
- {1, 2, 3},
- }),
- makeMapVector<int32_t, StringView>({{{1, "str1000"}, {2, "str2000"}},
{{3, "str3000"}, {4, "str4000"}}}),
- };
- auto dataVector = makeRowVector(children);
+ auto children = childrenComplex_;
children.insert((children.begin()), makeFlatVector<int32_t>({1, 2}));
auto vector = makeRowVector(children);
+ auto firstBlock = takeRows({inputVectorComplex_}, {{1}});
+ auto secondBlock = takeRows({inputVectorComplex_}, {{0}});
- auto firstBlock = makeRowVector({
- makeConstant<int32_t>(1, 1),
- makeRowVector({
- makeConstant<int32_t>(3, 1),
- makeFlatVector<velox::StringView>({"de"}),
- }),
- makeFlatVector<StringView>({"10 I'm not inline string"}),
- makeArrayVector<int64_t>({
- {1, 2, 3},
- }),
- makeMapVector<int32_t, StringView>({{{3, "str3000"}, {4, "str4000"}}}),
- });
-
- auto secondBlock = makeRowVector({
- makeNullConstant(TypeKind::INTEGER, 1),
- makeRowVector({
- makeConstant<int32_t>(1, 1),
- makeNullableFlatVector<velox::StringView>({std::nullopt}),
- }),
- makeNullableFlatVector<StringView>({std::nullopt}),
- makeArrayVector<int64_t>({
- {1, 2, 3, 4, 5},
- }),
- makeMapVector<int32_t, StringView>({{{1, "str1000"}, {2, "str2000"}}}),
- });
-
- testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2, dataVector->type(),
{{firstBlock}, {secondBlock}});
+ testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2,
inputVectorComplex_->type(), {{firstBlock}, {secondBlock}});
}
TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
@@ -631,6 +594,27 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) {
}
}
+TEST_F(VeloxShuffleWriterMemoryTest, kStopComplex) {
+ ASSERT_NOT_OK(initShuffleWriterOptions());
+ shuffleWriterOptions_.bufferSize = 4;
+ auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
+ auto shuffleWriter = createShuffleWriter(&pool);
+
+ pool.setEvictable(shuffleWriter.get());
+ for (int i = 0; i < 3; ++i) {
+ ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorComplex_));
+ }
+
+ // Reclaim from PartitionWriter to free cached bytes.
+ auto payloadSize = shuffleWriter->cachedPayloadSize();
+ int64_t evicted;
+ ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
+ ASSERT_EQ(evicted, payloadSize);
+
+ // When evicting partitioning buffers in stop, spill will be triggered by
complex types allocating extra memory.
+ ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] {
ASSERT_NOT_OK(shuffleWriter->stop()); }));
+}
+
TEST_F(VeloxShuffleWriterMemoryTest, evictPartitionBuffers) {
ASSERT_NOT_OK(initShuffleWriterOptions());
shuffleWriterOptions_.bufferSize = 4;
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
index fd3ae3d547..41b7f31e02 100644
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
@@ -172,12 +172,27 @@ class VeloxShuffleWriterTestBase : public
facebook::velox::test::VectorTestBase
makeNullableFlatVector<facebook::velox::StringView>(
std::vector<std::optional<facebook::velox::StringView>>(2048,
std::nullopt)),
};
+ childrenComplex_ = {
+ makeNullableFlatVector<int32_t>({std::nullopt, 1}),
+ makeRowVector({
+ makeFlatVector<int32_t>({1, 3}),
+ makeNullableFlatVector<facebook::velox::StringView>({std::nullopt,
"de"}),
+ }),
+ makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, "10
I'm not inline string"}),
+ makeArrayVector<int64_t>({
+ {1, 2, 3, 4, 5},
+ {1, 2, 3},
+ }),
+ makeMapVector<int32_t, facebook::velox::StringView>(
+ {{{1, "str1000"}, {2, "str2000"}}, {{3, "str3000"}, {4,
"str4000"}}}),
+ };
inputVector1_ = makeRowVector(children1_);
inputVector2_ = makeRowVector(children2_);
inputVectorNoNull_ = makeRowVector(childrenNoNull_);
inputVectorLargeBinary1_ = makeRowVector(childrenLargeBinary1_);
inputVectorLargeBinary2_ = makeRowVector(childrenLargeBinary2_);
+ inputVectorComplex_ = makeRowVector(childrenComplex_);
}
arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter,
facebook::velox::RowVectorPtr vector) {
@@ -217,6 +232,7 @@ class VeloxShuffleWriterTestBase : public
facebook::velox::test::VectorTestBase
std::vector<facebook::velox::VectorPtr> childrenNoNull_;
std::vector<facebook::velox::VectorPtr> childrenLargeBinary1_;
std::vector<facebook::velox::VectorPtr> childrenLargeBinary2_;
+ std::vector<facebook::velox::VectorPtr> childrenComplex_;
facebook::velox::RowVectorPtr inputVector1_;
facebook::velox::RowVectorPtr inputVector2_;
@@ -225,6 +241,7 @@ class VeloxShuffleWriterTestBase : public
facebook::velox::test::VectorTestBase
std::string largeString2_;
facebook::velox::RowVectorPtr inputVectorLargeBinary1_;
facebook::velox::RowVectorPtr inputVectorLargeBinary2_;
+ facebook::velox::RowVectorPtr inputVectorComplex_;
};
class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams>, public VeloxShuffleWriterTestBase {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]