This is an automated email from the ASF dual-hosted git repository.
kerwinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f2a68705e1 [VL] Remove buffering of sorted partitions in RSS writer to prevent OOM (#11059)
f2a68705e1 is described below
commit f2a68705e170e46950c66e18a077c8e2da0d2ae4
Author: Rex(Hui) An <[email protected]>
AuthorDate: Mon Nov 17 13:50:47 2025 +0800
[VL] Remove buffering of sorted partitions in RSS writer to prevent OOM (#11059)
---
cpp/core/shuffle/rss/RssPartitionWriter.cc | 55 +++++++---------------
cpp/core/shuffle/rss/RssPartitionWriter.h | 6 ---
.../gluten/celeborn/CelebornShuffleManager.java | 3 ++
3 files changed, 20 insertions(+), 44 deletions(-)
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 3ded2fc0ba..243769a040 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -30,17 +30,7 @@ void RssPartitionWriter::init() {
}
arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
- if (rssOs_ != nullptr && !rssOs_->closed()) {
- if (compressedOs_ != nullptr) {
- RETURN_NOT_OK(compressedOs_->Close());
- compressTime_ = compressedOs_->compressTime();
- spillTime_ -= compressTime_;
- }
- ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs_->Finish());
- bytesEvicted_[lastEvictedPartitionId_] +=
- rssClient_->pushPartitionData(lastEvictedPartitionId_, buffer->data_as<char>(), buffer->size());
- }
-
+ spillTime_ -= compressTime_;
rssClient_->stop();
auto totalBytesEvicted = std::accumulate(bytesEvicted_.begin(), bytesEvicted_.end(), 0LL);
@@ -70,36 +60,25 @@ arrow::Status RssPartitionWriter::hashEvict(
arrow::Status
RssPartitionWriter::sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
ScopedTimer timer(&spillTime_);
- if (lastEvictedPartitionId_ != partitionId) {
- if (lastEvictedPartitionId_ != -1) {
- GLUTEN_DCHECK(rssOs_ != nullptr && !rssOs_->closed(), "rssOs_ should not be null");
- if (compressedOs_ != nullptr) {
- RETURN_NOT_OK(compressedOs_->Flush());
- }
-
- ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs_->Finish());
- bytesEvicted_[lastEvictedPartitionId_] +=
- rssClient_->pushPartitionData(lastEvictedPartitionId_, buffer->data_as<char>(), buffer->size());
- }
-
- ARROW_ASSIGN_OR_RAISE(
- rssOs_, arrow::io::BufferOutputStream::Create(options_->pushBufferMaxSize, arrow::default_memory_pool()));
- if (codec_ != nullptr) {
- ARROW_ASSIGN_OR_RAISE(
- compressedOs_,
- ShuffleCompressedOutputStream::Make(
- codec_.get(), options_->compressionBufferSize, rssOs_, arrow::default_memory_pool()));
- }
-
- lastEvictedPartitionId_ = partitionId;
- }
-
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
- if (compressedOs_ != nullptr) {
- RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs_.get()));
+ ARROW_ASSIGN_OR_RAISE(
+ auto rssOs, arrow::io::BufferOutputStream::Create(options_->pushBufferMaxSize, arrow::default_memory_pool()));
+ if (codec_ != nullptr) {
+ ARROW_ASSIGN_OR_RAISE(
+ auto compressedOs,
+ ShuffleCompressedOutputStream::Make(
+ codec_.get(), options_->compressionBufferSize, rssOs, arrow::default_memory_pool()));
+ RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs.get()));
+ RETURN_NOT_OK(compressedOs->Flush());
+ RETURN_NOT_OK(compressedOs->Close());
+ compressTime_ += compressedOs->compressTime();
} else {
- RETURN_NOT_OK(inMemoryPayload->serialize(rssOs_.get()));
+ RETURN_NOT_OK(inMemoryPayload->serialize(rssOs.get()));
}
+ ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs->Finish());
+ bytesEvicted_[partitionId] +=
+ rssClient_->pushPartitionData(partitionId, buffer->data_as<char>(), buffer->size());
+
return arrow::Status::OK();
}
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h b/cpp/core/shuffle/rss/RssPartitionWriter.h
index 0b4d740984..6e6c5d7722 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -26,8 +26,6 @@
namespace gluten {
-class RssPartitionWriterOutputStream;
-
class RssPartitionWriter final : public PartitionWriter {
public:
RssPartitionWriter(
@@ -65,10 +63,6 @@ class RssPartitionWriter final : public PartitionWriter {
std::vector<int64_t> bytesEvicted_;
std::vector<int64_t> rawPartitionLengths_;
-
- int32_t lastEvictedPartitionId_{-1};
- std::shared_ptr<arrow::io::BufferOutputStream> rssOs_{nullptr};
- std::shared_ptr<ShuffleCompressedOutputStream> compressedOs_{nullptr};
};
} // namespace gluten
diff --git a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
index 31d82ce0c7..e16ea92841 100644
--- a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
+++ b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
@@ -134,6 +134,9 @@ public class CelebornShuffleManager
rowBasedConf.set(SPARK_CELEBORN_COMPRESSION_CODEC_KEY, celebornDefaultCodec);
rowBasedCelebornConf.set(CELEBORN_COMPRESSION_CODEC_KEY, celebornDefaultCodec);
}
+
+ // Disable celeborn compression
+ celebornConf.set(CELEBORN_COMPRESSION_CODEC_KEY, "none");
}
private boolean isDriver() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]