This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b8d42026f [GLUTEN-9163][VL] Use stream de/compressor in sort-based 
shuffle (#9278)
1b8d42026f is described below

commit 1b8d42026fbe7c4512af459ef75039cdd09d3b0b
Author: Rong Ma <[email protected]>
AuthorDate: Thu Apr 17 17:12:28 2025 +0100

    [GLUTEN-9163][VL] Use stream de/compressor in sort-based shuffle (#9278)
---
 cpp/core/jni/JniWrapper.cc                         |   8 +-
 cpp/core/shuffle/LocalPartitionWriter.cc           | 210 +++++++----
 cpp/core/shuffle/LocalPartitionWriter.h            |  14 +-
 cpp/core/shuffle/Options.h                         |   8 +-
 cpp/core/shuffle/PartitionWriter.h                 |  10 +-
 cpp/core/shuffle/Payload.cc                        |  24 +-
 cpp/core/shuffle/Payload.h                         |  20 +-
 cpp/core/shuffle/Spill.cc                          |   6 -
 cpp/core/shuffle/Spill.h                           |   7 -
 cpp/core/shuffle/Utils.h                           | 395 +++++++++++++++++++++
 cpp/core/shuffle/rss/RssPartitionWriter.cc         |  67 +++-
 cpp/core/shuffle/rss/RssPartitionWriter.h          |  97 ++++-
 cpp/velox/benchmarks/GenericBenchmark.cc           |   8 +
 cpp/velox/shuffle/VeloxHashShuffleWriter.cc        |  10 +-
 cpp/velox/shuffle/VeloxShuffleReader.cc            | 125 +++----
 cpp/velox/shuffle/VeloxShuffleReader.h             |  18 +-
 cpp/velox/shuffle/VeloxSortShuffleWriter.cc        |  76 ++--
 cpp/velox/shuffle/VeloxSortShuffleWriter.h         |   8 +-
 cpp/velox/tests/VeloxShuffleWriterTest.cc          |  19 +-
 cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h |   8 +-
 20 files changed, 821 insertions(+), 317 deletions(-)

diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 512adf9e62..c761920c87 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -825,10 +825,10 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
     jstring codecJstr,
     jstring codecBackendJstr,
     jint compressionLevel,
-    jint sortEvictBufferSize,
+    jint diskWriteBufferSize,
     jint compressionThreshold,
     jstring compressionModeJstr,
-    jint sortBufferInitialSize,
+    jint initialSortBufferSize,
     jboolean useRadixSort,
     jstring dataFileJstr,
     jint numSubDirs,
@@ -856,8 +856,8 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
       .taskAttemptId = static_cast<int64_t>(taskAttemptId),
       .startPartitionId = startPartitionId,
       .shuffleWriterType = ShuffleWriter::stringToType(jStringToCString(env, 
shuffleWriterTypeJstr)),
-      .sortBufferInitialSize = sortBufferInitialSize,
-      .sortEvictBufferSize = sortEvictBufferSize,
+      .initialSortBufferSize = initialSortBufferSize,
+      .diskWriteBufferSize = diskWriteBufferSize,
       .useRadixSort = static_cast<bool>(useRadixSort)};
 
   // Build PartitionWriterOptions.
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc 
b/cpp/core/shuffle/LocalPartitionWriter.cc
index 2cca7f4c69..d31cdde68a 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -15,33 +15,76 @@
  * limitations under the License.
  */
 
-#include <filesystem>
-#include <random>
-#include <thread>
-
-#include <boost/stacktrace.hpp>
-#include <glog/logging.h>
 #include "shuffle/LocalPartitionWriter.h"
+
 #include "shuffle/Payload.h"
 #include "shuffle/Spill.h"
 #include "shuffle/Utils.h"
+#include "utils/Timer.h"
 
-namespace gluten {
+#include <fcntl.h>
+#include <glog/logging.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <filesystem>
+#include <random>
+#include <thread>
 
+namespace gluten {
 class LocalPartitionWriter::LocalSpiller {
  public:
   LocalSpiller(
+      bool isFinal,
       std::shared_ptr<arrow::io::OutputStream> os,
       std::string spillFile,
       uint32_t compressionThreshold,
       arrow::MemoryPool* pool,
       arrow::util::Codec* codec)
-      : os_(os),
+      : isFinal_(isFinal),
+        os_(os),
         spillFile_(std::move(spillFile)),
         compressionThreshold_(compressionThreshold),
         pool_(pool),
         codec_(codec),
-        
diskSpill_(std::make_unique<Spill>(Spill::SpillType::kSequentialSpill)) {}
+        diskSpill_(std::make_unique<Spill>()) {
+    if (codec_ != nullptr) {
+      GLUTEN_ASSIGN_OR_THROW(
+          compressedOs_, ShuffleCompressedOutputStream::Make(codec_, os, 
arrow::default_memory_pool()));
+    }
+  }
+
+  arrow::Status flush() {
+    if (flushed_) {
+      return arrow::Status::OK();
+    }
+    flushed_ = true;
+
+    if (compressedOs_ != nullptr) {
+      RETURN_NOT_OK(compressedOs_->Flush());
+    }
+    ARROW_ASSIGN_OR_RAISE(const auto pos, os_->Tell());
+
+    diskSpill_->insertPayload(lastPid_, Payload::kRaw, 0, nullptr, pos - 
writePos_, pool_, nullptr);
+    DLOG(INFO) << "LocalSpiller: Spilled partition " << lastPid_ << " file 
start: " << writePos_
+               << ", file end: " << pos << ", file: " << spillFile_;
+    return arrow::Status::OK();
+  }
+
+  arrow::Status spill(uint32_t partitionId, std::unique_ptr<InMemoryPayload> 
payload) {
+    ScopedTimer timer(&spillTime_);
+
+    if (lastPid_ != partitionId) {
+      // Record the write position of the new partition.
+      ARROW_ASSIGN_OR_RAISE(writePos_, os_->Tell());
+      lastPid_ = partitionId;
+    }
+
+    flushed_ = false;
+    auto* raw = compressedOs_ != nullptr ? compressedOs_.get() : os_.get();
+    RETURN_NOT_OK(payload->serialize(raw));
+
+    return arrow::Status::OK();
+  }
 
   arrow::Status spill(uint32_t partitionId, std::unique_ptr<BlockPayload> 
payload) {
     // Check spill Type.
@@ -55,10 +98,6 @@ class LocalPartitionWriter::LocalSpiller {
     ARROW_ASSIGN_OR_RAISE(auto end, os_->Tell());
     DLOG(INFO) << "LocalSpiller: Spilled partition " << partitionId << " file 
start: " << start << ", file end: " << end
                << ", file: " << spillFile_;
-    if (payload->type() == Payload::kRaw) {
-      diskSpill_->insertPayload(partitionId, Payload::kRaw, 0, nullptr, end - 
start, pool_, nullptr);
-      return arrow::Status::OK();
-    }
 
     auto payloadType = payload->type();
     if (payloadType == Payload::kUncompressed && codec_ != nullptr && 
payload->numRows() >= compressionThreshold_) {
@@ -69,17 +108,34 @@ class LocalPartitionWriter::LocalSpiller {
     return arrow::Status::OK();
   }
 
-  arrow::Result<std::shared_ptr<Spill>> finish(bool close) {
+  arrow::Result<std::shared_ptr<Spill>> finish() {
     ARROW_RETURN_IF(finished_, arrow::Status::Invalid("Calling finish() on a 
finished LocalSpiller."));
     ARROW_RETURN_IF(os_->closed(), arrow::Status::Invalid("Spill file os has 
been closed."));
 
-    finished_ = true;
-    if (close) {
+    if (lastPid_ != -1) {
+      if (compressedOs_ != nullptr) {
+        compressTime_ = compressedOs_->compressTime();
+        spillTime_ -= compressTime_;
+        RETURN_NOT_OK(compressedOs_->Close());
+      }
+
+      if (!isFinal_) {
+        ARROW_ASSIGN_OR_RAISE(auto pos, os_->Tell());
+        diskSpill_->insertPayload(lastPid_, Payload::kRaw, 0, nullptr, pos - 
writePos_, pool_, nullptr);
+        DLOG(INFO) << "LocalSpiller: Spilled partition " << lastPid_ << " file 
start: " << writePos_
+                   << ", file end: " << pos << ", file: " << spillFile_;
+      }
+    }
+
+    if (!isFinal_) {
       RETURN_NOT_OK(os_->Close());
     }
+
     diskSpill_->setSpillFile(spillFile_);
     diskSpill_->setSpillTime(spillTime_);
     diskSpill_->setCompressTime(compressTime_);
+    finished_ = true;
+
     return std::move(diskSpill_);
   }
 
@@ -88,28 +144,31 @@ class LocalPartitionWriter::LocalSpiller {
   }
 
  private:
+  bool isFinal_;
+
   std::shared_ptr<arrow::io::OutputStream> os_;
+  std::shared_ptr<ShuffleCompressedOutputStream> compressedOs_{nullptr};
+  int64_t writePos_{0};
+
   std::string spillFile_;
   uint32_t compressionThreshold_;
   arrow::MemoryPool* pool_;
   arrow::util::Codec* codec_;
 
-  bool finished_{false};
   std::shared_ptr<Spill> diskSpill_{nullptr};
+
+  bool flushed_{true};
+  bool finished_{false};
   int64_t spillTime_{0};
   int64_t compressTime_{0};
+  int32_t lastPid_{-1};
 };
 
 class LocalPartitionWriter::PayloadMerger {
  public:
-  PayloadMerger(
-      const PartitionWriterOptions& options,
-      arrow::MemoryPool* pool,
-      arrow::util::Codec* codec,
-      bool hasComplexType)
+  PayloadMerger(const PartitionWriterOptions& options, arrow::MemoryPool* 
pool, arrow::util::Codec* codec)
       : pool_(pool),
         codec_(codec),
-        hasComplexType_(hasComplexType),
         compressionThreshold_(options.compressionThreshold),
         mergeBufferSize_(options.mergeBufferSize),
         mergeBufferMinSize_(options.mergeBufferSize * options.mergeThreshold) 
{}
@@ -117,7 +176,7 @@ class LocalPartitionWriter::PayloadMerger {
   arrow::Result<std::vector<std::unique_ptr<BlockPayload>>>
   merge(uint32_t partitionId, std::unique_ptr<InMemoryPayload> append, bool 
reuseBuffers) {
     std::vector<std::unique_ptr<BlockPayload>> merged{};
-    if (hasComplexType_) {
+    if (!append->mergeable()) {
       // TODO: Merging complex type is currently not supported.
       merged.emplace_back();
       ARROW_ASSIGN_OR_RAISE(merged.back(), 
createBlockPayload(std::move(append), reuseBuffers));
@@ -215,7 +274,6 @@ class LocalPartitionWriter::PayloadMerger {
  private:
   arrow::MemoryPool* pool_;
   arrow::util::Codec* codec_;
-  bool hasComplexType_;
   int32_t compressionThreshold_;
   int32_t mergeBufferSize_;
   int32_t mergeBufferMinSize_;
@@ -332,7 +390,7 @@ class LocalPartitionWriter::PayloadCache {
           spillTime_ += payload->getWriteTime();
 
           if (UNLIKELY(!diskSpill)) {
-            diskSpill = 
std::make_unique<Spill>(Spill::SpillType::kBatchedSpill);
+            diskSpill = std::make_unique<Spill>();
           }
           ARROW_ASSIGN_OR_RAISE(auto end, os->Tell());
           DLOG(INFO) << "PayloadCache: Spilled partition " << pid << " file 
start: " << start << ", file end: " << end
@@ -419,25 +477,34 @@ void LocalPartitionWriter::init() {
   subDirSelection_.assign(localDirs_.size(), 0);
 }
 
-arrow::Status LocalPartitionWriter::mergeSpills(uint32_t partitionId) {
-  auto spillId = 0;
-  auto spillIter = spills_.begin();
-  while (spillIter != spills_.end()) {
-    ARROW_ASSIGN_OR_RAISE(auto st, dataFileOs_->Tell());
-    (*spillIter)->openForRead(options_.shuffleFileBufferSize);
-    // Read if partition exists in the spilled file and write to the final 
file.
-    while (auto payload = (*spillIter)->nextPayload(partitionId)) {
+arrow::Result<int64_t> LocalPartitionWriter::mergeSpills(uint32_t partitionId) 
{
+  int64_t bytesEvicted = 0;
+  int32_t spillIndex = 0;
+
+  for (const auto& spill : spills_) {
+    ARROW_ASSIGN_OR_RAISE(auto startPos, dataFileOs_->Tell());
+
+    spill->openForRead(options_.shuffleFileBufferSize);
+
+    // Read if partition exists in the spilled file. Then write to the final 
data file.
+    while (auto payload = spill->nextPayload(partitionId)) {
       // May trigger spill during compression.
       RETURN_NOT_OK(payload->serialize(dataFileOs_.get()));
       compressTime_ += payload->getCompressTime();
       writeTime_ += payload->getWriteTime();
     }
-    ++spillIter;
-    ARROW_ASSIGN_OR_RAISE(auto ed, dataFileOs_->Tell());
-    DLOG(INFO) << "Partition " << partitionId << " spilled from spillResult " 
<< spillId++ << " of bytes " << ed - st;
-    totalBytesEvicted_ += (ed - st);
+
+    ARROW_ASSIGN_OR_RAISE(auto endPos, dataFileOs_->Tell());
+    auto bytesWritten = endPos - startPos;
+
+    DLOG(INFO) << "Partition " << partitionId << " spilled from spillResult " 
<< spillIndex++ << " of bytes "
+               << bytesWritten;
+
+    bytesEvicted += bytesWritten;
   }
-  return arrow::Status::OK();
+
+  totalBytesEvicted_ += bytesEvicted;
+  return bytesEvicted;
 }
 
 arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
@@ -447,17 +514,13 @@ arrow::Status 
LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
   stopped_ = true;
 
   if (useSpillFileAsDataFile_) {
-    RETURN_NOT_OK(finishSpill(false));
-    // The last spill has been written to data file.
-    auto spill = std::move(spills_.back());
-    spills_.pop_back();
+    RETURN_NOT_OK(spiller_->flush());
+    ARROW_ASSIGN_OR_RAISE(auto spill, spiller_->finish());
 
     // Merge the remaining partitions from spills.
-    if (spills_.size() > 0) {
+    if (!spills_.empty()) {
       for (auto pid = lastEvictPid_ + 1; pid < numPartitions_; ++pid) {
-        auto bytesEvicted = totalBytesEvicted_;
-        RETURN_NOT_OK(mergeSpills(pid));
-        partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
+        ARROW_ASSIGN_OR_RAISE(partitionLengths_[pid], mergeSpills(pid));
       }
     }
 
@@ -469,7 +532,7 @@ arrow::Status 
LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
     writeTime_ = spill->spillTime();
     compressTime_ += spill->compressTime();
   } else {
-    RETURN_NOT_OK(finishSpill(true));
+    RETURN_NOT_OK(finishSpill());
     ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
 
     int64_t endInFinalFile = 0;
@@ -527,6 +590,7 @@ arrow::Status LocalPartitionWriter::requestSpill(bool 
isFinal) {
     std::string spillFile;
     std::shared_ptr<arrow::io::OutputStream> os;
     if (isFinal) {
+      // If `spill()` is requested after `stop()`, open the final data file 
for writing.
       ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
       spillFile = dataFile_;
       os = dataFileOs_;
@@ -536,17 +600,16 @@ arrow::Status LocalPartitionWriter::requestSpill(bool 
isFinal) {
       ARROW_ASSIGN_OR_RAISE(os, openFile(spillFile));
     }
     spiller_ = std::make_unique<LocalSpiller>(
-        os, std::move(spillFile), options_.compressionThreshold, 
payloadPool_.get(), codec_.get());
+        isFinal, os, std::move(spillFile), options_.compressionThreshold, 
payloadPool_.get(), codec_.get());
   }
   return arrow::Status::OK();
 }
 
-arrow::Status LocalPartitionWriter::finishSpill(bool close) {
-  // Finish the spiller. No compression, no spill.
+arrow::Status LocalPartitionWriter::finishSpill() {
   if (spiller_ && !spiller_->finished()) {
     auto spiller = std::move(spiller_);
     spills_.emplace_back();
-    ARROW_ASSIGN_OR_RAISE(spills_.back(), spiller->finish(close));
+    ARROW_ASSIGN_OR_RAISE(spills_.back(), spiller->finish());
   }
   return arrow::Status::OK();
 }
@@ -555,8 +618,7 @@ arrow::Status LocalPartitionWriter::hashEvict(
     uint32_t partitionId,
     std::unique_ptr<InMemoryPayload> inMemoryPayload,
     Evict::type evictType,
-    bool reuseBuffers,
-    bool hasComplexType) {
+    bool reuseBuffers) {
   rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
 
   if (evictType == Evict::kSpill) {
@@ -568,8 +630,7 @@ arrow::Status LocalPartitionWriter::hashEvict(
   }
 
   if (!merger_) {
-    merger_ =
-        std::make_shared<PayloadMerger>(options_, payloadPool_.get(), codec_ ? 
codec_.get() : nullptr, hasComplexType);
+    merger_ = std::make_shared<PayloadMerger>(options_, payloadPool_.get(), 
codec_ ? codec_.get() : nullptr);
   }
   ARROW_ASSIGN_OR_RAISE(auto merged, merger_->merge(partitionId, 
std::move(inMemoryPayload), reuseBuffers));
   if (!merged.empty()) {
@@ -584,43 +645,38 @@ arrow::Status LocalPartitionWriter::hashEvict(
   return arrow::Status::OK();
 }
 
-arrow::Status LocalPartitionWriter::sortEvict(
-    uint32_t partitionId,
-    std::unique_ptr<InMemoryPayload> inMemoryPayload,
-    std::shared_ptr<arrow::Buffer> compressed,
-    bool isFinal) {
+arrow::Status
+LocalPartitionWriter::sortEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
   rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
 
   if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && 
!dataFileOs_))) {
     lastEvictPid_ = -1;
-    RETURN_NOT_OK(finishSpill(true));
+    RETURN_NOT_OK(finishSpill());
   }
   RETURN_NOT_OK(requestSpill(isFinal));
 
-  auto payloadType = codec_ ? Payload::Type::kCompressed : 
Payload::Type::kUncompressed;
-  ARROW_ASSIGN_OR_RAISE(
-      auto payload,
-      inMemoryPayload->toBlockPayload(
-          payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr, 
std::move(compressed)));
-  if (!isFinal) {
-    RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
-  } else {
-    if (spills_.size() > 0) {
+  if (lastEvictPid_ != partitionId) {
+    // Flush the remaining data for lastEvictPid_.
+    RETURN_NOT_OK(spiller_->flush());
+
+    // For final data file, merge all spills for partitions in [lastEvictPid_ 
+ 1, partitionId]. Note in this function,
+    // only the spilled partitions before partitionId are merged. Therefore, 
the remaining partitions after partitionId
+    // are not merged here and will be merged in `stop()`.
+    if (isFinal && !spills_.empty()) {
       for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
-        auto bytesEvicted = totalBytesEvicted_;
-        RETURN_NOT_OK(mergeSpills(pid));
-        partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
+        ARROW_ASSIGN_OR_RAISE(partitionLengths_[pid], mergeSpills(pid));
       }
     }
-    RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
   }
+
+  RETURN_NOT_OK(spiller_->spill(partitionId, std::move(inMemoryPayload)));
   lastEvictPid_ = partitionId;
   return arrow::Status::OK();
 }
 
 arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* 
actual) {
   // Finish last spiller.
-  RETURN_NOT_OK(finishSpill(true));
+  RETURN_NOT_OK(finishSpill());
 
   int64_t reclaimed = 0;
   // Reclaim memory from payloadCache.
@@ -651,7 +707,7 @@ arrow::Status 
LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu
     // This is not accurate. When the evicted partition buffers are not 
copied, the merged ones
     // are resized from the original buffers thus allocated from 
partitionBufferPool.
     reclaimed += beforeSpill - payloadPool_->bytes_allocated();
-    RETURN_NOT_OK(finishSpill(true));
+    RETURN_NOT_OK(finishSpill());
   }
   *actual = reclaimed;
   return arrow::Status::OK();
diff --git a/cpp/core/shuffle/LocalPartitionWriter.h 
b/cpp/core/shuffle/LocalPartitionWriter.h
index de1bce8e74..02cc16c565 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.h
+++ b/cpp/core/shuffle/LocalPartitionWriter.h
@@ -38,14 +38,10 @@ class LocalPartitionWriter : public PartitionWriter {
       uint32_t partitionId,
       std::unique_ptr<InMemoryPayload> inMemoryPayload,
       Evict::type evictType,
-      bool reuseBuffers,
-      bool hasComplexType) override;
+      bool reuseBuffers) override;
 
-  arrow::Status sortEvict(
-      uint32_t partitionId,
-      std::unique_ptr<InMemoryPayload> inMemoryPayload,
-      std::shared_ptr<arrow::Buffer> compressed,
-      bool isFinal) override;
+  arrow::Status sortEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal)
+      override;
 
   // This code path is not used by LocalPartitionWriter, Not implement it by 
default.
   arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> 
blockPayload, bool stop) override {
@@ -90,13 +86,13 @@ class LocalPartitionWriter : public PartitionWriter {
 
   arrow::Status requestSpill(bool isFinal);
 
-  arrow::Status finishSpill(bool close);
+  arrow::Status finishSpill();
 
   std::string nextSpilledFileDir();
 
   arrow::Result<std::shared_ptr<arrow::io::OutputStream>> openFile(const 
std::string& file);
 
-  arrow::Status mergeSpills(uint32_t partitionId);
+  arrow::Result<int64_t> mergeSpills(uint32_t partitionId);
 
   arrow::Status clearResource();
 
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 5e98f82912..07dd2999b0 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -30,7 +30,7 @@ static constexpr int64_t kDefaultSortBufferThreshold = 64 << 
20;
 static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
 static constexpr int32_t kDefaultNumSubDirs = 64;
 static constexpr int32_t kDefaultCompressionThreshold = 100;
-static constexpr int32_t kDefaultSortEvictBufferSize = 32 * 1024;
+static constexpr int32_t kDefaultDiskWriteBufferSize = 32 * 1024; // TODO: 
compare performance with 1M (spark default)
 static const std::string kDefaultCompressionTypeStr = "lz4";
 static constexpr int32_t kDefaultBufferAlignment = 64;
 static constexpr double kDefaultBufferReallocThreshold = 0.25;
@@ -65,9 +65,9 @@ struct ShuffleWriterOptions {
   ShuffleWriterType shuffleWriterType = kHashShuffle;
 
   // Sort shuffle writer.
-  int32_t sortBufferInitialSize = kDefaultSortBufferSize;
-  int32_t sortEvictBufferSize = kDefaultSortEvictBufferSize;
-  bool useRadixSort = kDefaultUseRadixSort;
+  int32_t initialSortBufferSize = kDefaultSortBufferSize; // 
spark.shuffle.sort.initialBufferSize
+  int32_t diskWriteBufferSize = kDefaultDiskWriteBufferSize; // 
spark.shuffle.spill.diskWriteBufferSize
+  bool useRadixSort = kDefaultUseRadixSort; // spark.shuffle.sort.useRadixSort
 };
 
 struct PartitionWriterOptions {
diff --git a/cpp/core/shuffle/PartitionWriter.h 
b/cpp/core/shuffle/PartitionWriter.h
index 3a44d38365..171efed0a3 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -46,14 +46,10 @@ class PartitionWriter : public Reclaimable {
       uint32_t partitionId,
       std::unique_ptr<InMemoryPayload> inMemoryPayload,
       Evict::type evictType,
-      bool reuseBuffers,
-      bool hasComplexType) = 0;
+      bool reuseBuffers) = 0;
 
-  virtual arrow::Status sortEvict(
-      uint32_t partitionId,
-      std::unique_ptr<InMemoryPayload> inMemoryPayload,
-      std::shared_ptr<arrow::Buffer> compressed,
-      bool isFinal) = 0;
+  virtual arrow::Status
+  sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload> 
inMemoryPayload, bool isFinal) = 0;
 
   std::optional<int64_t> getCompressedBufferLength(const 
std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
     if (!codec_) {
diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc
index ddf4a40966..602b49b39e 100644
--- a/cpp/core/shuffle/Payload.cc
+++ b/cpp/core/shuffle/Payload.cc
@@ -354,6 +354,7 @@ arrow::Result<std::unique_ptr<InMemoryPayload>> 
InMemoryPayload::merge(
     std::unique_ptr<InMemoryPayload> source,
     std::unique_ptr<InMemoryPayload> append,
     arrow::MemoryPool* pool) {
+  GLUTEN_DCHECK(source->mergeable() && append->mergeable(), "Cannot merge 
payloads.");
   auto mergedRows = source->numRows() + append->numRows();
   auto isValidityBuffer = source->isValidityBuffer();
 
@@ -435,10 +436,19 @@ arrow::Result<std::unique_ptr<BlockPayload>> 
InMemoryPayload::toBlockPayload(
 }
 
 arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream* 
outputStream) {
-  return arrow::Status::Invalid("Cannot serialize InMemoryPayload.");
+  for (auto& buffer : buffers_) {
+    RETURN_NOT_OK(outputStream->Write(buffer->data(), buffer->size()));
+    buffer = nullptr;
+  }
+  buffers_.clear();
+  return arrow::Status::OK();
 }
 
 arrow::Result<std::shared_ptr<arrow::Buffer>> 
InMemoryPayload::readBufferAt(uint32_t index) {
+  GLUTEN_CHECK(
+      index < buffers_.size(),
+      "buffer index out of range: index = " + std::to_string(index) +
+          " vs buffer size = " + std::to_string(buffers_.size()));
   return std::move(buffers_[index]);
 }
 
@@ -462,6 +472,10 @@ int64_t InMemoryPayload::rawSize() {
   return getBufferSize(buffers_);
 }
 
+bool InMemoryPayload::mergeable() const {
+  return !hasComplexType_;
+}
+
 UncompressedDiskBlockPayload::UncompressedDiskBlockPayload(
     Type type,
     uint32_t numRows,
@@ -476,10 +490,6 @@ UncompressedDiskBlockPayload::UncompressedDiskBlockPayload(
       pool_(pool),
       codec_(codec) {}
 
-arrow::Result<std::shared_ptr<arrow::Buffer>> 
UncompressedDiskBlockPayload::readBufferAt(uint32_t index) {
-  return arrow::Status::Invalid("Cannot read buffer from 
UncompressedDiskBlockPayload.");
-}
-
 arrow::Status UncompressedDiskBlockPayload::serialize(arrow::io::OutputStream* 
outputStream) {
   ARROW_RETURN_IF(
       inputStream_ == nullptr, arrow::Status::Invalid("inputStream_ is 
uninitialized before calling serialize()."));
@@ -556,10 +566,6 @@ arrow::Status 
CompressedDiskBlockPayload::serialize(arrow::io::OutputStream* out
   return arrow::Status::OK();
 }
 
-arrow::Result<std::shared_ptr<arrow::Buffer>> 
CompressedDiskBlockPayload::readBufferAt(uint32_t index) {
-  return arrow::Status::Invalid("Cannot read buffer from 
CompressedDiskBlockPayload.");
-}
-
 int64_t CompressedDiskBlockPayload::rawSize() {
   return rawSize_;
 }
diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h
index 611b2310d5..efc70b3fbe 100644
--- a/cpp/core/shuffle/Payload.h
+++ b/cpp/core/shuffle/Payload.h
@@ -36,8 +36,6 @@ class Payload {
 
   virtual arrow::Status serialize(arrow::io::OutputStream* outputStream) = 0;
 
-  virtual arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t 
index) = 0;
-
   virtual int64_t rawSize() = 0;
 
   int64_t getCompressTime() const {
@@ -101,7 +99,7 @@ class BlockPayload final : public Payload {
 
   arrow::Status serialize(arrow::io::OutputStream* outputStream) override;
 
-  arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t pos) 
override;
+  arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t pos);
 
   int64_t rawSize() override;
 
@@ -127,15 +125,18 @@ class InMemoryPayload final : public Payload {
   InMemoryPayload(
       uint32_t numRows,
       const std::vector<bool>* isValidityBuffer,
-      std::vector<std::shared_ptr<arrow::Buffer>> buffers)
-      : Payload(Type::kUncompressed, numRows, isValidityBuffer), 
buffers_(std::move(buffers)) {}
+      std::vector<std::shared_ptr<arrow::Buffer>> buffers,
+      bool hasComplexType = false)
+      : Payload(Type::kUncompressed, numRows, isValidityBuffer),
+        buffers_(std::move(buffers)),
+        hasComplexType_(hasComplexType) {}
 
   static arrow::Result<std::unique_ptr<InMemoryPayload>>
   merge(std::unique_ptr<InMemoryPayload> source, 
std::unique_ptr<InMemoryPayload> append, arrow::MemoryPool* pool);
 
   arrow::Status serialize(arrow::io::OutputStream* outputStream) override;
 
-  arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index) 
override;
+  arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index);
 
   arrow::Result<std::unique_ptr<BlockPayload>> toBlockPayload(
       Payload::Type payloadType,
@@ -147,8 +148,11 @@ class InMemoryPayload final : public Payload {
 
   int64_t rawSize() override;
 
+  bool mergeable() const;
+
  private:
   std::vector<std::shared_ptr<arrow::Buffer>> buffers_;
+  bool hasComplexType_;
 };
 
 class UncompressedDiskBlockPayload final : public Payload {
@@ -162,8 +166,6 @@ class UncompressedDiskBlockPayload final : public Payload {
       arrow::MemoryPool* pool,
       arrow::util::Codec* codec);
 
-  arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index) 
override;
-
   arrow::Status serialize(arrow::io::OutputStream* outputStream) override;
 
   int64_t rawSize() override;
@@ -189,8 +191,6 @@ class CompressedDiskBlockPayload final : public Payload {
 
   arrow::Status serialize(arrow::io::OutputStream* outputStream) override;
 
-  arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index) 
override;
-
   int64_t rawSize() override;
 
  private:
diff --git a/cpp/core/shuffle/Spill.cc b/cpp/core/shuffle/Spill.cc
index 9b60d4bc3e..1a88a0b11f 100644
--- a/cpp/core/shuffle/Spill.cc
+++ b/cpp/core/shuffle/Spill.cc
@@ -21,8 +21,6 @@
 
 namespace gluten {
 
-Spill::Spill(Spill::SpillType type) : type_(type) {}
-
 Spill::~Spill() {
   if (is_) {
     static_cast<void>(is_->Close());
@@ -77,10 +75,6 @@ void Spill::openForRead(uint64_t shuffleFileBufferSize) {
   }
 }
 
-Spill::SpillType Spill::type() const {
-  return type_;
-}
-
 void Spill::setSpillFile(const std::string& spillFile) {
   spillFile_ = spillFile;
 }
diff --git a/cpp/core/shuffle/Spill.h b/cpp/core/shuffle/Spill.h
index fd692537c5..1bf55152ad 100644
--- a/cpp/core/shuffle/Spill.h
+++ b/cpp/core/shuffle/Spill.h
@@ -29,14 +29,8 @@ namespace gluten {
 
 class Spill final {
  public:
-  enum SpillType { kSequentialSpill, kBatchedSpill };
-
-  Spill(SpillType type);
-
   ~Spill();
 
-  SpillType type() const;
-
   void openForRead(uint64_t shuffleFileBufferSize);
 
   bool hasNextPayload(uint32_t partitionId);
@@ -70,7 +64,6 @@ class Spill final {
     std::unique_ptr<Payload> payload{};
   };
 
-  SpillType type_;
   std::shared_ptr<gluten::MmapFileStream> is_;
   std::list<PartitionPayload> partitionPayloads_{};
   std::string spillFile_;
diff --git a/cpp/core/shuffle/Utils.h b/cpp/core/shuffle/Utils.h
index 2e5ff58b6e..cfe07e5f2c 100644
--- a/cpp/core/shuffle/Utils.h
+++ b/cpp/core/shuffle/Utils.h
@@ -27,6 +27,9 @@
 
 #include "utils/Compression.h"
 
+#include <utils/Exception.h>
+#include <utils/Timer.h>
+
 namespace gluten {
 
 using BinaryArrayLengthBufferType = uint32_t;
@@ -105,4 +108,396 @@ class MmapFileStream : public arrow::io::InputStream {
   int64_t posRetain_ = 0;
 };
 
+// Adopted from arrow::io::CompressedOutputStream. Rebuild compressor after 
each `Flush()`.
+class ShuffleCompressedOutputStream : public arrow::io::OutputStream {
+ public:
+  /// \brief Create a compressed output stream wrapping the given output 
stream.
+  static arrow::Result<std::shared_ptr<ShuffleCompressedOutputStream>>
+  Make(arrow::util::Codec* codec, const std::shared_ptr<OutputStream>& raw, 
arrow::MemoryPool* pool) {
+    auto res = std::shared_ptr<ShuffleCompressedOutputStream>(new 
ShuffleCompressedOutputStream(codec, raw, pool));
+    RETURN_NOT_OK(res->Init(codec));
+    return res;
+  }
+
+  arrow::Result<int64_t> Tell() const override {
+    return totalPos_;
+  }
+
+  arrow::Status Write(const void* data, int64_t nbytes) override {
+    ARROW_RETURN_IF(!isOpen_, arrow::Status::Invalid("Stream is closed"));
+
+    if (nbytes == 0) {
+      return arrow::Status::OK();
+    }
+
+    freshCompressor_ = false;
+
+    int64_t flushTime = 0;
+    {
+      ScopedTimer timer(&compressTime_);
+      auto input = static_cast<const uint8_t*>(data);
+      while (nbytes > 0) {
+        int64_t input_len = nbytes;
+        int64_t output_len = compressed_->size() - compressedPos_;
+        uint8_t* output = compressed_->mutable_data() + compressedPos_;
+        ARROW_ASSIGN_OR_RAISE(auto result, compressor_->Compress(input_len, 
input, output_len, output));
+        compressedPos_ += result.bytes_written;
+
+        if (result.bytes_read == 0) {
+          // Not enough output, try to flush it and retry
+          if (compressedPos_ > 0) {
+            RETURN_NOT_OK(FlushCompressed(flushTime));
+            output_len = compressed_->size() - compressedPos_;
+            output = compressed_->mutable_data() + compressedPos_;
+            ARROW_ASSIGN_OR_RAISE(result, compressor_->Compress(input_len, 
input, output_len, output));
+            compressedPos_ += result.bytes_written;
+          }
+        }
+        input += result.bytes_read;
+        nbytes -= result.bytes_read;
+        totalPos_ += result.bytes_read;
+        if (compressedPos_ == compressed_->size()) {
+          // Output buffer full, flush it
+          RETURN_NOT_OK(FlushCompressed(flushTime));
+        }
+        if (result.bytes_read == 0) {
+          // Need to enlarge output buffer
+          RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+        }
+      }
+    }
+    compressTime_ -= flushTime;
+    flushTime_ += flushTime;
+    return arrow::Status::OK();
+  }
+
+  arrow::Status Flush() override {
+    ARROW_RETURN_IF(!isOpen_, arrow::Status::Invalid("Stream is closed"));
+
+    if (freshCompressor_) {
+      // No data written since the last flush; nothing to do.
+      return arrow::Status::OK();
+    }
+
+    RETURN_NOT_OK(FinalizeCompression());
+    ARROW_ASSIGN_OR_RAISE(compressor_, codec_->MakeCompressor());
+    freshCompressor_ = true;
+    return arrow::Status::OK();
+  }
+
+  arrow::Status Close() override {
+    if (isOpen_) {
+      isOpen_ = false;
+      if (!freshCompressor_) {
+        RETURN_NOT_OK(FinalizeCompression());
+      }
+      // Do not close the underlying stream; closing it is the caller's responsibility.
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Status Abort() override {
+    if (isOpen_) {
+      isOpen_ = false;
+      return raw_->Abort();
+    }
+    return arrow::Status::OK();
+  }
+
+  bool closed() const override {
+    return !isOpen_;
+  }
+
+  int64_t compressTime() const {
+    return compressTime_;
+  }
+
+  int64_t flushTime() const {
+    return flushTime_;
+  }
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(ShuffleCompressedOutputStream);
+
+  ShuffleCompressedOutputStream(
+      arrow::util::Codec* codec,
+      const std::shared_ptr<OutputStream>& raw,
+      arrow::MemoryPool* pool)
+      : codec_(codec), raw_(raw), pool_(pool) {}
+
+  arrow::Status Init(arrow::util::Codec* codec) {
+    ARROW_ASSIGN_OR_RAISE(compressor_, codec->MakeCompressor());
+    ARROW_ASSIGN_OR_RAISE(compressed_, AllocateResizableBuffer(kChunkSize, 
pool_));
+    compressedPos_ = 0;
+    isOpen_ = true;
+    return arrow::Status::OK();
+  }
+
+  arrow::Status FlushCompressed(int64_t& flushTime) {
+    if (compressedPos_ > 0) {
+      ScopedTimer timer(&flushTime);
+      RETURN_NOT_OK(raw_->Write(compressed_->data(), compressedPos_));
+      compressedPos_ = 0;
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Status FinalizeCompression() {
+    int64_t flushTime = 0;
+    {
+      ScopedTimer timer(&compressTime_);
+      while (true) {
+        // Try to end compressor
+        int64_t output_len = compressed_->size() - compressedPos_;
+        uint8_t* output = compressed_->mutable_data() + compressedPos_;
+        ARROW_ASSIGN_OR_RAISE(auto result, compressor_->End(output_len, 
output));
+        compressedPos_ += result.bytes_written;
+
+        // Flush compressed output
+        RETURN_NOT_OK(FlushCompressed(flushTime));
+
+        if (result.should_retry) {
+          // Need to enlarge output buffer
+          RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+        } else {
+          // Done
+          break;
+        }
+      }
+    }
+    compressTime_ -= flushTime;
+    flushTime_ += flushTime;
+    return arrow::Status::OK();
+  }
+
+  // TODO: Support setting chunk size
+  // Write 64 KB compressed data at a time
+  static const int64_t kChunkSize = 64 * 1024;
+
+  arrow::util::Codec* codec_;
+  std::shared_ptr<OutputStream> raw_;
+  arrow::MemoryPool* pool_;
+
+  bool freshCompressor_{true};
+  std::shared_ptr<arrow::util::Compressor> compressor_;
+  std::shared_ptr<arrow::ResizableBuffer> compressed_;
+
+  bool isOpen_{false};
+  int64_t compressedPos_{0};
+  // Total number of bytes compressed
+  int64_t totalPos_{0};
+
+  // Time spent on compressing data. Flushing the compressed data into raw_ 
stream is not included.
+  int64_t compressTime_{0};
+  int64_t flushTime_{0};
+};
+
+class CompressedInputStream : public arrow::io::InputStream {
+ public:
+  static arrow::Result<std::shared_ptr<CompressedInputStream>>
+  Make(arrow::util::Codec* codec, const std::shared_ptr<InputStream>& raw, 
arrow::MemoryPool* pool) {
+    std::shared_ptr<CompressedInputStream> res(new CompressedInputStream(raw, 
pool));
+    RETURN_NOT_OK(res->Init(codec));
+    return res;
+  }
+
+  arrow::Status Init(arrow::util::Codec* codec) {
+    ARROW_ASSIGN_OR_RAISE(decompressor_, codec->MakeDecompressor());
+    fresh_decompressor_ = true;
+    return arrow::Status::OK();
+  }
+
+  arrow::Status Close() override {
+    if (is_open_) {
+      is_open_ = false;
+      return raw_->Close();
+    } else {
+      return arrow::Status::OK();
+    }
+  }
+
+  arrow::Status Abort() override {
+    if (is_open_) {
+      is_open_ = false;
+      return raw_->Abort();
+    } else {
+      return arrow::Status::OK();
+    }
+  }
+
+  bool closed() const override {
+    return !is_open_;
+  }
+
+  arrow::Result<int64_t> Tell() const override {
+    return total_pos_;
+  }
+
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+    ScopedTimer timer(&decompressWallTime_);
+    auto out_data = reinterpret_cast<uint8_t*>(out);
+
+    int64_t total_read = 0;
+    bool decompressor_has_data = true;
+
+    while (nbytes - total_read > 0 && decompressor_has_data) {
+      total_read += ReadFromDecompressed(nbytes - total_read, out_data + 
total_read);
+
+      if (nbytes == total_read) {
+        break;
+      }
+
+      // At this point, no more decompressed data remains, so we need to
+      // decompress more
+      RETURN_NOT_OK(RefillDecompressed(&decompressor_has_data));
+    }
+
+    total_pos_ += total_read;
+    return total_read;
+  }
+
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
+    ARROW_ASSIGN_OR_RAISE(auto buf, arrow::AllocateResizableBuffer(nbytes, 
pool_));
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, 
buf->mutable_data()));
+    RETURN_NOT_OK(buf->Resize(bytes_read));
+    return std::move(buf);
+  }
+
+  int64_t decompressTime() const {
+    return decompressWallTime_ - blockingTime_;
+  }
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(CompressedInputStream);
+
+  CompressedInputStream() = default;
+
+  CompressedInputStream(const std::shared_ptr<InputStream>& raw, 
arrow::MemoryPool* pool)
+      : raw_(raw), pool_(pool), is_open_(true), compressed_pos_(0), 
decompressed_pos_(0), total_pos_(0) {}
+
+  // Read compressed data if necessary
+  arrow::Status EnsureCompressedData() {
+    int64_t compressed_avail = compressed_ ? compressed_->size() - 
compressed_pos_ : 0;
+    if (compressed_avail == 0) {
+      ScopedTimer timer(&blockingTime_);
+      // No compressed data available, read a full chunk
+      ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize));
+      compressed_pos_ = 0;
+    }
+    return arrow::Status::OK();
+  }
+
+  // Decompress some data from the compressed_ buffer.
+  // Call this function only if the decompressed_ buffer is empty.
+  arrow::Status DecompressData() {
+    GLUTEN_CHECK(compressed_->data() != nullptr, "Compressed data is null");
+
+    int64_t decompress_size = kDecompressSize;
+
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(decompressed_, 
AllocateResizableBuffer(decompress_size, pool_));
+      decompressed_pos_ = 0;
+
+      int64_t input_len = compressed_->size() - compressed_pos_;
+      const uint8_t* input = compressed_->data() + compressed_pos_;
+      int64_t output_len = decompressed_->size();
+      uint8_t* output = decompressed_->mutable_data();
+
+      ARROW_ASSIGN_OR_RAISE(auto result, decompressor_->Decompress(input_len, 
input, output_len, output));
+      compressed_pos_ += result.bytes_read;
+      if (result.bytes_read > 0) {
+        fresh_decompressor_ = false;
+      }
+      if (result.bytes_written > 0 || !result.need_more_output || input_len == 
0) {
+        // StdMemoryAllocator does not allow resize to 0.
+        if (result.bytes_written > 0) {
+          RETURN_NOT_OK(decompressed_->Resize(result.bytes_written));
+        } else {
+          decompressed_.reset();
+        }
+        break;
+      }
+      GLUTEN_CHECK(result.bytes_written == 0, "Decompressor should return 0 
bytes written");
+      // Need to enlarge output buffer
+      decompress_size *= 2;
+    }
+    return arrow::Status::OK();
+  }
+
+  // Read a given number of bytes from the decompressed_ buffer.
+  int64_t ReadFromDecompressed(int64_t nbytes, uint8_t* out) {
+    int64_t readable = decompressed_ ? (decompressed_->size() - 
decompressed_pos_) : 0;
+    int64_t read_bytes = std::min(readable, nbytes);
+
+    if (read_bytes > 0) {
+      memcpy(out, decompressed_->data() + decompressed_pos_, read_bytes);
+      decompressed_pos_ += read_bytes;
+
+      if (decompressed_pos_ == decompressed_->size()) {
+        // Decompressed data is exhausted, release buffer
+        decompressed_.reset();
+      }
+    }
+
+    return read_bytes;
+  }
+
+  // Try to feed more data into the decompressed_ buffer.
+  arrow::Status RefillDecompressed(bool* has_data) {
+    // First try to read data from the decompressor
+    if (compressed_ && compressed_->size() != 0) {
+      if (decompressor_->IsFinished()) {
+        // We just went over the end of a previous compressed stream.
+        RETURN_NOT_OK(decompressor_->Reset());
+        fresh_decompressor_ = true;
+      }
+      RETURN_NOT_OK(DecompressData());
+    }
+    if (!decompressed_ || decompressed_->size() == 0) {
+      // Got nothing, need to read more compressed data
+      RETURN_NOT_OK(EnsureCompressedData());
+      if (compressed_pos_ == compressed_->size()) {
+        // No more data to decompress
+        if (!fresh_decompressor_ && !decompressor_->IsFinished()) {
+          return arrow::Status::IOError("Truncated compressed stream");
+        }
+        *has_data = false;
+        return arrow::Status::OK();
+      }
+      RETURN_NOT_OK(DecompressData());
+    }
+    *has_data = true;
+    return arrow::Status::OK();
+  }
+
+  std::shared_ptr<InputStream> raw() const {
+    return raw_;
+  }
+
+  // Read 64 KB compressed data at a time
+  static const int64_t kChunkSize = 64 * 1024;
+  // Decompress 1 MB at a time
+  static const int64_t kDecompressSize = 1024 * 1024;
+
+  std::shared_ptr<InputStream> raw_;
+  arrow::MemoryPool* pool_;
+
+  std::shared_ptr<arrow::util::Decompressor> decompressor_;
+  std::shared_ptr<arrow::Buffer> compressed_;
+
+  bool is_open_;
+  // Position in compressed buffer
+  int64_t compressed_pos_;
+  std::shared_ptr<arrow::ResizableBuffer> decompressed_;
+  // Position in decompressed buffer
+  int64_t decompressed_pos_;
+  // True if the decompressor hasn't read any data yet.
+  bool fresh_decompressor_;
+  // Total number of bytes decompressed
+  int64_t total_pos_;
+
+  int64_t blockingTime_{0};
+  int64_t decompressWallTime_{0};
+};
+
 } // namespace gluten
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc 
b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 4bb426d864..9695ce35c1 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -30,13 +30,23 @@ void RssPartitionWriter::init() {
 }
 
 arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
-  // Push data and collect metrics.
-  auto totalBytesEvicted = std::accumulate(bytesEvicted_.begin(), 
bytesEvicted_.end(), 0LL);
+  if (rssOs_ != nullptr && !rssOs_->closed()) {
+    if (compressedOs_ != nullptr) {
+      RETURN_NOT_OK(compressedOs_->Close());
+      compressTime_ = compressedOs_->compressTime();
+      spillTime_ -= compressTime_;
+    }
+    RETURN_NOT_OK(rssOs_->Flush());
+    ARROW_ASSIGN_OR_RAISE(bytesEvicted_[lastEvictedPartitionId_], 
rssOs_->Tell());
+    RETURN_NOT_OK(rssOs_->Close());
+  }
+
   rssClient_->stop();
+
+  auto totalBytesEvicted = std::accumulate(bytesEvicted_.begin(), 
bytesEvicted_.end(), 0LL);
   // Populate metrics.
   metrics->totalCompressTime += compressTime_;
   metrics->totalEvictTime += spillTime_;
-  metrics->totalWriteTime += writeTime_;
   metrics->totalBytesEvicted += totalBytesEvicted;
   metrics->totalBytesWritten += totalBytesEvicted;
   metrics->partitionLengths = std::move(bytesEvicted_);
@@ -53,17 +63,42 @@ arrow::Status RssPartitionWriter::hashEvict(
     uint32_t partitionId,
     std::unique_ptr<InMemoryPayload> inMemoryPayload,
     Evict::type evictType,
-    bool reuseBuffers,
-    bool hasComplexType) {
-  return doEvict(partitionId, std::move(inMemoryPayload), nullptr);
+    bool reuseBuffers) {
+  return doEvict(partitionId, std::move(inMemoryPayload));
 }
 
-arrow::Status RssPartitionWriter::sortEvict(
-    uint32_t partitionId,
-    std::unique_ptr<InMemoryPayload> inMemoryPayload,
-    std::shared_ptr<arrow::Buffer> compressed,
-    bool isFinal) {
-  return doEvict(partitionId, std::move(inMemoryPayload), 
std::move(compressed));
+arrow::Status
+RssPartitionWriter::sortEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
+  ScopedTimer timer(&spillTime_);
+  if (lastEvictedPartitionId_ != partitionId) {
+    if (lastEvictedPartitionId_ != -1) {
+      GLUTEN_DCHECK(rssOs_ != nullptr && !rssOs_->closed(), 
"RssPartitionWriterOutputStream should not be null");
+      if (compressedOs_ != nullptr) {
+        RETURN_NOT_OK(compressedOs_->Flush());
+      }
+      RETURN_NOT_OK(rssOs_->Flush());
+      ARROW_ASSIGN_OR_RAISE(bytesEvicted_[lastEvictedPartitionId_], 
rssOs_->Tell());
+      RETURN_NOT_OK(rssOs_->Close());
+    }
+
+    rssOs_ =
+        std::make_shared<RssPartitionWriterOutputStream>(partitionId, 
rssClient_.get(), options_.pushBufferMaxSize);
+    RETURN_NOT_OK(rssOs_->init());
+    if (codec_ != nullptr) {
+      ARROW_ASSIGN_OR_RAISE(
+          compressedOs_, ShuffleCompressedOutputStream::Make(codec_.get(), 
rssOs_, arrow::default_memory_pool()));
+    }
+
+    lastEvictedPartitionId_ = partitionId;
+  }
+
+  rawPartitionLengths_[partitionId] = inMemoryPayload->rawSize();
+  if (compressedOs_ != nullptr) {
+    RETURN_NOT_OK(inMemoryPayload->serialize(compressedOs_.get()));
+  } else {
+    RETURN_NOT_OK(inMemoryPayload->serialize(rssOs_.get()));
+  }
+  return arrow::Status::OK();
 }
 
 arrow::Status RssPartitionWriter::evict(uint32_t partitionId, 
std::unique_ptr<BlockPayload> blockPayload, bool) {
@@ -74,16 +109,12 @@ arrow::Status RssPartitionWriter::evict(uint32_t 
partitionId, std::unique_ptr<Bl
   return arrow::Status::OK();
 }
 
-arrow::Status RssPartitionWriter::doEvict(
-    uint32_t partitionId,
-    std::unique_ptr<InMemoryPayload> inMemoryPayload,
-    std::shared_ptr<arrow::Buffer> compressed) {
+arrow::Status RssPartitionWriter::doEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload) {
   rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
   auto payloadType = codec_ ? Payload::Type::kCompressed : 
Payload::Type::kUncompressed;
   ARROW_ASSIGN_OR_RAISE(
       auto payload,
-      inMemoryPayload->toBlockPayload(
-          payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr, 
std::move(compressed)));
+      inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ 
? codec_.get() : nullptr, nullptr));
   // Copy payload to arrow buffered os.
   ARROW_ASSIGN_OR_RAISE(auto rssBufferOs, 
arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize));
   RETURN_NOT_OK(payload->serialize(rssBufferOs.get()));
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h 
b/cpp/core/shuffle/rss/RssPartitionWriter.h
index 2034c321e1..040c7546b9 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -26,6 +26,8 @@
 
 namespace gluten {
 
+class RssPartitionWriterOutputStream;
+
 class RssPartitionWriter final : public PartitionWriter {
  public:
   RssPartitionWriter(
@@ -41,14 +43,10 @@ class RssPartitionWriter final : public PartitionWriter {
       uint32_t partitionId,
       std::unique_ptr<InMemoryPayload> inMemoryPayload,
       Evict::type evictType,
-      bool reuseBuffers,
-      bool hasComplexType) override;
+      bool reuseBuffers) override;
 
-  arrow::Status sortEvict(
-      uint32_t partitionId,
-      std::unique_ptr<InMemoryPayload> inMemoryPayload,
-      std::shared_ptr<arrow::Buffer> compressed,
-      bool isFinal) override;
+  arrow::Status sortEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal)
+      override;
 
   arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> 
blockPayload, bool stop) override;
 
@@ -59,14 +57,91 @@ class RssPartitionWriter final : public PartitionWriter {
  private:
   void init();
 
-  arrow::Status doEvict(
-      uint32_t partitionId,
-      std::unique_ptr<InMemoryPayload> inMemoryPayload,
-      std::shared_ptr<arrow::Buffer> compressed);
+  arrow::Status doEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload> 
inMemoryPayload);
 
   std::shared_ptr<RssClient> rssClient_;
 
   std::vector<int64_t> bytesEvicted_;
   std::vector<int64_t> rawPartitionLengths_;
+
+  int32_t lastEvictedPartitionId_{-1};
+  std::shared_ptr<RssPartitionWriterOutputStream> rssOs_;
+  std::shared_ptr<ShuffleCompressedOutputStream> compressedOs_;
+};
+
+class RssPartitionWriterOutputStream final : public arrow::io::OutputStream {
+ public:
+  RssPartitionWriterOutputStream(int32_t partitionId, RssClient* rssClient, 
int64_t pushBufferSize)
+      : partitionId_(partitionId), rssClient_(rssClient), 
bufferSize_(pushBufferSize) {}
+
+  arrow::Status init() {
+    ARROW_ASSIGN_OR_RAISE(pushBuffer_, arrow::AllocateBuffer(bufferSize_, 
arrow::default_memory_pool()));
+    pushBufferPtr_ = pushBuffer_->mutable_data();
+    return arrow::Status::OK();
+  }
+
+  arrow::Status Close() override {
+    RETURN_NOT_OK(Flush());
+    pushBuffer_.reset();
+    return arrow::Status::OK();
+  }
+
+  bool closed() const override {
+    return pushBuffer_ == nullptr;
+  }
+
+  arrow::Result<int64_t> Tell() const override {
+    return bytesEvicted_ + bufferPos_;
+  }
+
+  arrow::Status Write(const void* data, int64_t nbytes) override {
+    auto dataPtr = static_cast<const char*>(data);
+    if (nbytes < 0) {
+      return arrow::Status::Invalid("write count should be >= 0");
+    }
+    if (nbytes == 0) {
+      return arrow::Status::OK();
+    }
+
+    if (nbytes + bufferPos_ <= bufferSize_) {
+      std::memcpy(pushBufferPtr_ + bufferPos_, dataPtr, nbytes);
+      bufferPos_ += nbytes;
+      return arrow::Status::OK();
+    }
+
+    int64_t bytesWritten = 0;
+    while (bytesWritten < nbytes) {
+      auto remaining = nbytes - bytesWritten;
+      if (remaining <= bufferSize_ - bufferPos_) {
+        std::memcpy(pushBufferPtr_ + bufferPos_, dataPtr + bytesWritten, 
remaining);
+        bufferPos_ += remaining;
+        return arrow::Status::OK();
+      }
+      auto toWrite = bufferSize_ - bufferPos_;
+      std::memcpy(pushBufferPtr_ + bufferPos_, dataPtr + bytesWritten, 
toWrite);
+      bytesWritten += toWrite;
+      bufferPos_ += toWrite;
+      RETURN_NOT_OK(Flush());
+    }
+    return arrow::Status::OK();
+  }
+
+  arrow::Status Flush() override {
+    if (bufferPos_ > 0) {
+      bytesEvicted_ += rssClient_->pushPartitionData(partitionId_, 
reinterpret_cast<char*>(pushBufferPtr_), bufferPos_);
+      bufferPos_ = 0;
+    }
+    return arrow::Status::OK();
+  }
+
+ private:
+  int32_t partitionId_;
+  RssClient* rssClient_;
+  int64_t bufferSize_{kDefaultPushMemoryThreshold};
+
+  std::shared_ptr<arrow::Buffer> pushBuffer_;
+  uint8_t* pushBufferPtr_{nullptr};
+  int64_t bufferPos_{0};
+  int64_t bytesEvicted_{0};
 };
 } // namespace gluten
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc 
b/cpp/velox/benchmarks/GenericBenchmark.cc
index 244e0b885a..1a48ca815d 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -96,6 +96,7 @@ struct WriterMetrics {
   int64_t writeTime{0};
   int64_t compressTime{0};
 
+  int64_t dataSize{0};
   int64_t bytesSpilled{0};
   int64_t bytesWritten{0};
 };
@@ -238,6 +239,8 @@ void populateWriterMetrics(
   if (splitTime > 0) {
     metrics.splitTime += splitTime;
   }
+  metrics.dataSize +=
+      std::accumulate(shuffleWriter->rawPartitionLengths().begin(), 
shuffleWriter->rawPartitionLengths().end(), 0LL);
   metrics.bytesWritten += shuffleWriter->totalBytesWritten();
   metrics.bytesSpilled += shuffleWriter->totalBytesEvicted();
 }
@@ -302,6 +305,9 @@ void runShuffle(
       // Read and discard.
       auto cb = iter->next();
     }
+    // Call the dtor to collect the metrics.
+    iter.reset();
+
     readerMetrics.decompressTime = reader->getDecompressTime();
     readerMetrics.deserializeTime = reader->getDeserializeTime();
   }
@@ -343,6 +349,8 @@ void updateBenchmarkMetrics(
     state.counters["shuffle_split_time"] =
         benchmark::Counter(splitTime, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1000);
 
+    state.counters["shuffle_data_size"] = benchmark::Counter(
+        writerMetrics.dataSize, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1024);
     state.counters["shuffle_spilled_bytes"] = benchmark::Counter(
         writerMetrics.bytesSpilled, benchmark::Counter::kAvgIterations, 
benchmark::Counter::OneK::kIs1024);
     state.counters["shuffle_write_bytes"] = benchmark::Counter(
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index ee765f0219..ae2127a135 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -962,9 +962,8 @@ arrow::Status VeloxHashShuffleWriter::evictBuffers(
     std::vector<std::shared_ptr<arrow::Buffer>> buffers,
     bool reuseBuffers) {
   if (!buffers.empty()) {
-    auto payload = std::make_unique<InMemoryPayload>(numRows, 
&isValidityBuffer_, std::move(buffers));
-    RETURN_NOT_OK(
-        partitionWriter_->hashEvict(partitionId, std::move(payload), 
Evict::kCache, reuseBuffers, hasComplexType_));
+    auto payload = std::make_unique<InMemoryPayload>(numRows, 
&isValidityBuffer_, std::move(buffers), hasComplexType_);
+    RETURN_NOT_OK(partitionWriter_->hashEvict(partitionId, std::move(payload), 
Evict::kCache, reuseBuffers));
   }
   return arrow::Status::OK();
 }
@@ -1373,9 +1372,10 @@ arrow::Result<int64_t> 
VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6
     for (auto& item : pidToSize) {
       auto pid = item.first;
       ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false));
-      auto payload = std::make_unique<InMemoryPayload>(item.second, 
&isValidityBuffer_, std::move(buffers));
+      auto payload =
+          std::make_unique<InMemoryPayload>(item.second, &isValidityBuffer_, 
std::move(buffers), hasComplexType_);
       metrics_.totalBytesToEvict += payload->rawSize();
-      RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), 
Evict::kSpill, false, hasComplexType_));
+      RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), 
Evict::kSpill, false));
       evicted = beforeEvict - partitionBufferPool_->bytes_allocated();
       if (evicted >= size) {
         break;
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc 
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 5a379b8656..a5c727f602 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -19,6 +19,8 @@
 
 #include <arrow/array/array_binary.h>
 #include <arrow/io/buffered.h>
+#include <arrow/io/compressed.h>
+#include <velox/common/caching/AsyncDataCache.h>
 
 #include "memory/VeloxColumnarBatch.h"
 #include "shuffle/GlutenByteStream.h"
@@ -41,14 +43,16 @@
 using namespace facebook::velox;
 
 namespace gluten {
-
 namespace {
+constexpr uint64_t kMaxReadBufferSize = (1 << 20) - AlignedBuffer::kPaddedSize;
 
 struct BufferViewReleaser {
   BufferViewReleaser() : BufferViewReleaser(nullptr) {}
+
   BufferViewReleaser(std::shared_ptr<arrow::Buffer> arrowBuffer) : 
bufferReleaser_(std::move(arrowBuffer)) {}
 
   void addRef() const {}
+
   void release() const {}
 
  private:
@@ -281,7 +285,6 @@ std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
   auto rowVector = deserialize(type, payload->numRows(), veloxBuffers, pool);
   return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
 }
-
 } // namespace
 
 VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(
@@ -384,32 +387,40 @@ 
VeloxSortShuffleReaderDeserializer::VeloxSortShuffleReaderDeserializer(
       codec_(codec),
       rowType_(rowType),
       batchSize_(batchSize),
-      arrowPool_(memoryPool),
       veloxPool_(veloxPool),
       deserializeTime_(deserializeTime),
       decompressTime_(decompressTime) {
-  GLUTEN_ASSIGN_OR_THROW(in_, 
arrow::io::BufferedInputStream::Create(bufferSize, memoryPool, std::move(in)));
+  if (codec_ != nullptr) {
+    GLUTEN_ASSIGN_OR_THROW(in_, CompressedInputStream::Make(codec_.get(), 
std::move(in), memoryPool));
+  } else {
+    GLUTEN_ASSIGN_OR_THROW(in_, 
arrow::io::BufferedInputStream::Create(bufferSize, memoryPool, std::move(in)));
+  }
+}
+
+VeloxSortShuffleReaderDeserializer::~VeloxSortShuffleReaderDeserializer() {
+  if (auto in = std::dynamic_pointer_cast<CompressedInputStream>(in_)) {
+    decompressTime_ += in->decompressTime();
+  }
 }
 
 std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::next() {
   if (reachedEos_) {
-    if (cachedRows_ > 0) {
-      return deserializeToBatch();
-    }
     return nullptr;
   }
 
-  if (cachedRows_ >= batchSize_) {
-    return deserializeToBatch();
+  if (rowBuffer_ == nullptr) {
+    rowBuffer_ = AlignedBuffer::allocate<char>(kMaxReadBufferSize, veloxPool_);
+    rowBufferPtr_ = rowBuffer_->asMutable<char>();
+    data_.reserve(batchSize_);
   }
 
-  while (cachedRows_ < batchSize_) {
-    uint32_t numRows = 0;
-    GLUTEN_ASSIGN_OR_THROW(
-        auto arrowBuffers,
-        BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows, 
deserializeTime_, decompressTime_));
+  if (lastRowSize_ != 0) {
+    readNextRow();
+  }
 
-    if (arrowBuffers.empty()) {
+  while (cachedRows_ < batchSize_) {
+    GLUTEN_ASSIGN_OR_THROW(auto bytes, in_->Read(sizeof(RowSizeType), 
&lastRowSize_));
+    if (bytes == 0) {
       reachedEos_ = true;
       if (cachedRows_ > 0) {
         return deserializeToBatch();
@@ -417,80 +428,35 @@ std::shared_ptr<ColumnarBatch> 
VeloxSortShuffleReaderDeserializer::next() {
       return nullptr;
     }
 
-    if (numRows > 0) {
-      auto buffer = std::move(arrowBuffers[0]);
-      cachedInputs_.emplace_back(numRows, 
wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
-      cachedRows_ += numRows;
-    } else {
-      // numRows = 0 indicates that we read a segment of a large row.
-      readLargeRow(arrowBuffers);
+    GLUTEN_CHECK(
+        lastRowSize_ <= kMaxReadBufferSize, "Row size exceeds max read buffer 
size: " + std::to_string(lastRowSize_));
+
+    if (lastRowSize_ + bytesRead_ > kMaxReadBufferSize) {
+      return deserializeToBatch();
     }
+    readNextRow();
   }
+
   return deserializeToBatch();
 }
 
 std::shared_ptr<ColumnarBatch> 
VeloxSortShuffleReaderDeserializer::deserializeToBatch() {
   ScopedTimer timer(&deserializeTime_);
-  std::vector<std::string_view> data;
-  data.reserve(std::min(cachedRows_, batchSize_));
-
-  uint32_t readRows = 0;
-  auto cur = cachedInputs_.begin();
-  while (readRows < batchSize_ && cur != cachedInputs_.end()) {
-    auto buffer = cur->second;
-    const auto* rawBuffer = buffer->as<char>();
-    while (rowOffset_ < cur->first && readRows < batchSize_) {
-      auto rowSize = *(reinterpret_cast<const RowSizeType*>(rawBuffer + 
byteOffset_)) - sizeof(RowSizeType);
-      byteOffset_ += sizeof(RowSizeType);
-      data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize));
-      byteOffset_ += rowSize;
-      ++rowOffset_;
-      ++readRows;
-    }
-    if (rowOffset_ == cur->first) {
-      rowOffset_ = 0;
-      byteOffset_ = 0;
-      ++cur;
-    }
-  }
-  cachedRows_ -= readRows;
-  auto rowVector = facebook::velox::row::CompactRow::deserialize(data, 
rowType_, veloxPool_);
-  // Free memory.
-  auto iter = cachedInputs_.begin();
-  while (iter++ != cur) {
-    cachedInputs_.pop_front();
-  }
+
+  auto rowVector = facebook::velox::row::CompactRow::deserialize(data_, 
rowType_, veloxPool_);
+
+  cachedRows_ = 0;
+  bytesRead_ = 0;
+  data_.resize(0);
   return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
 }
 
-void 
VeloxSortShuffleReaderDeserializer::readLargeRow(std::vector<std::shared_ptr<arrow::Buffer>>&
 arrowBuffers) {
-  // Cache the read segment.
-  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
-  auto rowSize = 
*reinterpret_cast<RowSizeType*>(const_cast<uint8_t*>(arrowBuffers[0]->data()));
-  RowSizeType bufferSize = arrowBuffers[0]->size();
-  buffers.emplace_back(std::move(arrowBuffers[0]));
-  // Read and cache the remaining segments.
-  uint32_t numRows;
-  while (bufferSize < rowSize) {
-    GLUTEN_ASSIGN_OR_THROW(
-        arrowBuffers,
-        BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows, 
deserializeTime_, decompressTime_));
-    VELOX_DCHECK_EQ(numRows, 0);
-    bufferSize += arrowBuffers[0]->size();
-    buffers.emplace_back(std::move(arrowBuffers[0]));
-  }
-  VELOX_CHECK_EQ(bufferSize, rowSize);
-  // Merge all segments.
-  GLUTEN_ASSIGN_OR_THROW(std::shared_ptr<arrow::Buffer> rowBuffer, 
arrow::AllocateBuffer(rowSize, arrowPool_));
-  RowSizeType bytes = 0;
-  auto* dst = rowBuffer->mutable_data();
-  for (const auto& buffer : buffers) {
-    VELOX_DCHECK_NOT_NULL(buffer);
-    gluten::fastCopy(dst + bytes, buffer->data(), buffer->size());
-    bytes += buffer->size();
-  }
-  cachedInputs_.emplace_back(1, wrapInBufferViewAsOwner(rowBuffer->data(), 
rowSize, rowBuffer));
-  cachedRows_++;
+void VeloxSortShuffleReaderDeserializer::readNextRow() {
+  GLUTEN_THROW_NOT_OK(in_->Read(lastRowSize_, rowBufferPtr_ + bytesRead_));
+  data_.push_back(std::string_view(rowBufferPtr_ + bytesRead_, lastRowSize_));
+  bytesRead_ += lastRowSize_;
+  lastRowSize_ = 0;
+  ++cachedRows_;
 }
 
 class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public 
facebook::velox::GlutenByteInputStream {
@@ -705,5 +671,4 @@ int64_t VeloxShuffleReader::getDecompressTime() const {
 int64_t VeloxShuffleReader::getDeserializeTime() const {
   return factory_->getDeserializeTime();
 }
-
 } // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h 
b/cpp/velox/shuffle/VeloxShuffleReader.h
index d7aa145ca1..29ca4d218b 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -79,29 +79,33 @@ class VeloxSortShuffleReaderDeserializer final : public 
ColumnarBatchIterator {
       int64_t& deserializeTime,
       int64_t& decompressTime);
 
+  ~VeloxSortShuffleReaderDeserializer() override;
+
   std::shared_ptr<ColumnarBatch> next() override;
 
  private:
   std::shared_ptr<ColumnarBatch> deserializeToBatch();
 
-  void readLargeRow(std::vector<std::shared_ptr<arrow::Buffer>>& arrowBuffers);
+  void readNextRow();
 
-  std::shared_ptr<arrow::io::InputStream> in_;
   std::shared_ptr<arrow::Schema> schema_;
   std::shared_ptr<arrow::util::Codec> codec_;
   facebook::velox::RowTypePtr rowType_;
   uint32_t batchSize_;
-  arrow::MemoryPool* arrowPool_;
   facebook::velox::memory::MemoryPool* veloxPool_;
   int64_t& deserializeTime_;
   int64_t& decompressTime_;
 
-  std::list<std::pair<uint32_t, facebook::velox::BufferPtr>> cachedInputs_;
+  facebook::velox::BufferPtr rowBuffer_{nullptr};
+  char* rowBufferPtr_{nullptr};
+  uint32_t bytesRead_{0};
+  uint32_t lastRowSize_{0};
+  std::vector<std::string_view> data_;
+
+  std::shared_ptr<arrow::io::InputStream> in_;
+
   uint32_t cachedRows_{0};
   bool reachedEos_{false};
-
-  uint32_t rowOffset_{0};
-  size_t byteOffset_{0};
 };
 
 class VeloxRssSortShuffleReaderDeserializer : public ColumnarBatchIterator {
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 52a5240186..6c78752664 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -17,13 +17,13 @@
 
 #include "shuffle/VeloxSortShuffleWriter.h"
 
-#include <arrow/io/memory.h>
-
-#include "memory/ArrowMemory.h"
 #include "memory/VeloxColumnarBatch.h"
+#include "shuffle/RadixSort.h"
 #include "utils/Common.h"
 #include "utils/Timer.h"
 
+#include <arrow/io/memory.h>
+
 namespace gluten {
 namespace {
 
@@ -108,14 +108,8 @@ arrow::Status VeloxSortShuffleWriter::init() {
   // In Spark, sortedBuffer_ memory and compressionBuffer_ memory are 
pre-allocated and counted into executor
   // memory overhead. To align with Spark, we use arrow::default_memory_pool() 
to avoid counting these memory in Gluten.
   ARROW_ASSIGN_OR_RAISE(
-      sortedBuffer_, arrow::AllocateBuffer(options_.sortEvictBufferSize, 
arrow::default_memory_pool()));
-  rawBuffer_ = sortedBuffer_->mutable_data();
-  auto compressedBufferLength = partitionWriter_->getCompressedBufferLength(
-      {std::make_shared<arrow::Buffer>(rawBuffer_, 
options_.sortEvictBufferSize)});
-  if (compressedBufferLength.has_value()) {
-    ARROW_ASSIGN_OR_RAISE(
-        compressionBuffer_, arrow::AllocateBuffer(*compressedBufferLength, 
arrow::default_memory_pool()));
-  }
+      sortedBuffer_, arrow::AllocateBuffer(options_.diskWriteBufferSize, 
arrow::default_memory_pool()));
+  sortedBufferPtr_ = sortedBuffer_->mutable_data();
   return arrow::Status::OK();
 }
 
@@ -123,9 +117,6 @@ void VeloxSortShuffleWriter::initRowType(const 
facebook::velox::RowVectorPtr& rv
   if (UNLIKELY(!rowType_)) {
     rowType_ = facebook::velox::asRowType(rv->type());
     fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_);
-    if (fixedRowSize_) {
-      *fixedRowSize_ += sizeof(RowSizeType);
-    }
   }
 }
 
@@ -168,17 +159,16 @@ arrow::Status VeloxSortShuffleWriter::insert(const 
facebook::velox::RowVectorPtr
 
   facebook::velox::row::CompactRow row(vector);
 
-  if (!fixedRowSize_) {
+  if (fixedRowSize_.has_value()) {
+    rowSize_.resize(inputRows, fixedRowSize_.value() + sizeof(RowSizeType));
+  } else {
     rowSize_.resize(inputRows);
     rowSizePrefixSum_.resize(inputRows + 1);
     rowSizePrefixSum_[0] = 0;
     for (auto i = 0; i < inputRows; ++i) {
-      auto rowSize = row.rowSize(i) + sizeof(RowSizeType);
-      rowSize_[i] = rowSize;
-      rowSizePrefixSum_[i + 1] = rowSizePrefixSum_[i] + rowSize;
+      rowSize_[i] = row.rowSize(i);
+      rowSizePrefixSum_[i + 1] = rowSizePrefixSum_[i] + rowSize_[i] + 
sizeof(RowSizeType);
     }
-  } else {
-    rowSize_.resize(inputRows, *fixedRowSize_);
   }
 
   facebook::velox::vector_size_t rowOffset = 0;
@@ -186,7 +176,8 @@ arrow::Status VeloxSortShuffleWriter::insert(const 
facebook::velox::RowVectorPtr
     auto remainingRows = inputRows - rowOffset;
     auto rows = maxRowsToInsert(rowOffset, remainingRows);
     if (rows == 0) {
-      auto minSizeRequired = fixedRowSize_ ? fixedRowSize_.value() : 
rowSize_[rowOffset];
+      auto minSizeRequired =
+          (fixedRowSize_.has_value() ? fixedRowSize_.value() : 
rowSize_[rowOffset]) + sizeof(RowSizeType);
       acquireNewBuffer(static_cast<uint64_t>(memLimit), minSizeRequired);
       rows = maxRowsToInsert(rowOffset, remainingRows);
       ARROW_RETURN_IF(
@@ -215,7 +206,7 @@ void VeloxSortShuffleWriter::insertRows(
     // size(RowSize) | bytes
     memcpy(currentPage_ + pageCursor_, &rowSize_[row], sizeof(RowSizeType));
     offsets[i] = pageCursor_ + sizeof(RowSizeType);
-    pageCursor_ += rowSize_[row];
+    pageCursor_ += rowSize_[row] + sizeof(RowSizeType);
     VELOX_DCHECK_LE(pageCursor_, currenPageSize_);
   }
   compact.serialize(offset, size, offsets.data(), currentPage_);
@@ -288,61 +279,55 @@ arrow::Status 
VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
   // Serialize [begin, end)
   int64_t offset = 0;
   char* addr;
-  uint32_t size;
+  uint32_t recordSize;
 
   auto index = begin;
   while (index < end) {
     auto pageIndex = extractPageNumberAndOffset(arrayPtr_[index]);
     addr = pageAddresses_[pageIndex.first] + pageIndex.second;
-    size = *(reinterpret_cast<RowSizeType*>(addr));
-    if (offset + size > options_.sortEvictBufferSize && offset > 0) {
+    recordSize = *(reinterpret_cast<RowSizeType*>(addr)) + sizeof(RowSizeType);
+    if (offset + recordSize > options_.diskWriteBufferSize && offset > 0) {
       sortTime.stop();
-      RETURN_NOT_OK(evictPartitionInternal(partitionId, index - begin, 
rawBuffer_, offset));
+      RETURN_NOT_OK(evictPartitionInternal(partitionId, sortedBufferPtr_, 
offset));
       sortTime.start();
       begin = index;
       offset = 0;
     }
-    if (size > static_cast<uint32_t>(options_.sortEvictBufferSize)) {
+    if (recordSize > static_cast<uint32_t>(options_.diskWriteBufferSize)) {
       // Split large rows.
       sortTime.stop();
       RowSizeType bytes = 0;
       auto* buffer = reinterpret_cast<uint8_t*>(addr);
-      while (bytes < size) {
-        auto rawLength = 
std::min<RowSizeType>((uint32_t)options_.sortEvictBufferSize, size - bytes);
+      while (bytes < recordSize) {
+        auto rawLength = 
std::min<RowSizeType>((uint32_t)options_.diskWriteBufferSize, recordSize - 
bytes);
         // Use numRows = 0 to represent a part of row.
-        RETURN_NOT_OK(evictPartitionInternal(partitionId, 0, buffer + bytes, 
rawLength));
+        RETURN_NOT_OK(evictPartitionInternal(partitionId, buffer + bytes, 
rawLength));
         bytes += rawLength;
       }
       begin++;
       sortTime.start();
     } else {
       // Copy small rows.
-      gluten::fastCopy(rawBuffer_ + offset, addr, size);
-      offset += size;
+      gluten::fastCopy(sortedBufferPtr_ + offset, addr, recordSize);
+      offset += recordSize;
     }
     index++;
   }
   sortTime.stop();
   if (offset > 0) {
     VELOX_CHECK(index > begin);
-    RETURN_NOT_OK(evictPartitionInternal(partitionId, index - begin, 
rawBuffer_, offset));
+    RETURN_NOT_OK(evictPartitionInternal(partitionId, sortedBufferPtr_, 
offset));
   }
   sortTime_ += sortTime.realTimeUsed();
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxSortShuffleWriter::evictPartitionInternal(
-    uint32_t partitionId,
-    int32_t numRows,
-    uint8_t* buffer,
-    int64_t rawLength) {
+arrow::Status VeloxSortShuffleWriter::evictPartitionInternal(uint32_t 
partitionId, uint8_t* buffer, int64_t rawLength) {
   VELOX_CHECK(rawLength > 0);
   auto payload = std::make_unique<InMemoryPayload>(
-      numRows,
-      nullptr,
-      
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer,
 rawLength)});
+      0, nullptr, 
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer,
 rawLength)});
   updateSpillMetrics(payload);
-  RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload), 
compressionBuffer_, stopped_));
+  RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload), 
stopped_));
   return arrow::Status::OK();
 }
 
@@ -354,9 +339,10 @@ facebook::velox::vector_size_t 
VeloxSortShuffleWriter::maxRowsToInsert(
     return 0;
   }
   auto remainingBytes = pages_.back()->size() - pageCursor_;
-  if (fixedRowSize_) {
+  if (fixedRowSize_.has_value()) {
     return std::min(
-        static_cast<facebook::velox::vector_size_t>(remainingBytes / 
(fixedRowSize_.value())), remainingRows);
+        static_cast<facebook::velox::vector_size_t>(remainingBytes / 
(fixedRowSize_.value() + sizeof(RowSizeType))),
+        remainingRows);
   }
   auto beginIter = rowSizePrefixSum_.begin() + 1 + offset;
   auto bytesWritten = rowSizePrefixSum_[offset];
@@ -445,7 +431,7 @@ int64_t VeloxSortShuffleWriter::totalC2RTime() const {
 
 void VeloxSortShuffleWriter::allocateMinimalArray() {
   auto array = facebook::velox::AlignedBuffer::allocate<char>(
-      options_.sortBufferInitialSize * sizeof(uint64_t), veloxPool_.get());
+      options_.initialSortBufferSize * sizeof(uint64_t), veloxPool_.get());
   setUpArray(std::move(array));
 }
 
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h 
b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 42726a663b..cb8b3ba557 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -17,16 +17,13 @@
 
 #pragma once
 
-#include "shuffle/RadixSort.h"
 #include "shuffle/VeloxShuffleWriter.h"
 
 #include <arrow/status.h>
-#include <map>
 #include <vector>
 
 #include "velox/common/memory/HashStringAllocator.h"
 #include "velox/row/CompactRow.h"
-#include "velox/vector/BaseVector.h"
 
 namespace gluten {
 
@@ -80,7 +77,7 @@ class VeloxSortShuffleWriter final : public 
VeloxShuffleWriter {
 
   arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end);
 
-  arrow::Status evictPartitionInternal(uint32_t partitionId, int32_t numRows, 
uint8_t* buffer, int64_t rawLength);
+  arrow::Status evictPartitionInternal(uint32_t partitionId, uint8_t* buffer, 
int64_t rawLength);
 
   facebook::velox::vector_size_t maxRowsToInsert(
       facebook::velox::vector_size_t offset,
@@ -113,8 +110,7 @@ class VeloxSortShuffleWriter final : public 
VeloxShuffleWriter {
   uint32_t currenPageSize_;
 
   std::unique_ptr<arrow::Buffer> sortedBuffer_;
-  uint8_t* rawBuffer_;
-  std::shared_ptr<arrow::Buffer> compressionBuffer_{nullptr};
+  uint8_t* sortedBufferPtr_;
 
   // Row ID -> Partition ID
   // subscript: The index of row in the current input RowVector
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc 
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index e760a469b1..fac503ca70 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -69,17 +69,20 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
   std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};
 
   for (const auto& compression : compressions) {
-    for (const auto compressionBufferSize : {4, 56, 32 * 1024}) {
-      for (auto useRadixSort : {true, false}) {
-        params.push_back(ShuffleTestParams{
-            .shuffleWriterType = ShuffleWriterType::kSortShuffle,
-            .partitionWriterType = PartitionWriterType::kLocal,
-            .compressionType = compression,
-            .compressionBufferSize = compressionBufferSize,
-            .useRadixSort = useRadixSort});
+    for (const auto partitionWriterType : {PartitionWriterType::kLocal, 
PartitionWriterType::kRss}) {
+      for (const auto diskWriteBufferSize : {4, 56, 32 * 1024}) {
+        for (auto useRadixSort : {true, false}) {
+          params.push_back(ShuffleTestParams{
+              .shuffleWriterType = ShuffleWriterType::kSortShuffle,
+              .partitionWriterType = partitionWriterType,
+              .compressionType = compression,
+              .diskWriteBufferSize = diskWriteBufferSize,
+              .useRadixSort = useRadixSort});
+        }
       }
     }
     params.push_back(ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, 
PartitionWriterType::kRss, compression});
+
     for (const auto compressionThreshold : compressionThresholds) {
       for (const auto mergeBufferSize : mergeBufferSizes) {
         params.push_back(ShuffleTestParams{
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h 
b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
index 4fcab8f242..7a9c6c8971 100644
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
@@ -68,14 +68,14 @@ struct ShuffleTestParams {
   arrow::Compression::type compressionType;
   int32_t compressionThreshold{0};
   int32_t mergeBufferSize{0};
-  int32_t compressionBufferSize{0};
+  int32_t diskWriteBufferSize{0};
   bool useRadixSort{false};
 
   std::string toString() const {
     std::ostringstream out;
     out << "shuffleWriterType = " << shuffleWriterType << ", 
partitionWriterType = " << partitionWriterType
         << ", compressionType = " << compressionType << ", 
compressionThreshold = " << compressionThreshold
-        << ", mergeBufferSize = " << mergeBufferSize << ", 
compressionBufferSize = " << compressionBufferSize
+        << ", mergeBufferSize = " << mergeBufferSize << ", 
compressionBufferSize = " << diskWriteBufferSize
         << ", useRadixSort = " << (useRadixSort ? "true" : "false");
     return out.str();
   }
@@ -261,7 +261,7 @@ class VeloxShuffleWriterTest : public 
::testing::TestWithParam<ShuffleTestParams
 
     ShuffleTestParams params = GetParam();
     shuffleWriterOptions_.useRadixSort = params.useRadixSort;
-    shuffleWriterOptions_.sortEvictBufferSize = params.compressionBufferSize;
+    shuffleWriterOptions_.diskWriteBufferSize = params.diskWriteBufferSize;
     partitionWriterOptions_.compressionType = params.compressionType;
     switch (partitionWriterOptions_.compressionType) {
       case arrow::Compression::UNCOMPRESSED:
@@ -365,7 +365,7 @@ class VeloxShuffleWriterTest : public 
::testing::TestWithParam<ShuffleTestParams
         std::move(codec),
         veloxCompressionType,
         rowType,
-        std::numeric_limits<int32_t>::max(),
+        kDefaultBatchSize,
         kDefaultReadBufferSize,
         defaultArrowMemoryPool().get(),
         pool_,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to