This is an automated email from the ASF dual-hosted git repository.

kerwinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f2a68705e1 [VL] Remove buffering of sorted partitions in RSS writer to prevent OOM (#11059)
f2a68705e1 is described below

commit f2a68705e170e46950c66e18a077c8e2da0d2ae4
Author: Rex(Hui) An <[email protected]>
AuthorDate: Mon Nov 17 13:50:47 2025 +0800

    [VL] Remove buffering of sorted partitions in RSS writer to prevent OOM (#11059)
---
 cpp/core/shuffle/rss/RssPartitionWriter.cc         | 55 +++++++---------------
 cpp/core/shuffle/rss/RssPartitionWriter.h          |  6 ---
 .../gluten/celeborn/CelebornShuffleManager.java    |  3 ++
 3 files changed, 20 insertions(+), 44 deletions(-)

diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 3ded2fc0ba..243769a040 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -30,17 +30,7 @@ void RssPartitionWriter::init() {
 }
 
 arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
-  if (rssOs_ != nullptr && !rssOs_->closed()) {
-    if (compressedOs_ != nullptr) {
-      RETURN_NOT_OK(compressedOs_->Close());
-      compressTime_ = compressedOs_->compressTime();
-      spillTime_ -= compressTime_;
-    }
-    ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs_->Finish());
-    bytesEvicted_[lastEvictedPartitionId_] +=
-        rssClient_->pushPartitionData(lastEvictedPartitionId_, buffer->data_as<char>(), buffer->size());
-  }
-
+  spillTime_ -= compressTime_;
   rssClient_->stop();
 
   auto totalBytesEvicted = std::accumulate(bytesEvicted_.begin(), bytesEvicted_.end(), 0LL);
@@ -70,36 +60,25 @@ arrow::Status RssPartitionWriter::hashEvict(
 arrow::Status
 RssPartitionWriter::sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
   ScopedTimer timer(&spillTime_);
-  if (lastEvictedPartitionId_ != partitionId) {
-    if (lastEvictedPartitionId_ != -1) {
-      GLUTEN_DCHECK(rssOs_ != nullptr && !rssOs_->closed(), "rssOs_ should not be null");
-      if (compressedOs_ != nullptr) {
-        RETURN_NOT_OK(compressedOs_->Flush());
-      }
-
-      ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs_->Finish());
-      bytesEvicted_[lastEvictedPartitionId_] +=
-          rssClient_->pushPartitionData(lastEvictedPartitionId_, buffer->data_as<char>(), buffer->size());
-    }
-
-    ARROW_ASSIGN_OR_RAISE(
-        rssOs_, arrow::io::BufferOutputStream::Create(options_->pushBufferMaxSize, arrow::default_memory_pool()));
-    if (codec_ != nullptr) {
-      ARROW_ASSIGN_OR_RAISE(
-          compressedOs_,
-          ShuffleCompressedOutputStream::Make(
-              codec_.get(), options_->compressionBufferSize, rssOs_, arrow::default_memory_pool()));
-    }
-
-    lastEvictedPartitionId_ = partitionId;
-  }
-
   rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
-  if (compressedOs_ != nullptr) {
-    RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs_.get()));
+  ARROW_ASSIGN_OR_RAISE(
+      auto rssOs, arrow::io::BufferOutputStream::Create(options_->pushBufferMaxSize, arrow::default_memory_pool()));
+  if (codec_ != nullptr) {
+    ARROW_ASSIGN_OR_RAISE(
+        auto compressedOs,
+        ShuffleCompressedOutputStream::Make(
+            codec_.get(), options_->compressionBufferSize, rssOs, arrow::default_memory_pool()));
+    RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs.get()));
+    RETURN_NOT_OK(compressedOs->Flush());
+    RETURN_NOT_OK(compressedOs->Close());
+    compressTime_ += compressedOs->compressTime();
   } else {
-    RETURN_NOT_OK(inMemoryPayload->serialize(rssOs_.get()));
+    RETURN_NOT_OK(inMemoryPayload->serialize(rssOs.get()));
   }
+  ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs->Finish());
+  bytesEvicted_[partitionId] +=
+      rssClient_->pushPartitionData(partitionId, buffer->data_as<char>(), buffer->size());
+  
   return arrow::Status::OK();
 }
 
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h b/cpp/core/shuffle/rss/RssPartitionWriter.h
index 0b4d740984..6e6c5d7722 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -26,8 +26,6 @@
 
 namespace gluten {
 
-class RssPartitionWriterOutputStream;
-
 class RssPartitionWriter final : public PartitionWriter {
  public:
   RssPartitionWriter(
@@ -65,10 +63,6 @@ class RssPartitionWriter final : public PartitionWriter {
 
   std::vector<int64_t> bytesEvicted_;
   std::vector<int64_t> rawPartitionLengths_;
-
-  int32_t lastEvictedPartitionId_{-1};
-  std::shared_ptr<arrow::io::BufferOutputStream> rssOs_{nullptr};
-  std::shared_ptr<ShuffleCompressedOutputStream> compressedOs_{nullptr};
 };
 
 } // namespace gluten
diff --git a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
index 31d82ce0c7..e16ea92841 100644
--- a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
+++ b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
@@ -134,6 +134,9 @@ public class CelebornShuffleManager
       rowBasedConf.set(SPARK_CELEBORN_COMPRESSION_CODEC_KEY, celebornDefaultCodec);
       rowBasedCelebornConf.set(CELEBORN_COMPRESSION_CODEC_KEY, celebornDefaultCodec);
     }
+
+    // Disable celeborn compression
+    celebornConf.set(CELEBORN_COMPRESSION_CODEC_KEY, "none");
   }
 
   private boolean isDriver() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to