This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7e97354e6c [GLUTEN-10192][VL] Fix sort shuffle read segfault in some
cases (#10193)
7e97354e6c is described below
commit 7e97354e6c7452e892bb0b1f2541386a181b2e8e
Author: Rong Ma <[email protected]>
AuthorDate: Wed Jul 23 09:09:29 2025 +0100
[GLUTEN-10192][VL] Fix sort shuffle read segfault in some cases (#10193)
---
cpp/velox/shuffle/VeloxShuffleReader.cc | 29 +++++++++++++++++-----------
cpp/velox/shuffle/VeloxShuffleReader.h | 2 ++
cpp/velox/tests/VeloxShuffleWriterTest.cc | 21 ++++++++++++--------
cpp/velox/tests/VeloxShuffleWriterTestBase.h | 15 +++++++++++---
4 files changed, 45 insertions(+), 22 deletions(-)
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 46fb1716fe..73a31c76fb 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -643,6 +643,9 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::next() {
}
if (lastRowSize_ != 0) {
+ if (lastRowSize_ > rowBuffer_->size()) {
+ reallocateRowBuffer();
+ }
readNextRow();
}
@@ -650,22 +653,18 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::next() {
GLUTEN_ASSIGN_OR_THROW(auto bytes, in_->Read(sizeof(RowSizeType),
&lastRowSize_));
if (bytes == 0) {
reachedEos_ = true;
- if (cachedRows_ > 0) {
+ if (bytesRead_ > 0) {
return deserializeToBatch();
}
return nullptr;
}
- if (cachedRows_ > 0 && lastRowSize_ + bytesRead_ > rowBuffer_->size()) {
- return deserializeToBatch();
- }
-
- if (lastRowSize_ > deserializerBufferSize_) {
- auto newSize = facebook::velox::bits::nextPowerOfTwo(lastRowSize_);
- LOG(WARNING) << "Row size " << lastRowSize_ << " exceeds deserializer
buffer size " << rowBuffer_->size()
- << ". Resizing buffer to " << newSize;
- rowBuffer_ = AlignedBuffer::allocate<char>(newSize, veloxPool_,
std::nullopt, true /*allocateExact*/);
- rowBufferPtr_ = rowBuffer_->asMutable<char>();
+ if (lastRowSize_ + bytesRead_ > rowBuffer_->size()) {
+ if (bytesRead_ > 0) {
+ // If we have already read some rows, return the current batch.
+ return deserializeToBatch();
+ }
+ reallocateRowBuffer();
}
readNextRow();
@@ -685,6 +684,14 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::deserializeTo
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
+void VeloxSortShuffleReaderDeserializer::reallocateRowBuffer() {
+ auto newSize = facebook::velox::bits::nextPowerOfTwo(lastRowSize_);
+ LOG(WARNING) << "Row size " << lastRowSize_ << " exceeds current buffer size
" << rowBuffer_->size()
+ << ". Resizing buffer to " << newSize;
+ rowBuffer_ = AlignedBuffer::allocate<char>(newSize, veloxPool_,
std::nullopt, true /*allocateExact*/);
+ rowBufferPtr_ = rowBuffer_->asMutable<char>();
+}
+
void VeloxSortShuffleReaderDeserializer::readNextRow() {
GLUTEN_THROW_NOT_OK(in_->Read(lastRowSize_, rowBufferPtr_ + bytesRead_));
data_.push_back(std::string_view(rowBufferPtr_ + bytesRead_, lastRowSize_));
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h
index 4eae874765..686253da25 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -97,6 +97,8 @@ class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator {
void readNextRow();
+ void reallocateRowBuffer();
+
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::util::Codec> codec_;
facebook::velox::RowTypePtr rowType_;
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 3f6cbc6b2a..419f18861e 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -43,6 +43,7 @@ struct ShuffleTestParams {
int32_t diskWriteBufferSize{0};
bool useRadixSort{false};
bool enableDictionary{false};
+ int64_t deserializerBufferSize{0};
std::string toString() const {
std::ostringstream out;
@@ -52,7 +53,8 @@ struct ShuffleTestParams {
<< ", compressionThreshold = " << compressionThreshold << ",
mergeBufferSize = " << mergeBufferSize
<< ", compressionBufferSize = " << diskWriteBufferSize
<< ", useRadixSort = " << (useRadixSort ? "true" : "false")
- << ", enableDictionary = " << (enableDictionary ? "true" : "false");
+ << ", enableDictionary = " << (enableDictionary ? "true" : "false")
+ << ", deserializerBufferSize = " << deserializerBufferSize;
return out.str();
}
};
@@ -106,12 +108,15 @@ std::vector<ShuffleTestParams> getTestParams() {
for (const auto partitionWriterType : {PartitionWriterType::kLocal,
PartitionWriterType::kRss}) {
for (const auto diskWriteBufferSize : {4, 56, 32 * 1024}) {
for (const bool useRadixSort : {true, false}) {
- params.push_back(ShuffleTestParams{
- .shuffleWriterType = ShuffleWriterType::kSortShuffle,
- .partitionWriterType = partitionWriterType,
- .compressionType = compression,
- .diskWriteBufferSize = diskWriteBufferSize,
- .useRadixSort = useRadixSort});
+ for (const int64_t deserializerBufferSize : {1L,
kDefaultDeserializerBufferSize}) {
+ params.push_back(ShuffleTestParams{
+ .shuffleWriterType = ShuffleWriterType::kSortShuffle,
+ .partitionWriterType = partitionWriterType,
+ .compressionType = compression,
+ .diskWriteBufferSize = diskWriteBufferSize,
+ .useRadixSort = useRadixSort,
+ .deserializerBufferSize = deserializerBufferSize});
+ }
}
}
}
@@ -297,7 +302,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams
rowType,
kDefaultBatchSize,
kDefaultReadBufferSize,
- kDefaultDeserializerBufferSize,
+ GetParam().deserializerBufferSize,
getDefaultMemoryManager()->defaultArrowMemoryPool(),
pool_,
GetParam().shuffleWriterType);
diff --git a/cpp/velox/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
index 3f1fca58b0..ba3de8db95 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
@@ -92,9 +92,9 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
std::nullopt}),
makeNullableFlatVector<float>(
{-0.1234567,
- std::nullopt,
0.1234567,
std::nullopt,
+ std::nullopt,
-0.142857,
std::nullopt,
0.142857,
@@ -104,9 +104,18 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
makeNullableFlatVector<bool>(
{std::nullopt, true, false, std::nullopt, true, true, false, true,
std::nullopt, std::nullopt}),
makeFlatVector<facebook::velox::StringView>(
- {"alice0", "bob1", "alice2", "bob3", "Alice4", "Bob5", "AlicE6",
"boB7", "ALICE8", "BOB9"}),
+ {"a",
+ "bobbobbobooooooooooooooooooooooooooooob1",
+ "alice2",
+ "bob3",
+ "Alice4",
+ "Bob5",
+ "AlicE6",
+ "boB7",
+ "ALICE8",
+ "BOB9"}),
makeNullableFlatVector<facebook::velox::StringView>(
- {"alice_0",
+ {std::nullopt,
"bob_1",
std::nullopt,
std::nullopt,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]