This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0120d6af9d [GLUTEN-8855][VL] Columnar shuffle code cleanup (#9693)
0120d6af9d is described below

commit 0120d6af9d7a84d0353bbae104c7f8ddcb710862
Author: Rong Ma <[email protected]>
AuthorDate: Thu May 22 15:31:23 2025 +0800

    [GLUTEN-8855][VL] Columnar shuffle code cleanup (#9693)
---
 cpp/core/jni/JniWrapper.cc                     |  17 +-
 cpp/core/shuffle/LocalPartitionWriter.cc       | 346 ++++++++++++-------------
 cpp/core/shuffle/LocalPartitionWriter.h        |   4 +-
 cpp/core/shuffle/Options.h                     |   5 -
 cpp/core/shuffle/PartitionWriter.h             |  16 +-
 cpp/core/shuffle/Payload.cc                    | 125 ++++-----
 cpp/core/shuffle/Payload.h                     |  36 ++-
 cpp/core/shuffle/Spill.cc                      |   1 -
 cpp/core/shuffle/Utils.cc                      |   7 +
 cpp/core/shuffle/Utils.h                       |  21 +-
 cpp/core/shuffle/rss/RssPartitionWriter.cc     |   3 +-
 cpp/core/shuffle/rss/RssPartitionWriter.h      |   4 +-
 cpp/velox/benchmarks/GenericBenchmark.cc       |  21 +-
 cpp/velox/shuffle/VeloxHashShuffleWriter.cc    |  32 +--
 cpp/velox/shuffle/VeloxHashShuffleWriter.h     |  15 --
 cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc |   3 +-
 cpp/velox/shuffle/VeloxShuffleReader.cc        |  12 +-
 cpp/velox/shuffle/VeloxShuffleReader.h         |   2 -
 cpp/velox/shuffle/VeloxSortShuffleWriter.cc    |   5 +-
 cpp/velox/tests/VeloxShuffleWriterSpillTest.cc |   4 +-
 cpp/velox/tests/VeloxShuffleWriterTest.cc      | 277 +++++++++-----------
 cpp/velox/tests/VeloxShuffleWriterTestBase.h   |  21 +-
 22 files changed, 466 insertions(+), 511 deletions(-)

diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index c4ba4d865c..1321b6041f 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -832,7 +832,6 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
       .compressionThreshold = compressionThreshold,
       .compressionType = getCompressionType(env, codecJstr),
       .compressionLevel = compressionLevel,
-      .bufferedWrite = true,
       .numSubDirs = numSubDirs,
       .pushBufferMaxSize = pushBufferMaxSize > 0 ? pushBufferMaxSize : 
kDefaultPushMemoryThreshold,
       .sortBufferMaxSize = sortBufferMaxSize > 0 ? sortBufferMaxSize : 
kDefaultSortBufferThreshold};
@@ -870,11 +869,7 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
     env->ReleaseStringUTFChars(localDirsJstr, localDirsC);
 
     partitionWriter = std::make_unique<LocalPartitionWriter>(
-        numPartitions,
-        std::move(partitionWriterOptions),
-        ctx->memoryManager()->getArrowMemoryPool(),
-        dataFile,
-        configuredDirs);
+        numPartitions, std::move(partitionWriterOptions), 
ctx->memoryManager(), dataFile, configuredDirs);
   } else if (partitionWriterType == "celeborn") {
     jclass celebornPartitionPusherClass =
         createGlobalClassReferenceOrError(env, 
"Lorg/apache/spark/shuffle/CelebornPartitionPusher;");
@@ -887,10 +882,7 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
     std::shared_ptr<JavaRssClient> celebornClient =
         std::make_shared<JavaRssClient>(vm, partitionPusher, 
celebornPushPartitionDataMethod);
     partitionWriter = std::make_unique<RssPartitionWriter>(
-        numPartitions,
-        std::move(partitionWriterOptions),
-        ctx->memoryManager()->getArrowMemoryPool(),
-        std::move(celebornClient));
+        numPartitions, std::move(partitionWriterOptions), 
ctx->memoryManager(), std::move(celebornClient));
   } else if (partitionWriterType == "uniffle") {
     jclass unifflePartitionPusherClass =
         createGlobalClassReferenceOrError(env, 
"Lorg/apache/spark/shuffle/writer/PartitionPusher;");
@@ -903,10 +895,7 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
     std::shared_ptr<JavaRssClient> uniffleClient =
         std::make_shared<JavaRssClient>(vm, partitionPusher, 
unifflePushPartitionDataMethod);
     partitionWriter = std::make_unique<RssPartitionWriter>(
-        numPartitions,
-        std::move(partitionWriterOptions),
-        ctx->memoryManager()->getArrowMemoryPool(),
-        std::move(uniffleClient));
+        numPartitions, std::move(partitionWriterOptions), 
ctx->memoryManager(), std::move(uniffleClient));
   } else {
     throw GlutenException("Unrecognizable partition writer type: " + 
partitionWriterType);
   }
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc 
b/cpp/core/shuffle/LocalPartitionWriter.cc
index 8cd328b13f..a8f6629af8 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -31,6 +31,22 @@
 #include <thread>
 
 namespace gluten {
+
+namespace {
+arrow::Result<std::shared_ptr<arrow::io::OutputStream>> openFile(const 
std::string& file, int64_t bufferSize) {
+  std::shared_ptr<arrow::io::FileOutputStream> out;
+  const auto fd = open(file.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0000);
+  // Set the shuffle file permissions to 0644 to keep it consistent with the 
permissions of
+  // the built-in shuffler manager in Spark.
+  fchmod(fd, 0644);
+  ARROW_ASSIGN_OR_RAISE(out, arrow::io::FileOutputStream::Open(fd));
+
+  // The `shuffleFileBufferSize` bytes is a temporary allocation and will be 
freed with file close.
+  // Use default memory pool and count treat the memory as executor memory 
overhead to avoid unnecessary spill.
+  return arrow::io::BufferedOutputStream::Create(bufferSize, 
arrow::default_memory_pool(), out);
+}
+} // namespace
+
 class LocalPartitionWriter::LocalSpiller {
  public:
   LocalSpiller(
@@ -38,13 +54,11 @@ class LocalPartitionWriter::LocalSpiller {
       std::shared_ptr<arrow::io::OutputStream> os,
       std::string spillFile,
       int32_t compressionBufferSize,
-      int32_t compressionThreshold,
       arrow::MemoryPool* pool,
       arrow::util::Codec* codec)
       : isFinal_(isFinal),
         os_(os),
         spillFile_(std::move(spillFile)),
-        compressionThreshold_(compressionThreshold),
         pool_(pool),
         codec_(codec),
         diskSpill_(std::make_unique<Spill>()) {
@@ -67,8 +81,10 @@ class LocalPartitionWriter::LocalSpiller {
     ARROW_ASSIGN_OR_RAISE(const auto pos, os_->Tell());
 
     diskSpill_->insertPayload(lastPid_, Payload::kRaw, 0, nullptr, pos - 
writePos_, pool_, nullptr);
+
     DLOG(INFO) << "LocalSpiller: Spilled partition " << lastPid_ << " file 
start: " << writePos_
                << ", file end: " << pos << ", file: " << spillFile_;
+
     return arrow::Status::OK();
   }
 
@@ -89,24 +105,20 @@ class LocalPartitionWriter::LocalSpiller {
   }
 
   arrow::Status spill(uint32_t partitionId, std::unique_ptr<BlockPayload> 
payload) {
-    // Check spill Type.
-    ARROW_RETURN_IF(
-        payload->type() == Payload::kToBeCompressed,
-        arrow::Status::Invalid("Cannot spill payload of type: " + 
payload->toString()));
     ARROW_ASSIGN_OR_RAISE(auto start, os_->Tell());
     RETURN_NOT_OK(payload->serialize(os_.get()));
-    compressTime_ += payload->getCompressTime();
-    spillTime_ += payload->getWriteTime();
+
     ARROW_ASSIGN_OR_RAISE(auto end, os_->Tell());
+
     DLOG(INFO) << "LocalSpiller: Spilled partition " << partitionId << " file 
start: " << start << ", file end: " << end
                << ", file: " << spillFile_;
 
-    auto payloadType = payload->type();
-    if (payloadType == Payload::kUncompressed && codec_ != nullptr && 
payload->numRows() >= compressionThreshold_) {
-      payloadType = Payload::kToBeCompressed;
-    }
+    compressTime_ += payload->getCompressTime();
+    spillTime_ += payload->getWriteTime();
+
     diskSpill_->insertPayload(
-        partitionId, payloadType, payload->numRows(), 
payload->isValidityBuffer(), end - start, pool_, codec_);
+        partitionId, payload->type(), payload->numRows(), 
payload->isValidityBuffer(), end - start, pool_, codec_);
+
     return arrow::Status::OK();
   }
 
@@ -153,7 +165,6 @@ class LocalPartitionWriter::LocalSpiller {
   int64_t writePos_{0};
 
   std::string spillFile_;
-  int32_t compressionThreshold_;
   arrow::MemoryPool* pool_;
   arrow::util::Codec* codec_;
 
@@ -175,18 +186,21 @@ class LocalPartitionWriter::PayloadMerger {
         mergeBufferSize_(options.mergeBufferSize),
         mergeBufferMinSize_(options.mergeBufferSize * options.mergeThreshold) 
{}
 
-  arrow::Result<std::vector<std::unique_ptr<BlockPayload>>>
+  arrow::Result<std::vector<std::unique_ptr<InMemoryPayload>>>
   merge(uint32_t partitionId, std::unique_ptr<InMemoryPayload> append, bool 
reuseBuffers) {
-    std::vector<std::unique_ptr<BlockPayload>> merged{};
+    PartitionScopeGuard mergeGuard(partitionInUse_, partitionId);
+
+    std::vector<std::unique_ptr<InMemoryPayload>> merged{};
     if (!append->mergeable()) {
       // TODO: Merging complex type is currently not supported.
-      merged.emplace_back();
-      ARROW_ASSIGN_OR_RAISE(merged.back(), 
createBlockPayload(std::move(append), reuseBuffers));
+      bool shouldCompress = codec_ != nullptr && append->numRows() >= 
compressionThreshold_;
+      if (reuseBuffers && !shouldCompress) {
+        RETURN_NOT_OK(append->copyBuffers(pool_));
+      }
+      merged.emplace_back(std::move(append));
       return merged;
     }
 
-    MergeGuard mergeGuard(partitionInMerge_, partitionId);
-
     auto cacheOrFinish = [&]() {
       if (append->numRows() <= mergeBufferMinSize_) {
         // Save for merge.
@@ -197,9 +211,12 @@ class LocalPartitionWriter::PayloadMerger {
         partitionMergePayload_[partitionId] = std::move(append);
         return arrow::Status::OK();
       }
-      merged.emplace_back();
-      // If current buffer rows reaches merging threshold, create BlockPayload.
-      ARROW_ASSIGN_OR_RAISE(merged.back(), 
createBlockPayload(std::move(append), reuseBuffers));
+      // Commit if current buffer rows reaches merging threshold.
+      bool shouldCompress = codec_ != nullptr && append->numRows() >= 
compressionThreshold_;
+      if (reuseBuffers && !shouldCompress) {
+        RETURN_NOT_OK(append->copyBuffers(pool_));
+      }
+      merged.emplace_back(std::move(append));
       return arrow::Status::OK();
     };
 
@@ -211,14 +228,7 @@ class LocalPartitionWriter::PayloadMerger {
     auto lastPayload = std::move(partitionMergePayload_[partitionId]);
     auto mergedRows = append->numRows() + lastPayload->numRows();
     if (mergedRows > mergeBufferSize_ || append->numRows() > 
mergeBufferMinSize_) {
-      merged.emplace_back();
-      ARROW_ASSIGN_OR_RAISE(
-          merged.back(),
-          lastPayload->toBlockPayload(
-              codec_ != nullptr && lastPayload->numRows() >= 
compressionThreshold_ ? Payload::kCompressed
-                                                                               
    : Payload::kUncompressed,
-              pool_,
-              codec_));
+      merged.emplace_back(std::move(lastPayload));
       RETURN_NOT_OK(cacheOrFinish());
       return merged;
     }
@@ -232,40 +242,26 @@ class LocalPartitionWriter::PayloadMerger {
       partitionMergePayload_[partitionId] = std::move(payload);
       return merged;
     }
+
     // mergedRows == mergeBufferSize_
-    merged.emplace_back();
-    ARROW_ASSIGN_OR_RAISE(
-        merged.back(),
-        payload->toBlockPayload(
-            codec_ != nullptr && payload->numRows() >= compressionThreshold_ ? 
Payload::kCompressed
-                                                                             : 
Payload::kUncompressed,
-            pool_,
-            codec_));
+    merged.emplace_back(std::move(payload));
     return merged;
   }
 
-  arrow::Result<std::optional<std::unique_ptr<BlockPayload>>> finishForSpill(
-      uint32_t partitionId,
-      int64_t& totalBytesToEvict) {
+  arrow::Result<std::optional<std::unique_ptr<InMemoryPayload>>> 
finish(uint32_t partitionId, bool fromSpill) {
     // We need to check whether the spill source is from compressing/copying 
the merged buffers.
-    if ((partitionInMerge_.has_value() && *partitionInMerge_ == partitionId) 
|| !hasMerged(partitionId)) {
+    if ((fromSpill && (partitionInUse_.has_value() && partitionInUse_.value() 
== partitionId)) ||
+        !hasMerged(partitionId)) {
       return std::nullopt;
     }
-    auto payload = std::move(partitionMergePayload_[partitionId]);
-    totalBytesToEvict += payload->rawSize();
-    return payload->toBlockPayload(Payload::kUncompressed, pool_, codec_);
-  }
 
-  arrow::Result<std::optional<std::unique_ptr<BlockPayload>>> finish(uint32_t 
partitionId) {
-    if (!hasMerged(partitionId)) {
-      return std::nullopt;
+    if (!fromSpill) {
+      GLUTEN_DCHECK(
+          !partitionInUse_.has_value(),
+          "Invalid status: partitionInUse_ is set when not in spilling: " + 
std::to_string(partitionInUse_.value()));
     }
-    auto numRows = partitionMergePayload_[partitionId]->numRows();
-    // Because this is the last BlockPayload, delay the compression before 
writing to the final data file.
-    auto payloadType =
-        (codec_ != nullptr && numRows >= compressionThreshold_) ? 
Payload::kToBeCompressed : Payload::kUncompressed;
-    auto payload = std::move(partitionMergePayload_[partitionId]);
-    return payload->toBlockPayload(payloadType, pool_, codec_);
+
+    return std::move(partitionMergePayload_[partitionId]);
   }
 
   bool hasMerged(uint32_t partitionId) {
@@ -280,81 +276,45 @@ class LocalPartitionWriter::PayloadMerger {
   int32_t mergeBufferSize_;
   int32_t mergeBufferMinSize_;
   std::unordered_map<uint32_t, std::unique_ptr<InMemoryPayload>> 
partitionMergePayload_;
-  std::optional<uint32_t> partitionInMerge_;
-
-  class MergeGuard {
-   public:
-    MergeGuard(std::optional<uint32_t>& partitionInMerge, uint32_t 
partitionId) : partitionInMerge_(partitionInMerge) {
-      partitionInMerge_ = partitionId;
-    }
-
-    ~MergeGuard() {
-      partitionInMerge_ = std::nullopt;
-    }
-
-   private:
-    std::optional<uint32_t>& partitionInMerge_;
-  };
-
-  arrow::Status copyBuffers(std::vector<std::shared_ptr<arrow::Buffer>>& 
buffers) {
-    // Copy.
-    std::vector<std::shared_ptr<arrow::Buffer>> copies;
-    for (auto& buffer : buffers) {
-      if (!buffer) {
-        continue;
-      }
-      if (buffer->size() == 0) {
-        buffer = zeroLengthNullBuffer();
-        continue;
-      }
-      ARROW_ASSIGN_OR_RAISE(auto copy, 
arrow::AllocateResizableBuffer(buffer->size(), pool_));
-      memcpy(copy->mutable_data(), buffer->data(), buffer->size());
-      buffer = std::move(copy);
-    }
-    return arrow::Status::OK();
-  }
-
-  arrow::Result<std::unique_ptr<BlockPayload>> createBlockPayload(
-      std::unique_ptr<InMemoryPayload> inMemoryPayload,
-      bool reuseBuffers) {
-    auto createCompressed = codec_ != nullptr && inMemoryPayload->numRows() >= 
compressionThreshold_;
-    if (reuseBuffers && !createCompressed) {
-      // For uncompressed buffers, need to copy before caching.
-      RETURN_NOT_OK(inMemoryPayload->copyBuffers(pool_));
-    }
-    ARROW_ASSIGN_OR_RAISE(
-        auto payload,
-        inMemoryPayload->toBlockPayload(
-            createCompressed ? Payload::kCompressed : Payload::kUncompressed, 
pool_, codec_));
-    return payload;
-  }
+  std::optional<uint32_t> partitionInUse_{std::nullopt};
 };
 
 class LocalPartitionWriter::PayloadCache {
  public:
-  PayloadCache(uint32_t numPartitions) : numPartitions_(numPartitions) {}
+  PayloadCache(uint32_t numPartitions, arrow::util::Codec* codec, int32_t 
compressionThreshold, arrow::MemoryPool* pool)
+      : numPartitions_(numPartitions), codec_(codec), 
compressionThreshold_(compressionThreshold), pool_(pool) {}
+
+  arrow::Status cache(uint32_t partitionId, std::unique_ptr<InMemoryPayload> 
payload) {
+    PartitionScopeGuard cacheGuard(partitionInUse_, partitionId);
 
-  arrow::Status cache(uint32_t partitionId, std::unique_ptr<BlockPayload> 
payload) {
     if (partitionCachedPayload_.find(partitionId) == 
partitionCachedPayload_.end()) {
       partitionCachedPayload_[partitionId] = 
std::list<std::unique_ptr<BlockPayload>>{};
     }
-    partitionCachedPayload_[partitionId].push_back(std::move(payload));
-    return arrow::Status::OK();
-  }
 
-  bool hasCachedPayloads(uint32_t partitionId) {
-    return partitionCachedPayload_.find(partitionId) != 
partitionCachedPayload_.end() &&
-        !partitionCachedPayload_[partitionId].empty();
+    bool shouldCompress = codec_ != nullptr && payload->numRows() >= 
compressionThreshold_;
+    ARROW_ASSIGN_OR_RAISE(
+        auto block,
+        payload->toBlockPayload(shouldCompress ? Payload::kCompressed : 
Payload::kUncompressed, pool_, codec_));
+
+    partitionCachedPayload_[partitionId].push_back(std::move(block));
+
+    return arrow::Status::OK();
   }
 
   arrow::Status write(uint32_t partitionId, arrow::io::OutputStream* os) {
+    GLUTEN_DCHECK(
+        !partitionInUse_.has_value(),
+        "Invalid status: partitionInUse_ is set: " + 
std::to_string(partitionInUse_.value()));
+
     if (hasCachedPayloads(partitionId)) {
       auto& payloads = partitionCachedPayload_[partitionId];
       while (!payloads.empty()) {
         auto payload = std::move(payloads.front());
         payloads.pop_front();
+
         // Write the cached payload to disk.
         RETURN_NOT_OK(payload->serialize(os));
+
         compressTime_ += payload->getCompressTime();
         writeTime_ += payload->getWriteTime();
       }
@@ -364,6 +324,9 @@ class LocalPartitionWriter::PayloadCache {
 
   bool canSpill() {
     for (auto pid = 0; pid < numPartitions_; ++pid) {
+      if (partitionInUse_.has_value() && partitionInUse_.value() == pid) {
+        continue;
+      }
       if (hasCachedPayloads(pid)) {
         return true;
       }
@@ -371,40 +334,49 @@ class LocalPartitionWriter::PayloadCache {
     return false;
   }
 
-  arrow::Result<std::shared_ptr<Spill>> spillAndClose(
-      std::shared_ptr<arrow::io::OutputStream> os,
+  arrow::Result<std::shared_ptr<Spill>> spill(
       const std::string& spillFile,
       arrow::MemoryPool* pool,
       arrow::util::Codec* codec,
+      const int64_t bufferSize,
       int64_t& totalBytesToEvict) {
-    std::shared_ptr<Spill> diskSpill = nullptr;
-    ARROW_ASSIGN_OR_RAISE(auto start, os->Tell());
+    ARROW_ASSIGN_OR_RAISE(auto os, openFile(spillFile, bufferSize));
+
+    int64_t start = 0;
+    auto diskSpill = std::make_shared<Spill>();
+
     for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
+      if (partitionInUse_.has_value() && partitionInUse_.value() == pid) {
+        continue;
+      }
+
       if (hasCachedPayloads(pid)) {
         auto& payloads = partitionCachedPayload_[pid];
         while (!payloads.empty()) {
           auto payload = std::move(payloads.front());
           payloads.pop_front();
           totalBytesToEvict += payload->rawSize();
+
           // Spill the cached payload to disk.
           RETURN_NOT_OK(payload->serialize(os.get()));
           compressTime_ += payload->getCompressTime();
           spillTime_ += payload->getWriteTime();
-
-          if (UNLIKELY(!diskSpill)) {
-            diskSpill = std::make_unique<Spill>();
-          }
-          ARROW_ASSIGN_OR_RAISE(auto end, os->Tell());
-          DLOG(INFO) << "PayloadCache: Spilled partition " << pid << " file 
start: " << start << ", file end: " << end
-                     << ", file: " << spillFile;
-          diskSpill->insertPayload(
-              pid, payload->type(), payload->numRows(), 
payload->isValidityBuffer(), end - start, pool, codec);
-          start = end;
         }
+
+        ARROW_ASSIGN_OR_RAISE(auto end, os->Tell());
+
+        diskSpill->insertPayload(pid, Payload::kRaw, 0, nullptr, end - start, 
pool, codec);
+
+        DLOG(INFO) << "PayloadCache: Spilled partition " << pid << " file 
start: " << start << ", file end: " << end
+                   << ", file: " << spillFile;
+
+        start = end;
       }
     }
+
     RETURN_NOT_OK(os->Close());
     diskSpill->setSpillFile(spillFile);
+
     return diskSpill;
   }
 
@@ -421,20 +393,31 @@ class LocalPartitionWriter::PayloadCache {
   }
 
  private:
+  bool hasCachedPayloads(uint32_t partitionId) {
+    return partitionCachedPayload_.find(partitionId) != 
partitionCachedPayload_.end() &&
+        !partitionCachedPayload_[partitionId].empty();
+  }
+
   uint32_t numPartitions_;
+  arrow::util::Codec* codec_;
+  int32_t compressionThreshold_;
+  arrow::MemoryPool* pool_;
+
   int64_t compressTime_{0};
   int64_t spillTime_{0};
   int64_t writeTime_{0};
   std::unordered_map<uint32_t, std::list<std::unique_ptr<BlockPayload>>> 
partitionCachedPayload_;
+
+  std::optional<uint32_t> partitionInUse_{std::nullopt};
 };
 
 LocalPartitionWriter::LocalPartitionWriter(
     uint32_t numPartitions,
     PartitionWriterOptions options,
-    arrow::MemoryPool* pool,
+    MemoryManager* memoryManager,
     const std::string& dataFile,
     const std::vector<std::string>& localDirs)
-    : PartitionWriter(numPartitions, std::move(options), pool), 
dataFile_(dataFile), localDirs_(localDirs) {
+    : PartitionWriter(numPartitions, std::move(options), memoryManager), 
dataFile_(dataFile), localDirs_(localDirs) {
   init();
 }
 
@@ -445,21 +428,6 @@ std::string LocalPartitionWriter::nextSpilledFileDir() {
   return spilledFileDir;
 }
 
-arrow::Result<std::shared_ptr<arrow::io::OutputStream>> 
LocalPartitionWriter::openFile(const std::string& file) {
-  std::shared_ptr<arrow::io::FileOutputStream> fout;
-  auto fd = open(file.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0000);
-  // Set the shuffle file permissions to 0644 to keep it consistent with the 
permissions of
-  // the built-in shuffler manager in Spark.
-  fchmod(fd, 0644);
-  ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(fd));
-  if (options_.bufferedWrite) {
-    // The `shuffleFileBufferSize` bytes is a temporary allocation and will be 
freed with file close.
-    // Use default memory pool and count treat the memory as executor memory 
overhead to avoid unnecessary spill.
-    return 
arrow::io::BufferedOutputStream::Create(options_.shuffleFileBufferSize, 
arrow::default_memory_pool(), fout);
-  }
-  return fout;
-}
-
 arrow::Status LocalPartitionWriter::clearResource() {
   RETURN_NOT_OK(dataFileOs_->Close());
   // When bufferedWrite = true, dataFileOs_->Close doesn't release underlying 
buffer.
@@ -490,7 +458,6 @@ arrow::Result<int64_t> 
LocalPartitionWriter::mergeSpills(uint32_t partitionId) {
 
     // Read if partition exists in the spilled file. Then write to the final 
data file.
     while (auto payload = spill->nextPayload(partitionId)) {
-      // May trigger spill during compression.
       RETURN_NOT_OK(payload->serialize(dataFileOs_.get()));
       compressTime_ += payload->getCompressTime();
       writeTime_ += payload->getWriteTime();
@@ -535,7 +502,24 @@ arrow::Status 
LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
     compressTime_ += spill->compressTime();
   } else {
     RETURN_NOT_OK(finishSpill());
-    ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
+
+    if (merger_) {
+      for (auto pid = 0; pid < numPartitions_; ++pid) {
+        ARROW_ASSIGN_OR_RAISE(auto maybeMerged, merger_->finish(pid, false));
+        if (maybeMerged.has_value()) {
+          if (!payloadCache_) {
+            payloadCache_ = std::make_shared<PayloadCache>(
+                numPartitions_, codec_.get(), options_.compressionThreshold, 
payloadPool_.get());
+          }
+          // Spill can be triggered by compressing or building dictionaries.
+          RETURN_NOT_OK(payloadCache_->cache(pid, 
std::move(maybeMerged.value())));
+        }
+      }
+
+      merger_.reset();
+    }
+
+    ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_, 
options_.shuffleFileBufferSize));
 
     int64_t endInFinalFile = 0;
     DLOG(INFO) << "LocalPartitionWriter stopped. Total spills: " << 
spills_.size();
@@ -544,20 +528,13 @@ arrow::Status 
LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
       // Record start offset.
       auto startInFinalFile = endInFinalFile;
       // Iterator over all spilled files.
-      // Reading and compressing toBeCompressed payload can trigger spill.
+      // May trigger spill during compression.
       RETURN_NOT_OK(mergeSpills(pid));
-      if (payloadCache_ && payloadCache_->hasCachedPayloads(pid)) {
+
+      if (payloadCache_) {
         RETURN_NOT_OK(payloadCache_->write(pid, dataFileOs_.get()));
       }
-      if (merger_) {
-        ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finish(pid));
-        if (merged) {
-          // Compressing merged payload can trigger spill.
-          RETURN_NOT_OK((*merged)->serialize(dataFileOs_.get()));
-          compressTime_ += (*merged)->getCompressTime();
-          writeTime_ += (*merged)->getWriteTime();
-        }
-      }
+
       ARROW_ASSIGN_OR_RAISE(endInFinalFile, dataFileOs_->Tell());
       partitionLengths_[pid] = endInFinalFile - startInFinalFile;
     }
@@ -593,22 +570,16 @@ arrow::Status LocalPartitionWriter::requestSpill(bool 
isFinal) {
     std::shared_ptr<arrow::io::OutputStream> os;
     if (isFinal) {
       // If `spill()` is requested after `stop()`, open the final data file 
for writing.
-      ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
+      ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_, 
options_.shuffleFileBufferSize));
       spillFile = dataFile_;
       os = dataFileOs_;
       useSpillFileAsDataFile_ = true;
     } else {
       ARROW_ASSIGN_OR_RAISE(spillFile, 
createTempShuffleFile(nextSpilledFileDir()));
-      ARROW_ASSIGN_OR_RAISE(os, openFile(spillFile));
+      ARROW_ASSIGN_OR_RAISE(os, openFile(spillFile, 
options_.shuffleFileBufferSize));
     }
     spiller_ = std::make_unique<LocalSpiller>(
-        isFinal,
-        os,
-        std::move(spillFile),
-        options_.compressionBufferSize,
-        options_.compressionThreshold,
-        payloadPool_.get(),
-        codec_.get());
+        isFinal, os, std::move(spillFile), options_.compressionBufferSize, 
payloadPool_.get(), codec_.get());
   }
   return arrow::Status::OK();
 }
@@ -631,19 +602,25 @@ arrow::Status LocalPartitionWriter::hashEvict(
 
   if (evictType == Evict::kSpill) {
     RETURN_NOT_OK(requestSpill(false));
+
+    auto shouldCompress = codec_ != nullptr && inMemoryPayload->numRows() >= 
options_.compressionThreshold;
     ARROW_ASSIGN_OR_RAISE(
-        auto payload, inMemoryPayload->toBlockPayload(Payload::kUncompressed, 
payloadPool_.get(), nullptr));
+        auto payload,
+        inMemoryPayload->toBlockPayload(
+            shouldCompress ? Payload::kToBeCompressed : 
Payload::kUncompressed, payloadPool_.get(), codec_.get()));
+
     RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
     return arrow::Status::OK();
   }
 
   if (!merger_) {
-    merger_ = std::make_shared<PayloadMerger>(options_, payloadPool_.get(), 
codec_ ? codec_.get() : nullptr);
+    merger_ = std::make_shared<PayloadMerger>(options_, payloadPool_.get(), 
codec_.get());
   }
   ARROW_ASSIGN_OR_RAISE(auto merged, merger_->merge(partitionId, 
std::move(inMemoryPayload), reuseBuffers));
   if (!merged.empty()) {
     if (UNLIKELY(!payloadCache_)) {
-      payloadCache_ = std::make_shared<PayloadCache>(numPartitions_);
+      payloadCache_ = std::make_shared<PayloadCache>(
+          numPartitions_, codec_.get(), options_.compressionThreshold, 
payloadPool_.get());
     }
     for (auto& payload : merged) {
       RETURN_NOT_OK(payloadCache_->cache(partitionId, std::move(payload)));
@@ -690,33 +667,48 @@ arrow::Status 
LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu
   // Reclaim memory from payloadCache.
   if (payloadCache_ && payloadCache_->canSpill()) {
     auto beforeSpill = payloadPool_->bytes_allocated();
+
     ARROW_ASSIGN_OR_RAISE(auto spillFile, 
createTempShuffleFile(nextSpilledFileDir()));
-    ARROW_ASSIGN_OR_RAISE(auto os, openFile(spillFile));
+
     spills_.emplace_back();
     ARROW_ASSIGN_OR_RAISE(
         spills_.back(),
-        payloadCache_->spillAndClose(os, spillFile, payloadPool_.get(), 
codec_.get(), totalBytesToEvict_));
+        payloadCache_->spill(
+            spillFile, payloadPool_.get(), codec_.get(), 
options_.shuffleFileBufferSize, totalBytesToEvict_));
+
     reclaimed += beforeSpill - payloadPool_->bytes_allocated();
+
     if (reclaimed >= size) {
       *actual = reclaimed;
       return arrow::Status::OK();
     }
   }
+
   // Then spill payloads from merger. Create uncompressed payloads.
   if (merger_) {
-    auto beforeSpill = payloadPool_->bytes_allocated();
     for (auto pid = 0; pid < numPartitions_; ++pid) {
-      ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finishForSpill(pid, 
totalBytesToEvict_));
-      if (merged.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(auto maybeMerged, merger_->finish(pid, true));
+
+      if (maybeMerged.has_value()) {
+        const auto& merged = maybeMerged.value();
+
+        totalBytesToEvict_ += merged->rawSize();
+        reclaimed += merged->rawCapacity();
+
         RETURN_NOT_OK(requestSpill(false));
-        RETURN_NOT_OK(spiller_->spill(pid, std::move(*merged)));
+
+        bool shouldCompress = codec_ != nullptr && merged->numRows() >= 
options_.compressionThreshold;
+        ARROW_ASSIGN_OR_RAISE(
+            auto payload,
+            merged->toBlockPayload(
+                shouldCompress ? Payload::kToBeCompressed : 
Payload::kUncompressed, payloadPool_.get(), codec_.get()));
+
+        RETURN_NOT_OK(spiller_->spill(pid, std::move(payload)));
       }
     }
-    // This is not accurate. When the evicted partition buffers are not 
copied, the merged ones
-    // are resized from the original buffers thus allocated from 
partitionBufferPool.
-    reclaimed += beforeSpill - payloadPool_->bytes_allocated();
     RETURN_NOT_OK(finishSpill());
   }
+
   *actual = reclaimed;
   return arrow::Status::OK();
 }
diff --git a/cpp/core/shuffle/LocalPartitionWriter.h 
b/cpp/core/shuffle/LocalPartitionWriter.h
index 02cc16c565..594a58f89b 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.h
+++ b/cpp/core/shuffle/LocalPartitionWriter.h
@@ -30,7 +30,7 @@ class LocalPartitionWriter : public PartitionWriter {
   explicit LocalPartitionWriter(
       uint32_t numPartitions,
       PartitionWriterOptions options,
-      arrow::MemoryPool* pool,
+      MemoryManager* memoryManager,
       const std::string& dataFile,
       const std::vector<std::string>& localDirs);
 
@@ -90,8 +90,6 @@ class LocalPartitionWriter : public PartitionWriter {
 
   std::string nextSpilledFileDir();
 
-  arrow::Result<std::shared_ptr<arrow::io::OutputStream>> openFile(const 
std::string& file);
-
   arrow::Result<int64_t> mergeSpills(uint32_t partitionId);
 
   arrow::Status clearResource();
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 55d82cafd9..898bb5b9b8 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -34,11 +34,8 @@ static constexpr int32_t kDefaultNumSubDirs = 64;
 static constexpr int32_t kDefaultCompressionThreshold = 100;
 static constexpr int32_t kDefaultCompressionBufferSize = 32 * 1024;
 static constexpr int32_t kDefaultDiskWriteBufferSize = 1024 * 1024;
-static const std::string kDefaultCompressionTypeStr = "lz4";
-static constexpr int32_t kDefaultBufferAlignment = 64;
 static constexpr double kDefaultBufferReallocThreshold = 0.25;
 static constexpr double kDefaultMergeBufferThreshold = 0.25;
-static constexpr bool kEnableBufferedWrite = true;
 static constexpr bool kDefaultUseRadixSort = true;
 static constexpr int32_t kDefaultSortBufferSize = 4096;
 static constexpr int64_t kDefaultReadBufferSize = 1 << 20;
@@ -93,8 +90,6 @@ struct PartitionWriterOptions {
   int32_t compressionLevel = arrow::util::kUseDefaultCompressionLevel;
   CompressionMode compressionMode = CompressionMode::BUFFER;
 
-  bool bufferedWrite = kEnableBufferedWrite;
-
   int32_t numSubDirs = kDefaultNumSubDirs;
 
   int64_t pushBufferMaxSize = kDefaultPushMemoryThreshold;
diff --git a/cpp/core/shuffle/PartitionWriter.h 
b/cpp/core/shuffle/PartitionWriter.h
index 4d86b7fdad..ece16c95a9 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "ShuffleMemoryPool.h"
+#include "memory/MemoryManager.h"
 #include "memory/Reclaimable.h"
 #include "shuffle/Options.h"
 #include "shuffle/Payload.h"
@@ -31,9 +32,9 @@ struct Evict {
 
 class PartitionWriter : public Reclaimable {
  public:
-  PartitionWriter(uint32_t numPartitions, PartitionWriterOptions options, 
arrow::MemoryPool* pool)
-      : numPartitions_(numPartitions), options_(std::move(options)), 
pool_(pool) {
-    payloadPool_ = std::make_unique<ShuffleMemoryPool>(pool);
+  PartitionWriter(uint32_t numPartitions, PartitionWriterOptions options, 
MemoryManager* memoryManager)
+      : numPartitions_(numPartitions), options_(std::move(options)), 
memoryManager_(memoryManager) {
+    payloadPool_ = 
std::make_unique<ShuffleMemoryPool>(memoryManager->getArrowMemoryPool());
     codec_ = createArrowIpcCodec(options_.compressionType, 
options_.codecBackend, options_.compressionLevel);
   }
 
@@ -61,13 +62,6 @@ class PartitionWriter : public Reclaimable {
   virtual arrow::Status
   sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload> 
inMemoryPayload, bool isFinal) = 0;
 
-  std::optional<int64_t> getCompressedBufferLength(const 
std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
-    if (!codec_) {
-      return std::nullopt;
-    }
-    return BlockPayload::maxCompressedLength(buffers, codec_.get());
-  }
-
   virtual arrow::Status evict(uint32_t partitionId, 
std::unique_ptr<BlockPayload> blockPayload, bool stop) = 0;
 
   uint64_t cachedPayloadSize() {
@@ -81,7 +75,7 @@ class PartitionWriter : public Reclaimable {
  protected:
   uint32_t numPartitions_;
   PartitionWriterOptions options_;
-  arrow::MemoryPool* pool_;
+  MemoryManager* memoryManager_;
 
   // Memory Pool used to track memory allocation of partition payloads.
   // The actual allocation is delegated to options_.memoryPool.
diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc
index 602b49b39e..352f46ffb6 100644
--- a/cpp/core/shuffle/Payload.cc
+++ b/cpp/core/shuffle/Payload.cc
@@ -188,25 +188,18 @@ arrow::Result<std::unique_ptr<BlockPayload>> 
BlockPayload::fromBuffers(
     std::vector<std::shared_ptr<arrow::Buffer>> buffers,
     const std::vector<bool>* isValidityBuffer,
     arrow::MemoryPool* pool,
-    arrow::util::Codec* codec,
-    std::shared_ptr<arrow::Buffer> compressed) {
+    arrow::util::Codec* codec) {
+  const uint32_t numBuffers = buffers.size();
+
   if (payloadType == Payload::Type::kCompressed) {
     Timer compressionTime;
     compressionTime.start();
     // Compress.
     auto maxLength = maxCompressedLength(buffers, codec);
     std::shared_ptr<arrow::Buffer> compressedBuffer;
-    uint8_t* output;
-    if (compressed) {
-      ARROW_RETURN_IF(
-          compressed->size() < maxLength,
-          arrow::Status::Invalid(
-              "Compressed buffer length < maxCompressedLength. (", 
compressed->size(), " vs ", maxLength, ")"));
-      output = const_cast<uint8_t*>(compressed->data());
-    } else {
-      ARROW_ASSIGN_OR_RAISE(compressedBuffer, 
arrow::AllocateResizableBuffer(maxLength, pool));
-      output = compressedBuffer->mutable_data();
-    }
+
+    ARROW_ASSIGN_OR_RAISE(compressedBuffer, 
arrow::AllocateResizableBuffer(maxLength, pool));
+    auto* output = compressedBuffer->mutable_data();
 
     int64_t actualLength = 0;
     // Compress buffers one by one.
@@ -219,19 +212,18 @@ arrow::Result<std::unique_ptr<BlockPayload>> 
BlockPayload::fromBuffers(
     }
 
     ARROW_RETURN_IF(actualLength < 0, arrow::Status::Invalid("Writing 
compressed buffer out of bound."));
-    if (compressed) {
-      compressedBuffer = std::make_shared<arrow::Buffer>(compressed->data(), 
actualLength);
-    } else {
-      
RETURN_NOT_OK(std::dynamic_pointer_cast<arrow::ResizableBuffer>(compressedBuffer)->Resize(actualLength));
-    }
+
+    
RETURN_NOT_OK(std::dynamic_pointer_cast<arrow::ResizableBuffer>(compressedBuffer)->Resize(actualLength));
+
     compressionTime.stop();
     auto payload = std::unique_ptr<BlockPayload>(
-        new BlockPayload(Type::kCompressed, numRows, {compressedBuffer}, 
isValidityBuffer, pool, codec));
+        new BlockPayload(Type::kCompressed, numRows, numBuffers, 
{compressedBuffer}, isValidityBuffer));
     payload->setCompressionTime(compressionTime.realTimeUsed());
+
     return payload;
   }
   return std::unique_ptr<BlockPayload>(
-      new BlockPayload(payloadType, numRows, std::move(buffers), 
isValidityBuffer, pool, codec));
+      new BlockPayload(payloadType, numRows, numBuffers, std::move(buffers), 
isValidityBuffer));
 }
 
 arrow::Status BlockPayload::serialize(arrow::io::OutputStream* outputStream) {
@@ -240,8 +232,7 @@ arrow::Status 
BlockPayload::serialize(arrow::io::OutputStream* outputStream) {
       ScopedTimer timer(&writeTime_);
       RETURN_NOT_OK(outputStream->Write(&kUncompressedType, sizeof(Type)));
       RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
-      uint32_t numBuffers = buffers_.size();
-      RETURN_NOT_OK(outputStream->Write(&numBuffers, sizeof(uint32_t)));
+      RETURN_NOT_OK(outputStream->Write(&numBuffers_, sizeof(uint32_t)));
       for (auto& buffer : buffers_) {
         if (!buffer) {
           RETURN_NOT_OK(outputStream->Write(&kNullBuffer, sizeof(int64_t)));
@@ -255,23 +246,29 @@ arrow::Status 
BlockPayload::serialize(arrow::io::OutputStream* outputStream) {
       }
     } break;
     case Type::kToBeCompressed: {
-      {
-        ScopedTimer timer(&writeTime_);
-        RETURN_NOT_OK(outputStream->Write(&kCompressedType, sizeof(Type)));
-        RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
-        uint32_t numBuffers = buffers_.size();
-        RETURN_NOT_OK(outputStream->Write(&numBuffers, sizeof(uint32_t)));
-      }
+      ScopedTimer timer(&writeTime_);
+
+      // Only numBuffers is written here; the type and numRows headers are added later, when the spilled payload is re-serialized (see UncompressedDiskBlockPayload::serialize).
+      RETURN_NOT_OK(outputStream->Write(&numBuffers_, sizeof(uint32_t)));
+
       for (auto& buffer : buffers_) {
-        RETURN_NOT_OK(compressAndFlush(std::move(buffer), outputStream, 
codec_, pool_, compressTime_, writeTime_));
+        if (!buffer) {
+          RETURN_NOT_OK(outputStream->Write(&kNullBuffer, sizeof(int64_t)));
+          continue;
+        }
+
+        int64_t bufferSize = buffer->size();
+        RETURN_NOT_OK(outputStream->Write(&bufferSize, sizeof(int64_t)));
+        if (bufferSize > 0) {
+          RETURN_NOT_OK(outputStream->Write(std::move(buffer)));
+        }
       }
     } break;
     case Type::kCompressed: {
       ScopedTimer timer(&writeTime_);
       RETURN_NOT_OK(outputStream->Write(&kCompressedType, sizeof(Type)));
       RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
-      uint32_t buffers = numBuffers();
-      RETURN_NOT_OK(outputStream->Write(&buffers, sizeof(uint32_t)));
+      RETURN_NOT_OK(outputStream->Write(&numBuffers_, sizeof(uint32_t)));
       RETURN_NOT_OK(outputStream->Write(std::move(buffers_[0])));
     } break;
     case Type::kRaw: {
@@ -423,16 +420,12 @@ arrow::Result<std::unique_ptr<InMemoryPayload>> 
InMemoryPayload::merge(
       }
     }
   }
-  return std::make_unique<InMemoryPayload>(mergedRows, isValidityBuffer, 
std::move(merged));
+  return std::make_unique<InMemoryPayload>(mergedRows, isValidityBuffer, 
source->schema(), std::move(merged));
 }
 
-arrow::Result<std::unique_ptr<BlockPayload>> InMemoryPayload::toBlockPayload(
-    Payload::Type payloadType,
-    arrow::MemoryPool* pool,
-    arrow::util::Codec* codec,
-    std::shared_ptr<arrow::Buffer> compressed) {
-  return BlockPayload::fromBuffers(
-      payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, 
codec, std::move(compressed));
+arrow::Result<std::unique_ptr<BlockPayload>>
+InMemoryPayload::toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* 
pool, arrow::util::Codec* codec) {
+  return BlockPayload::fromBuffers(payloadType, numRows_, std::move(buffers_), 
isValidityBuffer_, pool, codec);
 }
 
 arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream* 
outputStream) {
@@ -472,10 +465,22 @@ int64_t InMemoryPayload::rawSize() {
   return getBufferSize(buffers_);
 }
 
+uint32_t InMemoryPayload::numBuffers() const {
+  return buffers_.size();
+}
+
+int64_t InMemoryPayload::rawCapacity() const {
+  return getBufferCapacity(buffers_);
+}
+
 bool InMemoryPayload::mergeable() const {
   return !hasComplexType_;
 }
 
+std::shared_ptr<arrow::Schema> InMemoryPayload::schema() const {
+  return schema_;
+}
+
 UncompressedDiskBlockPayload::UncompressedDiskBlockPayload(
     Type type,
     uint32_t numRows,
@@ -494,45 +499,47 @@ arrow::Status 
UncompressedDiskBlockPayload::serialize(arrow::io::OutputStream* o
   ARROW_RETURN_IF(
       inputStream_ == nullptr, arrow::Status::Invalid("inputStream_ is 
uninitialized before calling serialize()."));
 
-  if (codec_ == nullptr || type_ == Payload::kUncompressed) {
+  if (type_ == Payload::kUncompressed) {
     ARROW_ASSIGN_OR_RAISE(auto block, inputStream_->Read(rawSize_));
     RETURN_NOT_OK(outputStream->Write(block));
     return arrow::Status::OK();
   }
 
-  ARROW_RETURN_IF(
-      type_ != Payload::kToBeCompressed,
-      arrow::Status::Invalid(
-          "Invalid payload type: " + std::to_string(type_) +
-          ", should be either Payload::kUncompressed or 
Payload::kToBeCompressed"));
+  GLUTEN_DCHECK(
+      type_ == Payload::kToBeCompressed,
+      "Invalid payload type: " + std::to_string(type_) +
+          ", should be either Payload::kUncompressed or 
Payload::kToBeCompressed");
+
+  GLUTEN_CHECK(codec_ != nullptr, "Codec is null when serializing 
Payload::kToBeCompressed.");
+
   RETURN_NOT_OK(outputStream->Write(&kCompressedType, 
sizeof(kCompressedType)));
   RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
 
-  ARROW_ASSIGN_OR_RAISE(auto startPos, inputStream_->Tell());
-
-  // Discard original type and rows.
-  Payload::Type type;
-  uint32_t numRows;
-  ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream_->Read(sizeof(Payload::Type), 
&type));
-  ARROW_ASSIGN_OR_RAISE(bytes, inputStream_->Read(sizeof(uint32_t), &numRows));
   uint32_t numBuffers = 0;
-  ARROW_ASSIGN_OR_RAISE(bytes, inputStream_->Read(sizeof(uint32_t), 
&numBuffers));
+  ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream_->Read(sizeof(uint32_t), 
&numBuffers));
   ARROW_RETURN_IF(bytes == 0 || numBuffers == 0, 
arrow::Status::Invalid("Cannot serialize payload with 0 buffers."));
+
   RETURN_NOT_OK(outputStream->Write(&numBuffers, sizeof(uint32_t)));
 
-  // Advance Payload::Type, rows and numBuffers.
-  auto readPos = startPos + sizeof(Payload::Type) + sizeof(uint32_t) + 
sizeof(uint32_t);
-  while (readPos - startPos < rawSize_) {
+  ARROW_ASSIGN_OR_RAISE(auto start, inputStream_->Tell());
+
+  auto pos = start;
+  auto rawBufferSize = rawSize_ - sizeof(numBuffers);
+
+  while (pos - start < rawBufferSize) {
     ARROW_ASSIGN_OR_RAISE(auto uncompressed, readUncompressedBuffer());
-    ARROW_ASSIGN_OR_RAISE(readPos, inputStream_->Tell());
+    ARROW_ASSIGN_OR_RAISE(pos, inputStream_->Tell());
     RETURN_NOT_OK(compressAndFlush(std::move(uncompressed), outputStream, 
codec_, pool_, compressTime_, writeTime_));
   }
+
+  GLUTEN_CHECK(pos - start == rawBufferSize, "Not all data is read from input 
stream.");
+
   return arrow::Status::OK();
 }
 
 arrow::Result<std::shared_ptr<arrow::Buffer>> 
UncompressedDiskBlockPayload::readUncompressedBuffer() {
   ScopedTimer timer(&writeTime_);
-  readPos_++;
+
   int64_t bufferLength;
   RETURN_NOT_OK(inputStream_->Read(sizeof(int64_t), &bufferLength));
   if (bufferLength == kNullBuffer) {
diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h
index efc70b3fbe..2f481c5197 100644
--- a/cpp/core/shuffle/Payload.h
+++ b/cpp/core/shuffle/Payload.h
@@ -54,10 +54,6 @@ class Payload {
     return numRows_;
   }
 
-  uint32_t numBuffers() {
-    return isValidityBuffer_ ? isValidityBuffer_->size() : 1;
-  }
-
   const std::vector<bool>* isValidityBuffer() const {
     return isValidityBuffer_;
   }
@@ -82,8 +78,7 @@ class BlockPayload final : public Payload {
       std::vector<std::shared_ptr<arrow::Buffer>> buffers,
       const std::vector<bool>* isValidityBuffer,
       arrow::MemoryPool* pool,
-      arrow::util::Codec* codec,
-      std::shared_ptr<arrow::Buffer> compressed);
+      arrow::util::Codec* codec);
 
   static arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> 
deserialize(
       arrow::io::InputStream* inputStream,
@@ -103,21 +98,19 @@ class BlockPayload final : public Payload {
 
   int64_t rawSize() override;
 
- protected:
+ private:
   BlockPayload(
       Type type,
       uint32_t numRows,
+      uint32_t numBuffers,
       std::vector<std::shared_ptr<arrow::Buffer>> buffers,
-      const std::vector<bool>* isValidityBuffer,
-      arrow::MemoryPool* pool,
-      arrow::util::Codec* codec)
-      : Payload(type, numRows, isValidityBuffer), 
buffers_(std::move(buffers)), pool_(pool), codec_(codec) {}
+      const std::vector<bool>* isValidityBuffer)
+      : Payload(type, numRows, isValidityBuffer), numBuffers_(numBuffers), 
buffers_(std::move(buffers)) {}
 
   void setCompressionTime(int64_t compressionTime);
 
+  uint32_t numBuffers_;
   std::vector<std::shared_ptr<arrow::Buffer>> buffers_;
-  arrow::MemoryPool* pool_;
-  arrow::util::Codec* codec_;
 };
 
 class InMemoryPayload final : public Payload {
@@ -125,9 +118,11 @@ class InMemoryPayload final : public Payload {
   InMemoryPayload(
       uint32_t numRows,
       const std::vector<bool>* isValidityBuffer,
+      const std::shared_ptr<arrow::Schema>& schema,
       std::vector<std::shared_ptr<arrow::Buffer>> buffers,
       bool hasComplexType = false)
       : Payload(Type::kUncompressed, numRows, isValidityBuffer),
+        schema_(schema),
         buffers_(std::move(buffers)),
         hasComplexType_(hasComplexType) {}
 
@@ -138,19 +133,23 @@ class InMemoryPayload final : public Payload {
 
   arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index);
 
-  arrow::Result<std::unique_ptr<BlockPayload>> toBlockPayload(
-      Payload::Type payloadType,
-      arrow::MemoryPool* pool,
-      arrow::util::Codec* codec,
-      std::shared_ptr<arrow::Buffer> compressed = nullptr);
+  arrow::Result<std::unique_ptr<BlockPayload>>
+  toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, 
arrow::util::Codec* codec);
 
   arrow::Status copyBuffers(arrow::MemoryPool* pool);
 
   int64_t rawSize() override;
 
+  uint32_t numBuffers() const;
+
+  int64_t rawCapacity() const;
+
   bool mergeable() const;
 
+  std::shared_ptr<arrow::Schema> schema() const;
+
  private:
+  std::shared_ptr<arrow::Schema> schema_;
   std::vector<std::shared_ptr<arrow::Buffer>> buffers_;
   bool hasComplexType_;
 };
@@ -175,7 +174,6 @@ class UncompressedDiskBlockPayload final : public Payload {
   int64_t rawSize_;
   arrow::MemoryPool* pool_;
   arrow::util::Codec* codec_;
-  uint32_t readPos_{0};
 
   arrow::Result<std::shared_ptr<arrow::Buffer>> readUncompressedBuffer();
 };
diff --git a/cpp/core/shuffle/Spill.cc b/cpp/core/shuffle/Spill.cc
index 1a88a0b11f..92434db3ec 100644
--- a/cpp/core/shuffle/Spill.cc
+++ b/cpp/core/shuffle/Spill.cc
@@ -48,7 +48,6 @@ void Spill::insertPayload(
     int64_t rawSize,
     arrow::MemoryPool* pool,
     arrow::util::Codec* codec) {
-  // TODO: Add compression threshold.
   switch (payloadType) {
     case Payload::Type::kUncompressed:
     case Payload::Type::kToBeCompressed:
diff --git a/cpp/core/shuffle/Utils.cc b/cpp/core/shuffle/Utils.cc
index 457702c0c9..4e4dcb5d7e 100644
--- a/cpp/core/shuffle/Utils.cc
+++ b/cpp/core/shuffle/Utils.cc
@@ -420,6 +420,13 @@ int64_t gluten::getBufferSize(const 
std::vector<std::shared_ptr<arrow::Buffer>>&
       });
 }
 
+int64_t gluten::getBufferCapacity(const 
std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
+  return std::accumulate(
+      std::cbegin(buffers), std::cend(buffers), 0LL, [](int64_t sum, const 
std::shared_ptr<arrow::Buffer>& buf) {
+        return buf == nullptr ? sum : sum + buf->capacity();
+      });
+}
+
 int64_t gluten::getMaxCompressedBufferSize(
     const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
     arrow::util::Codec* codec) {
diff --git a/cpp/core/shuffle/Utils.h b/cpp/core/shuffle/Utils.h
index fafb1fbfd5..b08e319c50 100644
--- a/cpp/core/shuffle/Utils.h
+++ b/cpp/core/shuffle/Utils.h
@@ -32,13 +32,28 @@
 
 namespace gluten {
 
-using BinaryArrayLengthBufferType = uint32_t;
+using StringLengthType = uint32_t;
 using IpcOffsetBufferType = arrow::LargeStringType::offset_type;
 
-static const size_t kSizeOfBinaryArrayLengthBuffer = 
sizeof(BinaryArrayLengthBufferType);
+static const size_t kSizeOfStringLength = sizeof(StringLengthType);
 static const size_t kSizeOfIpcOffsetBuffer = sizeof(IpcOffsetBufferType);
 static const std::string kGlutenSparkLocalDirs = "GLUTEN_SPARK_LOCAL_DIRS";
 
+class PartitionScopeGuard {
+ public:
+  PartitionScopeGuard(std::optional<uint32_t>& partitionInUse, uint32_t 
partitionId) : partitionInUse_(partitionInUse) {
+    GLUTEN_DCHECK(!partitionInUse_.has_value(), "Partition id is already 
set.");
+    partitionInUse_ = partitionId;
+  }
+
+  ~PartitionScopeGuard() {
+    partitionInUse_ = std::nullopt;
+  }
+
+ private:
+  std::optional<uint32_t>& partitionInUse_;
+};
+
 std::string getShuffleSpillDir(const std::string& configuredDir, int32_t 
subDirId);
 
 arrow::Result<std::string> createTempShuffleFile(const std::string& dir);
@@ -50,6 +65,8 @@ int64_t getBufferSize(const std::shared_ptr<arrow::Array>& 
array);
 
 int64_t getBufferSize(const std::vector<std::shared_ptr<arrow::Buffer>>& 
buffers);
 
+int64_t getBufferCapacity(const std::vector<std::shared_ptr<arrow::Buffer>>& 
buffers);
+
 int64_t getMaxCompressedBufferSize(
     const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
     arrow::util::Codec* codec);
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc 
b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 1161d06fc5..3e8880e3b8 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -115,8 +115,7 @@ arrow::Status RssPartitionWriter::doEvict(uint32_t 
partitionId, std::unique_ptr<
   rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
   auto payloadType = codec_ ? Payload::Type::kCompressed : 
Payload::Type::kUncompressed;
   ARROW_ASSIGN_OR_RAISE(
-      auto payload,
-      inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ 
? codec_.get() : nullptr, nullptr));
+      auto payload, inMemoryPayload->toBlockPayload(payloadType, 
payloadPool_.get(), codec_ ? codec_.get() : nullptr));
   // Copy payload to arrow buffered os.
   ARROW_ASSIGN_OR_RAISE(auto rssBufferOs, 
arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize));
   RETURN_NOT_OK(payload->serialize(rssBufferOs.get()));
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h 
b/cpp/core/shuffle/rss/RssPartitionWriter.h
index 040c7546b9..6ad896425d 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -33,9 +33,9 @@ class RssPartitionWriter final : public PartitionWriter {
   RssPartitionWriter(
       uint32_t numPartitions,
       PartitionWriterOptions options,
-      arrow::MemoryPool* pool,
+      MemoryManager* memoryManager,
       std::shared_ptr<RssClient> rssClient)
-      : PartitionWriter(numPartitions, std::move(options), pool), 
rssClient_(rssClient) {
+      : PartitionWriter(numPartitions, std::move(options), memoryManager), 
rssClient_(rssClient) {
     init();
   }
 
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc 
b/cpp/velox/benchmarks/GenericBenchmark.cc
index a54a6292d4..b0ec44802f 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -61,7 +61,7 @@ DEFINE_bool(rss, false, "Mocking rss.");
 DEFINE_string(
     compression,
     "lz4",
-    "Specify the compression codec. Valid options are lz4, zstd, qat_gzip, 
qat_zstd, iaa_gzip");
+    "Specify the compression codec. Valid options are none, lz4, zstd, 
qat_gzip, qat_zstd, iaa_gzip");
 DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer) 
partitions");
 
 DEFINE_string(plan, "", "Path to input json file of the substrait plan.");
@@ -165,15 +165,13 @@ void cleanupLocalDirs(const std::vector<std::string>& 
localDirs) {
 
 PartitionWriterOptions createPartitionWriterOptions() {
   PartitionWriterOptions partitionWriterOptions{};
-  // Disable writer's merge.
-  partitionWriterOptions.mergeThreshold = 0;
 
   // Configure compression.
-  if (FLAGS_compression == "lz4") {
-    partitionWriterOptions.codecBackend = CodecBackend::NONE;
+  if (FLAGS_compression == "none") {
+    partitionWriterOptions.compressionType = arrow::Compression::UNCOMPRESSED;
+  } else if (FLAGS_compression == "lz4") {
     partitionWriterOptions.compressionType = arrow::Compression::LZ4_FRAME;
   } else if (FLAGS_compression == "zstd") {
-    partitionWriterOptions.codecBackend = CodecBackend::NONE;
     partitionWriterOptions.compressionType = arrow::Compression::ZSTD;
   } else if (FLAGS_compression == "qat_gzip") {
     partitionWriterOptions.codecBackend = CodecBackend::QAT;
@@ -197,17 +195,10 @@ std::unique_ptr<PartitionWriter> createPartitionWriter(
   if (FLAGS_rss) {
     auto rssClient = std::make_unique<LocalRssClient>(dataFile);
     partitionWriter = std::make_unique<RssPartitionWriter>(
-        FLAGS_shuffle_partitions,
-        std::move(options),
-        runtime->memoryManager()->getArrowMemoryPool(),
-        std::move(rssClient));
+        FLAGS_shuffle_partitions, std::move(options), 
runtime->memoryManager(), std::move(rssClient));
   } else {
     partitionWriter = std::make_unique<LocalPartitionWriter>(
-        FLAGS_shuffle_partitions,
-        std::move(options),
-        runtime->memoryManager()->getArrowMemoryPool(),
-        dataFile,
-        localDirs);
+        FLAGS_shuffle_partitions, std::move(options), 
runtime->memoryManager(), dataFile, localDirs);
   }
   return partitionWriter;
 }
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index ae2127a135..cdc0a7653c 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -15,11 +15,9 @@
  * limitations under the License.
  */
 
-#include "VeloxHashShuffleWriter.h"
+#include "shuffle/VeloxHashShuffleWriter.h"
 #include "memory/ArrowMemory.h"
 #include "memory/VeloxColumnarBatch.h"
-#include "memory/VeloxMemoryManager.h"
-#include "shuffle/ShuffleSchema.h"
 #include "shuffle/Utils.h"
 #include "utils/Common.h"
 #include "utils/Macros.h"
@@ -108,9 +106,9 @@ arrow::Status collectFlatVectorBufferStringView(
 
   auto rawValues = flatVector->rawValues();
   // last offset is the totalStringSize
-  auto lengthBufferSize = sizeof(gluten::BinaryArrayLengthBufferType) * 
flatVector->size();
+  auto lengthBufferSize = sizeof(gluten::StringLengthType) * 
flatVector->size();
   ARROW_ASSIGN_OR_RAISE(auto lengthBuffer, 
arrow::AllocateResizableBuffer(lengthBufferSize, pool));
-  auto* rawLength = 
reinterpret_cast<gluten::BinaryArrayLengthBufferType*>(lengthBuffer->mutable_data());
+  auto* rawLength = 
reinterpret_cast<gluten::StringLengthType*>(lengthBuffer->mutable_data());
   uint64_t offset = 0;
   for (int32_t i = 0; i < flatVector->size(); i++) {
     auto length = rawValues[i].size();
@@ -337,7 +335,7 @@ arrow::Status VeloxHashShuffleWriter::stop() {
   setSplitState(SplitState::kStopEvict);
   if (options_.partitioning != Partitioning::kSingle) {
     for (auto pid = 0; pid < numPartitions_; ++pid) {
-      PartitionBufferGuard guard(partitionBufferInUse_, pid);
+      PartitionScopeGuard stopEvictGuard(partitionBufferInUse_, pid);
       RETURN_NOT_OK(evictPartitionBuffers(pid, false));
     }
   }
@@ -630,7 +628,7 @@ arrow::Status VeloxHashShuffleWriter::splitBinaryType(
     auto& binaryBuf = dst[pid];
 
     // use 32bit offset
-    auto dstLengthBase = (BinaryArrayLengthBufferType*)(binaryBuf.lengthPtr) + 
partitionBufferBase_[pid];
+    auto dstLengthBase = (StringLengthType*)(binaryBuf.lengthPtr) + 
partitionBufferBase_[pid];
 
     auto valueOffset = binaryBuf.valueOffset;
     auto dstValuePtr = binaryBuf.valuePtr + valueOffset;
@@ -668,7 +666,7 @@ arrow::Status VeloxHashShuffleWriter::splitBinaryType(
         binaryBuf.valueCapacity = capacity;
         dstValuePtr = binaryBuf.valuePtr + valueOffset - stringLen;
         // Need to update dstLengthBase because lengthPtr can be updated if 
Reserve triggers spill.
-        dstLengthBase = (BinaryArrayLengthBufferType*)(binaryBuf.lengthPtr) + 
partitionBufferBase_[pid];
+        dstLengthBase = (StringLengthType*)(binaryBuf.lengthPtr) + 
partitionBufferBase_[pid];
       }
 
       // 2. copy value
@@ -824,7 +822,7 @@ void VeloxHashShuffleWriter::calculateSimpleColumnBytes() {
     // `bool(1) >> 3` gets 0, so +7
     fixedWidthBufferBytes_ += 
((arrow::bit_width(arrowColumnTypes_[colIdx]->id()) + 7) >> 3);
   }
-  fixedWidthBufferBytes_ += kSizeOfBinaryArrayLengthBuffer * 
binaryColumnIndices_.size();
+  fixedWidthBufferBytes_ += kSizeOfStringLength * binaryColumnIndices_.size();
 }
 
 uint32_t VeloxHashShuffleWriter::calculatePartitionBufferSize(const 
facebook::velox::RowVector& rv, int64_t memLimit) {
@@ -861,7 +859,10 @@ uint32_t 
VeloxHashShuffleWriter::calculatePartitionBufferSize(const facebook::ve
 
   VS_PRINTLF(bytesPerRow);
 
+  // Exclude the cached payload size from the limit, since cached payloads can be spilled.
+  // This keeps the split buffer as large as possible, maximizing the batch size seen by the reducer.
   memLimit += cachedPayloadSize();
+
   // make sure split buffer uses 128M memory at least, let's hardcode it here 
for now
   if (memLimit < kMinMemLimit) {
     memLimit = kMinMemLimit;
@@ -925,7 +926,7 @@ arrow::Status 
VeloxHashShuffleWriter::allocatePartitionBuffer(uint32_t partition
         auto binaryIdx = i - fixedWidthColumnCount_;
 
         std::shared_ptr<arrow::ResizableBuffer> lengthBuffer{};
-        auto lengthBufferSize = newSize * kSizeOfBinaryArrayLengthBuffer;
+        auto lengthBufferSize = newSize * kSizeOfStringLength;
         ARROW_ASSIGN_OR_RAISE(
             lengthBuffer, arrow::AllocateResizableBuffer(lengthBufferSize, 
partitionBufferPool_.get()));
 
@@ -962,7 +963,8 @@ arrow::Status VeloxHashShuffleWriter::evictBuffers(
     std::vector<std::shared_ptr<arrow::Buffer>> buffers,
     bool reuseBuffers) {
   if (!buffers.empty()) {
-    auto payload = std::make_unique<InMemoryPayload>(numRows, 
&isValidityBuffer_, std::move(buffers), hasComplexType_);
+    auto payload =
+        std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_, 
schema_, std::move(buffers), hasComplexType_);
     RETURN_NOT_OK(partitionWriter_->hashEvict(partitionId, std::move(payload), 
Evict::kCache, reuseBuffers));
   }
   return arrow::Status::OK();
@@ -1011,7 +1013,7 @@ 
arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> VeloxHashShuffleWrite
           allBuffers.push_back(nullptr);
         }
         // Length buffer.
-        auto lengthBufferSize = numRows * kSizeOfBinaryArrayLengthBuffer;
+        auto lengthBufferSize = numRows * kSizeOfStringLength;
         ARROW_RETURN_IF(
             !buffers[kBinaryLengthBufferIndex], arrow::Status::Invalid("Offset 
buffer of binary array is null."));
         if (reuseBuffers) {
@@ -1194,7 +1196,7 @@ arrow::Status 
VeloxHashShuffleWriter::resizePartitionBuffer(uint32_t partitionId
         auto& binaryBuf = partitionBinaryAddrs_[binaryIdx][partitionId];
         auto& lengthBuffer = buffers[kBinaryLengthBufferIndex];
         ARROW_RETURN_IF(!lengthBuffer, arrow::Status::Invalid("Offset buffer 
of binary array is null."));
-        RETURN_NOT_OK(lengthBuffer->Resize(newSize * 
kSizeOfBinaryArrayLengthBuffer));
+        RETURN_NOT_OK(lengthBuffer->Resize(newSize * kSizeOfStringLength));
 
         // Skip Resize value buffer if the spill is triggered by resizing this 
split binary buffer.
         // Only update length buffer ptr.
@@ -1372,8 +1374,8 @@ arrow::Result<int64_t> 
VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6
     for (auto& item : pidToSize) {
       auto pid = item.first;
       ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false));
-      auto payload =
-          std::make_unique<InMemoryPayload>(item.second, &isValidityBuffer_, 
std::move(buffers), hasComplexType_);
+      auto payload = std::make_unique<InMemoryPayload>(
+          item.second, &isValidityBuffer_, schema_, std::move(buffers), 
hasComplexType_);
       metrics_.totalBytesToEvict += payload->rawSize();
       RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), 
Evict::kSpill, false));
       evicted = beforeEvict - partitionBufferPool_->bytes_allocated();
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.h 
b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
index 4ee12a1550..5f45c9065c 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
@@ -303,21 +303,6 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
 
   arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, 
int64_t memLimit);
 
-  class PartitionBufferGuard {
-   public:
-    PartitionBufferGuard(std::optional<uint32_t>& partitionInUse, uint32_t 
partitionId)
-        : partitionBufferInUse_(partitionInUse) {
-      partitionBufferInUse_ = partitionId;
-    }
-
-    ~PartitionBufferGuard() {
-      partitionBufferInUse_ = std::nullopt;
-    }
-
-   private:
-    std::optional<uint32_t>& partitionBufferInUse_;
-  };
-
   std::shared_ptr<arrow::Schema> schema_;
 
   // Column index, partition id, buffers.
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index e3075872e5..66d02aa68e 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -122,8 +122,7 @@ arrow::Status 
VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) {
   auto buffer = bufferOutputStream_->getBuffer();
   auto arrowBuffer = std::make_shared<arrow::Buffer>(buffer->as<uint8_t>(), 
buffer->size());
   ARROW_ASSIGN_OR_RAISE(
-      auto payload,
-      BlockPayload::fromBuffers(Payload::kRaw, 0, {std::move(arrowBuffer)}, 
nullptr, nullptr, nullptr, nullptr));
+      auto payload, BlockPayload::fromBuffers(Payload::kRaw, 0, 
{std::move(arrowBuffer)}, nullptr, nullptr, nullptr));
   RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload), 
stopped_));
   batch_ = 
std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), 
serde_.get());
   batch_->createStreamTree(rowType_, options_.bufferSize, &serdeOptions_);
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc 
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index dc0af91dc0..ae956aa49b 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -19,7 +19,6 @@
 
 #include <arrow/array/array_binary.h>
 #include <arrow/io/buffered.h>
-#include <arrow/io/compressed.h>
 #include <velox/common/caching/AsyncDataCache.h>
 
 #include "memory/VeloxColumnarBatch.h"
@@ -27,7 +26,6 @@
 #include "shuffle/Payload.h"
 #include "shuffle/Utils.h"
 #include "utils/Common.h"
-#include "utils/Compression.h"
 #include "utils/Macros.h"
 #include "utils/Timer.h"
 #include "utils/VeloxArrowUtils.h"
@@ -135,7 +133,7 @@ VectorPtr readFlatVectorStringView(
   auto nulls = buffers[bufferIdx++];
   auto lengthBuffer = buffers[bufferIdx++];
   auto valueBuffer = buffers[bufferIdx++];
-  const auto* rawLength = lengthBuffer->as<BinaryArrayLengthBufferType>();
+  const auto* rawLength = lengthBuffer->as<StringLengthType>();
 
   std::vector<BufferPtr> stringBuffers;
   auto values = AlignedBuffer::allocate<char>(sizeof(StringView) * length, 
pool);
@@ -341,16 +339,17 @@ std::shared_ptr<ColumnarBatch> 
VeloxHashShuffleReaderDeserializer::next() {
       break;
     }
     if (!merged_) {
-      merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_, 
std::move(arrowBuffers));
+      merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_, 
schema_, std::move(arrowBuffers));
       arrowBuffers.clear();
       continue;
     }
+
     auto mergedRows = merged_->numRows() + numRows;
     if (mergedRows > batchSize_) {
       break;
     }
 
-    auto append = std::make_unique<InMemoryPayload>(numRows, 
isValidityBuffer_, std::move(arrowBuffers));
+    auto append = std::make_unique<InMemoryPayload>(numRows, 
isValidityBuffer_, schema_, std::move(arrowBuffers));
     GLUTEN_ASSIGN_OR_THROW(merged_, InMemoryPayload::merge(std::move(merged_), 
std::move(append), memoryPool_));
     arrowBuffers.clear();
   }
@@ -364,8 +363,9 @@ std::shared_ptr<ColumnarBatch> 
VeloxHashShuffleReaderDeserializer::next() {
 
   // Save remaining rows.
   if (!arrowBuffers.empty()) {
-    merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_, 
std::move(arrowBuffers));
+    merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_, 
schema_, std::move(arrowBuffers));
   }
+
   return columnarBatch;
 }
 
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h 
b/cpp/velox/shuffle/VeloxShuffleReader.h
index 71f31618cf..ea9e78e181 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -130,8 +130,6 @@ class VeloxRssSortShuffleReaderDeserializer : public 
ColumnarBatchIterator {
   std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
   facebook::velox::RowTypePtr rowType_;
   std::vector<facebook::velox::RowVectorPtr> batches_;
-  bool reachEos_{false};
-  int32_t rowCount_;
   int32_t batchSize_;
   facebook::velox::common::CompressionKind veloxCompressionType_;
   facebook::velox::VectorSerde* const serde_;
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 6c78752664..3bf39683a7 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -325,7 +325,10 @@ arrow::Status 
VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
 arrow::Status VeloxSortShuffleWriter::evictPartitionInternal(uint32_t 
partitionId, uint8_t* buffer, int64_t rawLength) {
   VELOX_CHECK(rawLength > 0);
   auto payload = std::make_unique<InMemoryPayload>(
-      0, nullptr, 
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer,
 rawLength)});
+      0,
+      nullptr,
+      nullptr,
+      
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer,
 rawLength)});
   updateSpillMetrics(payload);
   RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload), 
stopped_));
   return arrow::Status::OK();
diff --git a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc 
b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
index bdbdb604ca..156196fc2f 100644
--- a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
@@ -54,12 +54,12 @@ class VeloxHashShuffleWriterSpillTest : public 
VeloxShuffleWriterTestBase, publi
     auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
 
     auto partitionWriter = createPartitionWriter(
-        PartitionWriterType::kLocal, numPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
+        PartitionWriterType::kLocal, numPartitions, dataFile_, localDirs_, 
partitionWriterOptions_);
 
     GLUTEN_ASSIGN_OR_THROW(
         auto shuffleWriter,
         VeloxHashShuffleWriter::create(
-            numPartitions, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), veloxPool, arrowPool));
+            numPartitions, std::move(partitionWriter), shuffleWriterOptions_, 
veloxPool, arrowPool));
 
     return shuffleWriter;
   }
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc 
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 405e702cd3..53ef82a021 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -55,21 +55,36 @@ struct ShuffleTestParams {
   }
 };
 
-RowVectorPtr takeRows(const std::vector<RowVectorPtr>& sources, const 
std::vector<std::vector<int32_t>>& indices) {
-  auto copy = RowVector::createEmpty(sources[0]->type(), sources[0]->pool());
+std::vector<RowVectorPtr> takeRows(
+    const std::vector<RowVectorPtr>& sources,
+    const std::vector<std::vector<int32_t>>& indices) {
+  std::vector<RowVectorPtr> result;
+
   for (size_t i = 0; i < sources.size(); ++i) {
     if (indices[i].empty()) {
-      copy->append(sources[i].get());
+      result.push_back(sources[i]);
       continue;
     }
-    for (int32_t idx : indices[i]) {
-      if (idx >= sources[i]->size()) {
-        throw GlutenException("Index out of bound: " + std::to_string(idx));
-      }
-      copy->append(sources[i]->slice(idx, 1).get());
+
+    auto copy = RowVector::createEmpty(sources[0]->type(), sources[0]->pool());
+    for (const auto row : indices[i]) {
+      GLUTEN_CHECK(row < sources[i]->size(), fmt::format("Index out of bound: 
{}", row));
+      copy->append(sources[i]->slice(row, 1).get());
     }
+    result.push_back(std::move(copy));
   }
-  return copy;
+
+  return result;
+}
+
+RowVectorPtr mergeRowVectors(const std::vector<RowVectorPtr>& sources) {
+  RowVectorPtr result = RowVector::createEmpty(sources[0]->type(), 
sources[0]->pool());
+
+  for (const auto& source : sources) {
+    result->append(source.get());
+  }
+
+  return result;
 }
 
 std::vector<ShuffleTestParams> getTestParams() {
@@ -88,7 +103,7 @@ std::vector<ShuffleTestParams> getTestParams() {
     // Sort-based shuffle.
     for (const auto partitionWriterType : {PartitionWriterType::kLocal, 
PartitionWriterType::kRss}) {
       for (const auto diskWriteBufferSize : {4, 56, 32 * 1024}) {
-        for (const auto useRadixSort : {true, false}) {
+        for (const bool useRadixSort : {true, false}) {
           params.push_back(ShuffleTestParams{
               .shuffleWriterType = ShuffleWriterType::kSortShuffle,
               .partitionWriterType = partitionWriterType,
@@ -172,8 +187,8 @@ class VeloxShuffleWriterTest : public 
::testing::TestWithParam<ShuffleTestParams
     
ASSERT_EQ(*arrow::internal::FileExists(*arrow::internal::PlatformFilename::FromString(fileName)),
 true);
   }
 
-  std::shared_ptr<arrow::Schema> getArrowSchema(facebook::velox::RowVectorPtr& 
rowVector) {
-    return toArrowSchema(rowVector->type(), pool());
+  std::shared_ptr<arrow::Schema> getArrowSchema(const 
facebook::velox::RowVectorPtr& rowVector) const {
+    return toArrowSchema(rowVector->type(), 
getDefaultMemoryManager()->getLeafMemoryPool().get());
   }
 
   void setReadableFile(const std::string& fileName) {
@@ -185,11 +200,11 @@ class VeloxShuffleWriterTest : public 
::testing::TestWithParam<ShuffleTestParams
 
   void getRowVectors(
       arrow::Compression::type compressionType,
-      std::shared_ptr<arrow::Schema> schema,
+      const RowTypePtr& rowType,
       std::vector<facebook::velox::RowVectorPtr>& vectors,
       std::shared_ptr<arrow::io::InputStream> in) {
-    const auto rowType = 
facebook::velox::asRowType(gluten::fromArrowSchema(schema));
     const auto veloxCompressionType = 
arrowCompressionTypeToVelox(compressionType);
+    const auto schema = toArrowSchema(rowType, 
getDefaultMemoryManager()->getLeafMemoryPool().get());
 
     auto codec = createArrowIpcCodec(compressionType, CodecBackend::NONE);
 
@@ -215,78 +230,19 @@ class VeloxShuffleWriterTest : public 
::testing::TestWithParam<ShuffleTestParams
     }
   }
 
-  inline static TestAllocationListener* listener_{nullptr};
-
-  std::shared_ptr<arrow::io::ReadableFile> file_;
-};
-
-class SinglePartitioningShuffleWriter : public VeloxShuffleWriterTest {
- protected:
-  void testShuffleWrite(VeloxShuffleWriter& shuffleWriter, 
std::vector<facebook::velox::RowVectorPtr> inputs) {
-    for (auto& vector : inputs) {
-      ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
-    }
-    ASSERT_NOT_OK(shuffleWriter.stop());
-
-    // Verify data file exists.
-    checkFileExists(dataFile_);
-
-    // Verify number of output partitions.
-    const auto& lengths = shuffleWriter.partitionLengths();
-    ASSERT_EQ(lengths.size(), 1);
-
-    const auto schema = getArrowSchema(inputs[0]);
-
-    std::vector<facebook::velox::RowVectorPtr> deserializedVectors;
-    setReadableFile(dataFile_);
-    GLUTEN_ASSIGN_OR_THROW(auto in, 
arrow::io::RandomAccessFile::GetStream(file_, 0, lengths[0]));
-    getRowVectors(partitionWriterOptions_.compressionType, schema, 
deserializedVectors, in);
-
-    ASSERT_EQ(deserializedVectors.size(), inputs.size());
-    for (int32_t i = 0; i < deserializedVectors.size(); i++) {
-      facebook::velox::test::assertEqualVectors(inputs[i], 
deserializedVectors[i]);
-    }
-  }
-
-  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t) override {
-    auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
-    auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
-
-    shuffleWriterOptions_.partitioning = Partitioning::kSingle;
-    shuffleWriterOptions_.bufferSize = 10;
-
-    auto partitionWriter = createPartitionWriter(
-        GetParam().partitionWriterType, 1, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
-
-    GLUTEN_ASSIGN_OR_THROW(
-        auto shuffleWriter,
-        VeloxShuffleWriter::create(
-            GetParam().shuffleWriterType,
-            1,
-            std::move(partitionWriter),
-            std::move(shuffleWriterOptions_),
-            veloxPool,
-            arrowPool));
-
-    return shuffleWriter;
-  }
-};
-
-class MultiplePartitioningShuffleWriter : public VeloxShuffleWriterTest {
- protected:
   void shuffleWriteReadMultiBlocks(
       VeloxShuffleWriter& shuffleWriter,
       int32_t expectPartitionLength,
-      facebook::velox::TypePtr dataType,
-      std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) 
{ /* blockId = pid, rowVector in block */
+      const std::vector<std::vector<facebook::velox::RowVectorPtr>>& 
expectedVectors) {
     ASSERT_NOT_OK(shuffleWriter.stop());
-    // verify data file
+
     checkFileExists(dataFile_);
-    // verify output temporary files
+
     const auto& lengths = shuffleWriter.partitionLengths();
     ASSERT_EQ(lengths.size(), expectPartitionLength);
+
     int64_t lengthSum = std::accumulate(lengths.begin(), lengths.end(), 0);
-    auto schema = toArrowSchema(dataType, pool());
+
     setReadableFile(dataFile_);
     ASSERT_EQ(*file_->GetSize(), lengthSum);
     for (int32_t i = 0; i < expectPartitionLength; i++) {
@@ -296,32 +252,69 @@ class MultiplePartitioningShuffleWriter : public 
VeloxShuffleWriterTest {
         std::vector<facebook::velox::RowVectorPtr> deserializedVectors;
         GLUTEN_ASSIGN_OR_THROW(
             auto in, arrow::io::RandomAccessFile::GetStream(file_, i == 0 ? 0 
: lengths[i - 1], lengths[i]));
-        getRowVectors(partitionWriterOptions_.compressionType, schema, 
deserializedVectors, in);
-        ASSERT_EQ(expectedVectors[i].size(), deserializedVectors.size());
-        for (int32_t j = 0; j < expectedVectors[i].size(); j++) {
-          facebook::velox::test::assertEqualVectors(expectedVectors[i][j], 
deserializedVectors[j]);
-        }
+        getRowVectors(
+            partitionWriterOptions_.compressionType, 
asRowType(expectedVectors[i][0]->type()), deserializedVectors, in);
+
+        const auto expectedVector = mergeRowVectors(expectedVectors[i]);
+        const auto deserializedVector = mergeRowVectors(deserializedVectors);
+        facebook::velox::test::assertEqualVectors(expectedVector, 
deserializedVector);
       }
     }
   }
 
-  void testShuffleWriteMultiBlocks(
+  void testShuffleRoundTrip(
+      VeloxShuffleWriter& shuffleWriter,
+      std::vector<std::shared_ptr<ColumnarBatch>> batches,
+      int32_t expectPartitionLength,
+      std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) 
{
+    for (const auto& batch : batches) {
+      ASSERT_NOT_OK(shuffleWriter.write(batch, ShuffleWriter::kMinMemLimit));
+    }
+    shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength, 
expectedVectors);
+  }
+
+  void testShuffleRoundTrip(
       VeloxShuffleWriter& shuffleWriter,
-      std::vector<facebook::velox::RowVectorPtr> vectors,
+      std::vector<RowVectorPtr> inputs,
       int32_t expectPartitionLength,
-      facebook::velox::TypePtr dataType,
       std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) 
{
-    for (auto& vector : vectors) {
-      ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
+    std::vector<std::shared_ptr<ColumnarBatch>> batches;
+    for (const auto& input : inputs) {
+      batches.emplace_back(std::make_shared<VeloxColumnarBatch>(input));
     }
-    shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength, 
dataType, expectedVectors);
+    testShuffleRoundTrip(shuffleWriter, batches, expectPartitionLength, 
expectedVectors);
   }
+
+  inline static TestAllocationListener* listener_{nullptr};
+
+  std::shared_ptr<arrow::io::ReadableFile> file_;
 };
 
-class HashPartitioningShuffleWriter : public MultiplePartitioningShuffleWriter 
{
+class SinglePartitioningShuffleWriter : public VeloxShuffleWriterTest {
+ protected:
+  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t) override {
+    auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
+    auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
+
+    shuffleWriterOptions_.partitioning = Partitioning::kSingle;
+    shuffleWriterOptions_.bufferSize = 10;
+
+    auto partitionWriter =
+        createPartitionWriter(GetParam().partitionWriterType, 1, dataFile_, 
localDirs_, partitionWriterOptions_);
+
+    GLUTEN_ASSIGN_OR_THROW(
+        auto shuffleWriter,
+        VeloxShuffleWriter::create(
+            GetParam().shuffleWriterType, 1, std::move(partitionWriter), 
shuffleWriterOptions_, veloxPool, arrowPool));
+
+    return shuffleWriter;
+  }
+};
+
+class HashPartitioningShuffleWriter : public VeloxShuffleWriterTest {
  protected:
   void SetUp() override {
-    MultiplePartitioningShuffleWriter::SetUp();
+    VeloxShuffleWriterTest::SetUp();
 
     children1_.insert((children1_.begin()), makeFlatVector<int32_t>({1, 2, 2, 
2, 2, 1, 1, 1, 2, 1}));
     hashInputVector1_ = makeRowVector(children1_);
@@ -337,7 +330,7 @@ class HashPartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter {
     shuffleWriterOptions_.bufferSize = 4;
 
     auto partitionWriter = createPartitionWriter(
-        GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
+        GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_, 
partitionWriterOptions_);
 
     GLUTEN_ASSIGN_OR_THROW(
         auto shuffleWriter,
@@ -345,7 +338,7 @@ class HashPartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter {
             GetParam().shuffleWriterType,
             numPartitions,
             std::move(partitionWriter),
-            std::move(shuffleWriterOptions_),
+            shuffleWriterOptions_,
             veloxPool,
             arrowPool));
 
@@ -358,10 +351,10 @@ class HashPartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter {
   facebook::velox::RowVectorPtr hashInputVector2_;
 };
 
-class RangePartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter {
+class RangePartitioningShuffleWriter : public VeloxShuffleWriterTest {
  protected:
   void SetUp() override {
-    MultiplePartitioningShuffleWriter::SetUp();
+    VeloxShuffleWriterTest::SetUp();
 
     auto pid1 = makeRowVector({makeFlatVector<int32_t>({0, 1, 0, 1, 0, 1, 0, 
1, 0, 1})});
     auto rangeVector1 = makeRowVector(inputVector1_->children());
@@ -382,7 +375,7 @@ class RangePartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter
     shuffleWriterOptions_.bufferSize = 4;
 
     auto partitionWriter = createPartitionWriter(
-        GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
+        GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_, 
partitionWriterOptions_);
 
     GLUTEN_ASSIGN_OR_THROW(
         auto shuffleWriter,
@@ -390,40 +383,28 @@ class RangePartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter
             GetParam().shuffleWriterType,
             numPartitions,
             std::move(partitionWriter),
-            std::move(shuffleWriterOptions_),
+            shuffleWriterOptions_,
             veloxPool,
             arrowPool));
 
     return shuffleWriter;
   }
 
-  void testShuffleWriteMultiBlocks(
-      VeloxShuffleWriter& shuffleWriter,
-      std::vector<std::shared_ptr<ColumnarBatch>> batches,
-      int32_t expectPartitionLength,
-      facebook::velox::TypePtr dataType,
-      std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) 
{ /* blockId = pid, rowVector in block */
-    for (auto& batch : batches) {
-      ASSERT_NOT_OK(shuffleWriter.write(batch, ShuffleWriter::kMinMemLimit));
-    }
-    shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength, 
dataType, expectedVectors);
-  }
-
   std::shared_ptr<ColumnarBatch> compositeBatch1_;
   std::shared_ptr<ColumnarBatch> compositeBatch2_;
 };
 
-class RoundRobinPartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter {
+class RoundRobinPartitioningShuffleWriter : public VeloxShuffleWriterTest {
  protected:
   std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t 
numPartitions) override {
     auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
     auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
 
     shuffleWriterOptions_.partitioning = Partitioning::kRoundRobin;
-    shuffleWriterOptions_.bufferSize = 4;
+    shuffleWriterOptions_.bufferSize = 4096;
 
     auto partitionWriter = createPartitionWriter(
-        GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
+        GetParam().partitionWriterType, numPartitions, dataFile_, localDirs_, 
partitionWriterOptions_);
 
     GLUTEN_ASSIGN_OR_THROW(
         auto shuffleWriter,
@@ -431,7 +412,7 @@ class RoundRobinPartitioningShuffleWriter : public 
MultiplePartitioningShuffleWr
             GetParam().shuffleWriterType,
             numPartitions,
             std::move(partitionWriter),
-            std::move(shuffleWriterOptions_),
+            shuffleWriterOptions_,
             veloxPool,
             arrowPool));
 
@@ -443,32 +424,34 @@ TEST_P(SinglePartitioningShuffleWriter, single) {
   if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
     return;
   }
+
+  ASSERT_NOT_OK(initShuffleWriterOptions());
+
   // Split 1 RowVector.
   {
-    ASSERT_NOT_OK(initShuffleWriterOptions());
     auto shuffleWriter = createShuffleWriter(1);
-    testShuffleWrite(*shuffleWriter, {inputVector1_});
+    testShuffleRoundTrip(*shuffleWriter, {inputVector1_}, 1, 
{{inputVector1_}});
   }
   // Split > 1 RowVector.
   {
-    ASSERT_NOT_OK(initShuffleWriterOptions());
     auto shuffleWriter = createShuffleWriter(1);
-    auto resultBlock = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{}, {}, {}});
-    testShuffleWrite(*shuffleWriter, {resultBlock});
+    testShuffleRoundTrip(
+        *shuffleWriter,
+        {inputVector1_, inputVector2_, inputVector1_},
+        1,
+        {{inputVector1_, inputVector2_, inputVector1_}});
   }
   // Split null RowVector.
   {
-    ASSERT_NOT_OK(initShuffleWriterOptions());
     auto shuffleWriter = createShuffleWriter(1);
     auto vector = makeRowVector({
         makeNullableFlatVector<int32_t>({std::nullopt}),
         makeNullableFlatVector<StringView>({std::nullopt}),
     });
-    testShuffleWrite(*shuffleWriter, {vector});
+    testShuffleRoundTrip(*shuffleWriter, {vector}, 1, {{vector}});
   }
   // Other types.
   {
-    ASSERT_NOT_OK(initShuffleWriterOptions());
     auto shuffleWriter = createShuffleWriter(1);
     auto vector = makeRowVector({
         makeNullableFlatVector<int32_t>({std::nullopt, 1}),
@@ -489,7 +472,7 @@ TEST_P(SinglePartitioningShuffleWriter, single) {
         }),
         makeMapVector<int32_t, StringView>({{{1, "str1000"}, {2, "str2000"}}, 
{{3, "str3000"}, {4, "str4000"}}}),
     });
-    testShuffleWrite(*shuffleWriter, {vector});
+    testShuffleRoundTrip(*shuffleWriter, {vector}, 1, {{vector}});
   }
 }
 
@@ -540,7 +523,7 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
       makeNullableFlatVector<Timestamp>({std::nullopt, Timestamp(0, 0)}),
   });
 
-  testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2, dataType, 
{{firstBlock}, {secondBlock}});
+  testShuffleRoundTrip(*shuffleWriter, {vector}, 2, {{firstBlock}, 
{secondBlock}});
 }
 
 TEST_P(HashPartitioningShuffleWriter, hashPart1VectorComplexType) {
@@ -552,7 +535,7 @@ TEST_P(HashPartitioningShuffleWriter, 
hashPart1VectorComplexType) {
   auto firstBlock = takeRows({inputVectorComplex_}, {{1}});
   auto secondBlock = takeRows({inputVectorComplex_}, {{0}});
 
-  testShuffleWriteMultiBlocks(*shuffleWriter, {vector}, 2, 
inputVectorComplex_->type(), {{firstBlock}, {secondBlock}});
+  testShuffleRoundTrip(*shuffleWriter, {vector}, 2, {firstBlock, secondBlock});
 }
 
 TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
@@ -560,32 +543,28 @@ TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
   auto shuffleWriter = createShuffleWriter(2);
 
   auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{1, 2, 3, 4, 8}, {0, 1}, {1, 2, 3, 4, 8}});
-  auto blockPid1 = takeRows({inputVector1_}, {{0, 5, 6, 7, 9, 0, 5, 6, 7, 9}});
-
-  testShuffleWriteMultiBlocks(
-      *shuffleWriter,
-      {hashInputVector1_, hashInputVector2_, hashInputVector1_},
-      2,
-      inputVector1_->type(),
-      {{blockPid2}, {blockPid1}});
+  auto blockPid1 = takeRows({inputVector1_, inputVector1_}, {{0, 5, 6, 7, 9}, 
{0, 5, 6, 7, 9}});
+
+  testShuffleRoundTrip(
+      *shuffleWriter, {hashInputVector1_, hashInputVector2_, 
hashInputVector1_}, 2, {blockPid2, blockPid1});
 }
 
 TEST_P(HashPartitioningShuffleWriter, hashLargeVectors) {
   const int32_t expectedMaxBatchSize = 8;
   ASSERT_NOT_OK(initShuffleWriterOptions());
   auto shuffleWriter = createShuffleWriter(2);
+
   // calculate maxBatchSize_
   ASSERT_NOT_OK(splitRowVector(*shuffleWriter, hashInputVector1_));
   if (GetParam().shuffleWriterType == ShuffleWriterType::kHashShuffle) {
     VELOX_CHECK_EQ(shuffleWriter->maxBatchSize(), expectedMaxBatchSize);
   }
 
-  auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{1, 2, 3, 4, 8}, {0, 1}, {1, 2, 3, 4, 8}});
-  auto blockPid1 = takeRows({inputVector1_}, {{0, 5, 6, 7, 9, 0, 5, 6, 7, 9}});
+  auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{1, 2, 3, 4, 8}, {}, {1, 2, 3, 4, 8}});
+  auto blockPid1 = takeRows({inputVector1_, inputVector1_}, {{0, 5, 6, 7, 9}, 
{0, 5, 6, 7, 9}});
 
   VELOX_CHECK(hashInputVector1_->size() > expectedMaxBatchSize);
-  testShuffleWriteMultiBlocks(
-      *shuffleWriter, {hashInputVector2_, hashInputVector1_}, 2, 
inputVector1_->type(), {{blockPid2}, {blockPid1}});
+  testShuffleRoundTrip(*shuffleWriter, {hashInputVector2_, hashInputVector1_}, 
2, {blockPid2, blockPid1});
 }
 
 TEST_P(RangePartitioningShuffleWriter, range) {
@@ -595,12 +574,8 @@ TEST_P(RangePartitioningShuffleWriter, range) {
   auto blockPid1 = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{0, 2, 4, 6, 8}, {0}, {0, 2, 4, 6, 8}});
   auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{1, 3, 5, 7, 9}, {1}, {1, 3, 5, 7, 9}});
 
-  testShuffleWriteMultiBlocks(
-      *shuffleWriter,
-      {compositeBatch1_, compositeBatch2_, compositeBatch1_},
-      2,
-      inputVector1_->type(),
-      {{blockPid1}, {blockPid2}});
+  testShuffleRoundTrip(
+      *shuffleWriter, {compositeBatch1_, compositeBatch2_, compositeBatch1_}, 
2, {blockPid1, blockPid2});
 }
 
 TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) {
@@ -610,12 +585,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) {
   auto blockPid1 = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{0, 2, 4, 6, 8}, {0}, {0, 2, 4, 6, 8}});
   auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{1, 3, 5, 7, 9}, {1}, {1, 3, 5, 7, 9}});
 
-  testShuffleWriteMultiBlocks(
-      *shuffleWriter,
-      {inputVector1_, inputVector2_, inputVector1_},
-      2,
-      inputVector1_->type(),
-      {{blockPid1}, {blockPid2}});
+  testShuffleRoundTrip(*shuffleWriter, {inputVector1_, inputVector2_, 
inputVector1_}, 2, {blockPid1, blockPid2});
 }
 
 TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceRealloc) {
@@ -715,6 +685,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, 
spillVerifyResult) {
   if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
     return;
   }
+
   ASSERT_NOT_OK(initShuffleWriterOptions());
   auto shuffleWriter = createShuffleWriter(2);
 
@@ -741,11 +712,13 @@ TEST_P(RoundRobinPartitioningShuffleWriter, 
spillVerifyResult) {
 
   ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
 
-  auto blockPid1 = takeRows({inputVector1_}, {{0, 2, 4, 6, 8, 0, 2, 4, 6, 8, 
0, 2, 4, 6, 8}});
-  auto blockPid2 = takeRows({inputVector1_}, {{1, 3, 5, 7, 9, 1, 3, 5, 7, 9, 
1, 3, 5, 7, 9}});
+  auto blockPid1 =
+      takeRows({inputVector1_, inputVector1_, inputVector1_}, {{0, 2, 4, 6, 
8}, {0, 2, 4, 6, 8}, {0, 2, 4, 6, 8}});
+  auto blockPid2 =
+      takeRows({inputVector1_, inputVector1_, inputVector1_}, {{1, 3, 5, 7, 
9}, {1, 3, 5, 7, 9}, {1, 3, 5, 7, 9}});
 
   // Stop and verify.
-  shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(), 
{{blockPid1}, {blockPid2}});
+  shuffleWriteReadMultiBlocks(*shuffleWriter, 2, {blockPid1, blockPid2});
 }
 
 TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) {
@@ -760,7 +733,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) {
 
   auto blockPid1 = takeRows({inputVector1_}, {{0, 2, 4, 6, 8}});
   auto blockPid2 = takeRows({inputVector1_}, {{1, 3, 5, 7, 9}});
-  shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(), 
{{blockPid1}, {blockPid2}});
+  shuffleWriteReadMultiBlocks(*shuffleWriter, 2, {blockPid1, blockPid2});
 }
 
 INSTANTIATE_TEST_SUITE_P(
diff --git a/cpp/velox/tests/VeloxShuffleWriterTestBase.h 
b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
index 913bd8e487..278da23c27 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
@@ -57,13 +57,13 @@ std::unique_ptr<PartitionWriter> createPartitionWriter(
     uint32_t numPartitions,
     const std::string& dataFile,
     const std::vector<std::string>& localDirs,
-    const PartitionWriterOptions& options,
-    arrow::MemoryPool* pool) {
+    const PartitionWriterOptions& options) {
   if (partitionWriterType == PartitionWriterType::kRss) {
     auto rssClient = std::make_unique<LocalRssClient>(dataFile);
-    return std::make_unique<RssPartitionWriter>(numPartitions, options, pool, 
std::move(rssClient));
+    return std::make_unique<RssPartitionWriter>(
+        numPartitions, options, getDefaultMemoryManager(), 
std::move(rssClient));
   }
-  return std::make_unique<LocalPartitionWriter>(numPartitions, options, pool, 
dataFile, localDirs);
+  return std::make_unique<LocalPartitionWriter>(numPartitions, options, 
getDefaultMemoryManager(), dataFile, localDirs);
 }
 } // namespace
 
@@ -75,7 +75,7 @@ class VeloxShuffleWriterTestBase : public 
facebook::velox::test::VectorTestBase
     auto listener = std::make_unique<TestAllocationListener>();
     listener_ = listener.get();
 
-    std::unordered_map<std::string, std::string> 
conf{{kMemoryReservationBlockSize, "1"}};
+    std::unordered_map<std::string, std::string> 
conf{{kMemoryReservationBlockSize, "1"}, {kDebugModeEnabled, "true"}};
 
     VeloxBackend::create(std::move(listener), conf);
   }
@@ -120,7 +120,16 @@ class VeloxShuffleWriterTestBase : public 
facebook::velox::test::VectorTestBase
         makeFlatVector<facebook::velox::StringView>(
             {"alice0", "bob1", "alice2", "bob3", "Alice4", "Bob5", "AlicE6", 
"boB7", "ALICE8", "BOB9"}),
         makeNullableFlatVector<facebook::velox::StringView>(
-            {"alice", "bob", std::nullopt, std::nullopt, "Alice", "Bob", 
std::nullopt, "alicE", std::nullopt, "boB"}),
+            {"alice_0",
+             "bob_1",
+             std::nullopt,
+             std::nullopt,
+             "Alice_4",
+             "Bob_5",
+             std::nullopt,
+             "alicE_7",
+             std::nullopt,
+             "boB_9"}),
         facebook::velox::BaseVector::create(facebook::velox::UNKNOWN(), 10, 
pool())};
 
     children2_ = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to