This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e97354e6c [GLUTEN-10192][VL] Fix sort shuffle read segfault in some cases (#10193)
7e97354e6c is described below

commit 7e97354e6c7452e892bb0b1f2541386a181b2e8e
Author: Rong Ma <[email protected]>
AuthorDate: Wed Jul 23 09:09:29 2025 +0100

    [GLUTEN-10192][VL] Fix sort shuffle read segfault in some cases (#10193)
---
 cpp/velox/shuffle/VeloxShuffleReader.cc      | 29 +++++++++++++++++-----------
 cpp/velox/shuffle/VeloxShuffleReader.h       |  2 ++
 cpp/velox/tests/VeloxShuffleWriterTest.cc    | 21 ++++++++++++--------
 cpp/velox/tests/VeloxShuffleWriterTestBase.h | 15 +++++++++++---
 4 files changed, 45 insertions(+), 22 deletions(-)

diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 46fb1716fe..73a31c76fb 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -643,6 +643,9 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::next() {
   }
 
   if (lastRowSize_ != 0) {
+    if (lastRowSize_ > rowBuffer_->size()) {
+      reallocateRowBuffer();
+    }
     readNextRow();
   }
 
@@ -650,22 +653,18 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::next() {
     GLUTEN_ASSIGN_OR_THROW(auto bytes, in_->Read(sizeof(RowSizeType), 
&lastRowSize_));
     if (bytes == 0) {
       reachedEos_ = true;
-      if (cachedRows_ > 0) {
+      if (bytesRead_ > 0) {
         return deserializeToBatch();
       }
       return nullptr;
     }
 
-    if (cachedRows_ > 0 && lastRowSize_ + bytesRead_ > rowBuffer_->size()) {
-      return deserializeToBatch();
-    }
-
-    if (lastRowSize_ > deserializerBufferSize_) {
-      auto newSize = facebook::velox::bits::nextPowerOfTwo(lastRowSize_);
-      LOG(WARNING) << "Row size " << lastRowSize_ << " exceeds deserializer 
buffer size " << rowBuffer_->size()
-                   << ". Resizing buffer to " << newSize;
-      rowBuffer_ = AlignedBuffer::allocate<char>(newSize, veloxPool_, 
std::nullopt, true /*allocateExact*/);
-      rowBufferPtr_ = rowBuffer_->asMutable<char>();
+    if (lastRowSize_ + bytesRead_ > rowBuffer_->size()) {
+      if (bytesRead_ > 0) {
+        // If we have already read some rows, return the current batch.
+        return deserializeToBatch();
+      }
+      reallocateRowBuffer();
     }
 
     readNextRow();
@@ -685,6 +684,14 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::deserializeTo
   return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
 }
 
+void VeloxSortShuffleReaderDeserializer::reallocateRowBuffer() {
+  auto newSize = facebook::velox::bits::nextPowerOfTwo(lastRowSize_);
+  LOG(WARNING) << "Row size " << lastRowSize_ << " exceeds current buffer size 
" << rowBuffer_->size()
+               << ". Resizing buffer to " << newSize;
+  rowBuffer_ = AlignedBuffer::allocate<char>(newSize, veloxPool_, 
std::nullopt, true /*allocateExact*/);
+  rowBufferPtr_ = rowBuffer_->asMutable<char>();
+}
+
 void VeloxSortShuffleReaderDeserializer::readNextRow() {
   GLUTEN_THROW_NOT_OK(in_->Read(lastRowSize_, rowBufferPtr_ + bytesRead_));
   data_.push_back(std::string_view(rowBufferPtr_ + bytesRead_, lastRowSize_));
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h
index 4eae874765..686253da25 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -97,6 +97,8 @@ class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator {
 
   void readNextRow();
 
+  void reallocateRowBuffer();
+
   std::shared_ptr<arrow::Schema> schema_;
   std::shared_ptr<arrow::util::Codec> codec_;
   facebook::velox::RowTypePtr rowType_;
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 3f6cbc6b2a..419f18861e 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -43,6 +43,7 @@ struct ShuffleTestParams {
   int32_t diskWriteBufferSize{0};
   bool useRadixSort{false};
   bool enableDictionary{false};
+  int64_t deserializerBufferSize{0};
 
   std::string toString() const {
     std::ostringstream out;
@@ -52,7 +53,8 @@ struct ShuffleTestParams {
         << ", compressionThreshold = " << compressionThreshold << ", 
mergeBufferSize = " << mergeBufferSize
         << ", compressionBufferSize = " << diskWriteBufferSize
         << ", useRadixSort = " << (useRadixSort ? "true" : "false")
-        << ", enableDictionary = " << (enableDictionary ? "true" : "false");
+        << ", enableDictionary = " << (enableDictionary ? "true" : "false")
+        << ", deserializerBufferSize = " << deserializerBufferSize;
     return out.str();
   }
 };
@@ -106,12 +108,15 @@ std::vector<ShuffleTestParams> getTestParams() {
     for (const auto partitionWriterType : {PartitionWriterType::kLocal, 
PartitionWriterType::kRss}) {
       for (const auto diskWriteBufferSize : {4, 56, 32 * 1024}) {
         for (const bool useRadixSort : {true, false}) {
-          params.push_back(ShuffleTestParams{
-              .shuffleWriterType = ShuffleWriterType::kSortShuffle,
-              .partitionWriterType = partitionWriterType,
-              .compressionType = compression,
-              .diskWriteBufferSize = diskWriteBufferSize,
-              .useRadixSort = useRadixSort});
+          for (const int64_t deserializerBufferSize : {1L, 
kDefaultDeserializerBufferSize}) {
+            params.push_back(ShuffleTestParams{
+                .shuffleWriterType = ShuffleWriterType::kSortShuffle,
+                .partitionWriterType = partitionWriterType,
+                .compressionType = compression,
+                .diskWriteBufferSize = diskWriteBufferSize,
+                .useRadixSort = useRadixSort,
+                .deserializerBufferSize = deserializerBufferSize});
+          }
         }
       }
     }
@@ -297,7 +302,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams
         rowType,
         kDefaultBatchSize,
         kDefaultReadBufferSize,
-        kDefaultDeserializerBufferSize,
+        GetParam().deserializerBufferSize,
         getDefaultMemoryManager()->defaultArrowMemoryPool(),
         pool_,
         GetParam().shuffleWriterType);
diff --git a/cpp/velox/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
index 3f1fca58b0..ba3de8db95 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
@@ -92,9 +92,9 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
              std::nullopt}),
         makeNullableFlatVector<float>(
             {-0.1234567,
-             std::nullopt,
              0.1234567,
              std::nullopt,
+             std::nullopt,
              -0.142857,
              std::nullopt,
              0.142857,
@@ -104,9 +104,18 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
         makeNullableFlatVector<bool>(
             {std::nullopt, true, false, std::nullopt, true, true, false, true, 
std::nullopt, std::nullopt}),
         makeFlatVector<facebook::velox::StringView>(
-            {"alice0", "bob1", "alice2", "bob3", "Alice4", "Bob5", "AlicE6", 
"boB7", "ALICE8", "BOB9"}),
+            {"a",
+             "bobbobbobooooooooooooooooooooooooooooob1",
+             "alice2",
+             "bob3",
+             "Alice4",
+             "Bob5",
+             "AlicE6",
+             "boB7",
+             "ALICE8",
+             "BOB9"}),
         makeNullableFlatVector<facebook::velox::StringView>(
-            {"alice_0",
+            {std::nullopt,
              "bob_1",
              std::nullopt,
              std::nullopt,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to