This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 792283dadc [GLUTEN-10168][VL] Sort shuffle produce wrong partition lengths in case of spill (#10208)
792283dadc is described below

commit 792283dadc94d2004d503db4a07d1a2a07a9229a
Author: Rong Ma <[email protected]>
AuthorDate: Fri Jul 18 10:49:59 2025 +0100

    [GLUTEN-10168][VL] Sort shuffle produce wrong partition lengths in case of spill (#10208)
---
 cpp/core/shuffle/LocalPartitionWriter.cc   |  6 +++++-
 cpp/core/shuffle/rss/RssPartitionWriter.cc |  8 +++++---
 cpp/velox/tests/VeloxShuffleWriterTest.cc  | 23 +++++++++++++++++++++++
 3 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc
index 457db60a5c..77734e3287 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -95,7 +95,11 @@ class LocalPartitionWriter::LocalSpiller {
   arrow::Status spill(uint32_t partitionId, std::unique_ptr<InMemoryPayload> 
payload) {
     ScopedTimer timer(&spillTime_);
 
-    curPid_ = partitionId;
+    if (curPid_ != partitionId) {
+      // Record the write position of the new partition.
+      ARROW_ASSIGN_OR_RAISE(writePos_, os_->Tell());
+      curPid_ = partitionId;
+    }
     flushed_ = false;
 
     auto* raw = compressedOs_ != nullptr ? compressedOs_.get() : os_.get();
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index d52bd5dbf3..996c137671 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -37,7 +37,8 @@ arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
       spillTime_ -= compressTime_;
     }
     RETURN_NOT_OK(rssOs_->Flush());
-    ARROW_ASSIGN_OR_RAISE(bytesEvicted_[lastEvictedPartitionId_], 
rssOs_->Tell());
+    ARROW_ASSIGN_OR_RAISE(const auto evicted, rssOs_->Tell());
+    bytesEvicted_[lastEvictedPartitionId_] += evicted;
     RETURN_NOT_OK(rssOs_->Close());
   }
 
@@ -77,7 +78,8 @@ RssPartitionWriter::sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayl
         RETURN_NOT_OK(compressedOs_->Flush());
       }
       RETURN_NOT_OK(rssOs_->Flush());
-      ARROW_ASSIGN_OR_RAISE(bytesEvicted_[lastEvictedPartitionId_], 
rssOs_->Tell());
+      ARROW_ASSIGN_OR_RAISE(const auto evicted, rssOs_->Tell());
+      bytesEvicted_[lastEvictedPartitionId_] += evicted;
       RETURN_NOT_OK(rssOs_->Close());
     }
 
@@ -94,7 +96,7 @@ RssPartitionWriter::sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayl
     lastEvictedPartitionId_ = partitionId;
   }
 
-  rawPartitionLengths_[partitionId] = inMemoryPayload->rawSize();
+  rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
   if (compressedOs_ != nullptr) {
     RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs_.get()));
   } else {
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 4d56e094b9..3f6cbc6b2a 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -743,6 +743,29 @@ TEST_P(RoundRobinPartitioningShuffleWriterTest, sortMaxRows) {
   shuffleWriteReadMultiBlocks(*shuffleWriter, 2, {blockPid1, blockPid2});
 }
 
+TEST_P(RoundRobinPartitioningShuffleWriterTest, sortSpill) {
+  if (GetParam().shuffleWriterType != ShuffleWriterType::kSortShuffle) {
+    return;
+  }
+  auto shuffleWriter = createShuffleWriter(2);
+
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_, 0));
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_, 0));
+
+  int64_t evicted;
+  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(1024, &evicted));
+
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_, 0));
+
+  auto blockPid1 =
+      takeRows({inputVector1_, inputVector1_, inputVector1_}, {{0, 2, 4, 6, 
8}, {0, 2, 4, 6, 8}, {0, 2, 4, 6, 8}});
+  auto blockPid2 =
+      takeRows({inputVector1_, inputVector1_, inputVector1_}, {{1, 3, 5, 7, 
9}, {1, 3, 5, 7, 9}, {1, 3, 5, 7, 9}});
+
+  // Stop and verify.
+  shuffleWriteReadMultiBlocks(*shuffleWriter, 2, {blockPid1, blockPid2});
+}
+
 INSTANTIATE_TEST_SUITE_P(
     SinglePartitioningShuffleWriterGroup,
     SinglePartitioningShuffleWriterTest,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to