This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new a3a0dd60ca [GLUTEN-9457][VL] Shuffle test code cleanup (#9458)
a3a0dd60ca is described below

commit a3a0dd60ca63fc4ca1dff2a2977f60c86654ad5d
Author: Rong Ma <[email protected]>
AuthorDate: Wed Apr 30 11:48:52 2025 +0100

    [GLUTEN-9457][VL] Shuffle test code cleanup (#9458)
---
 cpp/core/jni/JniCommon.h                           |  14 -
 cpp/core/jni/JniWrapper.cc                         |   2 -
 cpp/core/shuffle/Options.h                         |  26 +-
 cpp/core/shuffle/PartitionWriter.h                 |  13 +-
 cpp/core/shuffle/ShuffleWriter.cc                  |  22 +-
 cpp/core/shuffle/ShuffleWriter.h                   |  13 +-
 cpp/core/shuffle/rss/RssClient.h                   |   2 +
 cpp/core/utils/Macros.h                            |   8 +
 cpp/velox/CMakeLists.txt                           |   6 +-
 cpp/velox/benchmarks/ColumnarToRowBenchmark.cc     |   3 +-
 cpp/velox/benchmarks/GenericBenchmark.cc           |  27 +-
 cpp/velox/benchmarks/common/BenchmarkUtils.cc      |  42 +-
 cpp/velox/benchmarks/common/BenchmarkUtils.h       |  30 +-
 cpp/velox/compute/VeloxRuntime.cc                  |  14 +-
 cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc     |   3 +-
 cpp/velox/shuffle/VeloxShuffleReader.cc            |   5 +-
 cpp/velox/shuffle/VeloxShuffleReader.h             |   2 +-
 cpp/velox/shuffle/VeloxShuffleWriter.cc            |   2 +-
 cpp/velox/tests/CMakeLists.txt                     |   4 +
 cpp/velox/tests/VeloxShuffleWriterSpillTest.cc     | 375 ++++++++++
 cpp/velox/tests/VeloxShuffleWriterTest.cc          | 762 +++++++++++----------
 cpp/velox/tests/VeloxShuffleWriterTestBase.h       | 257 +++++++
 cpp/velox/utils/LocalRssClient.cc                  |  62 ++
 .../RssClient.h => velox/utils/LocalRssClient.h}   |  25 +-
 cpp/velox/utils/TestAllocationListener.cc          |  72 ++
 cpp/velox/utils/TestAllocationListener.h           |  66 ++
 cpp/velox/utils/VeloxArrowUtils.cc                 |  19 +
 cpp/velox/utils/VeloxArrowUtils.h                  |   9 +-
 cpp/velox/utils/tests/LocalRssClient.h             |  75 --
 cpp/velox/utils/tests/MemoryPoolUtils.cc           | 156 -----
 cpp/velox/utils/tests/MemoryPoolUtils.h            |  96 ---
 cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h | 592 ----------------
 32 files changed, 1389 insertions(+), 1415 deletions(-)

diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index 0436c986a1..38839b31ae 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -323,20 +323,6 @@ static inline arrow::Compression::type 
getCompressionType(JNIEnv* env, jstring c
   return compressionType;
 }
 
-static inline const std::string getCompressionTypeStr(JNIEnv* env, jstring 
codecJstr) {
-  if (codecJstr == NULL) {
-    return "none";
-  }
-  auto codec = env->GetStringUTFChars(codecJstr, JNI_FALSE);
-
-  // Convert codec string into lowercase.
-  std::string codecLower;
-  std::transform(codec, codec + std::strlen(codec), 
std::back_inserter(codecLower), ::tolower);
-
-  env->ReleaseStringUTFChars(codecJstr, codec);
-  return codecLower;
-}
-
 static inline gluten::CodecBackend getCodecBackend(JNIEnv* env, jstring 
codecJstr) {
   if (codecJstr == nullptr) {
     return gluten::CodecBackend::NONE;
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index ecc816a79b..f794516f02 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -837,7 +837,6 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
       .compressionBufferSize = compressionBufferSize,
       .compressionThreshold = compressionThreshold,
       .compressionType = getCompressionType(env, codecJstr),
-      .compressionTypeStr = getCompressionTypeStr(env, codecJstr),
       .compressionLevel = compressionLevel,
       .bufferedWrite = true,
       .numSubDirs = numSubDirs,
@@ -1042,7 +1041,6 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
 
   ShuffleReaderOptions options = ShuffleReaderOptions{};
   options.compressionType = getCompressionType(env, compressionType);
-  options.compressionTypeStr = getCompressionTypeStr(env, compressionType);
   if (compressionType != nullptr) {
     options.codecBackend = getCodecBackend(env, compressionBackend);
   }
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index cb6fec7ae8..55d82cafd9 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -17,10 +17,12 @@
 
 #pragma once
 
-#include <arrow/ipc/options.h>
-#include <arrow/util/compression.h>
 #include "shuffle/Partitioning.h"
 #include "utils/Compression.h"
+#include "utils/Macros.h"
+
+#include <arrow/ipc/options.h>
+#include <arrow/util/compression.h>
 
 namespace gluten {
 
@@ -43,17 +45,24 @@ static constexpr int64_t kDefaultReadBufferSize = 1 << 20;
 static constexpr int64_t kDefaultDeserializerBufferSize = 1 << 20;
 static constexpr int64_t kDefaultShuffleFileBufferSize = 32 << 10;
 
-enum ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle };
-enum PartitionWriterType { kLocal, kRss };
-enum SortAlgorithm { kRadixSort, kQuickSort };
+enum class ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle };
+
+enum class PartitionWriterType { kLocal, kRss };
 
 struct ShuffleReaderOptions {
+  ShuffleWriterType shuffleWriterType = ShuffleWriterType::kHashShuffle;
+
+  // Compression options.
   arrow::Compression::type compressionType = 
arrow::Compression::type::LZ4_FRAME;
-  std::string compressionTypeStr = "lz4";
-  ShuffleWriterType shuffleWriterType = kHashShuffle;
   CodecBackend codecBackend = CodecBackend::NONE;
+
+  // Output batch size.
   int32_t batchSize = kDefaultBatchSize;
+
+  // Buffer size when reading data from the input stream.
   int64_t readerBufferSize = kDefaultReadBufferSize;
+
+  // Buffer size when deserializing rows into columnar batches. Only used for 
sort-based shuffle.
   int64_t deserializerBufferSize = kDefaultDeserializerBufferSize;
 };
 
@@ -65,7 +74,7 @@ struct ShuffleWriterOptions {
   int64_t taskAttemptId = -1;
   int32_t startPartitionId = 0;
   int64_t threadId = -1;
-  ShuffleWriterType shuffleWriterType = kHashShuffle;
+  ShuffleWriterType shuffleWriterType = ShuffleWriterType::kHashShuffle;
 
   // Sort shuffle writer.
   int32_t initialSortBufferSize = kDefaultSortBufferSize; // 
spark.shuffle.sort.initialBufferSize
@@ -80,7 +89,6 @@ struct PartitionWriterOptions {
       kDefaultCompressionBufferSize; // 
spark.io.compression.lz4.blockSize,spark.io.compression.zstd.bufferSize
   int32_t compressionThreshold = kDefaultCompressionThreshold;
   arrow::Compression::type compressionType = arrow::Compression::LZ4_FRAME;
-  std::string compressionTypeStr = kDefaultCompressionTypeStr;
   CodecBackend codecBackend = CodecBackend::NONE;
   int32_t compressionLevel = arrow::util::kUseDefaultCompressionLevel;
   CompressionMode compressionMode = CompressionMode::BUFFER;
diff --git a/cpp/core/shuffle/PartitionWriter.h 
b/cpp/core/shuffle/PartitionWriter.h
index 171efed0a3..4d86b7fdad 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -37,7 +37,17 @@ class PartitionWriter : public Reclaimable {
     codec_ = createArrowIpcCodec(options_.compressionType, 
options_.codecBackend, options_.compressionLevel);
   }
 
-  virtual ~PartitionWriter() = default;
+  static inline std::string typeToString(PartitionWriterType type) {
+    switch (type) {
+      case PartitionWriterType::kLocal:
+        return "LocalPartitionWriter";
+      case PartitionWriterType::kRss:
+        return "RssPartitionWriter";
+    }
+    GLUTEN_UNREACHABLE();
+  }
+
+  ~PartitionWriter() override = default;
 
   virtual arrow::Status stop(ShuffleWriterMetrics* metrics) = 0;
 
@@ -83,4 +93,5 @@ class PartitionWriter : public Reclaimable {
   int64_t spillTime_{0};
   int64_t writeTime_{0};
 };
+
 } // namespace gluten
diff --git a/cpp/core/shuffle/ShuffleWriter.cc 
b/cpp/core/shuffle/ShuffleWriter.cc
index eff5523fe3..13d7c30fec 100644
--- a/cpp/core/shuffle/ShuffleWriter.cc
+++ b/cpp/core/shuffle/ShuffleWriter.cc
@@ -26,17 +26,29 @@ const std::string kSortShuffleName = "sort";
 const std::string kRssSortShuffleName = "rss_sort";
 } // namespace
 
-ShuffleWriterType ShuffleWriter::stringToType(const std::string& type) {
-  if (type == kHashShuffleName) {
+ShuffleWriterType ShuffleWriter::stringToType(const std::string& typeString) {
+  if (typeString == kHashShuffleName) {
     return ShuffleWriterType::kHashShuffle;
   }
-  if (type == kSortShuffleName) {
+  if (typeString == kSortShuffleName) {
     return ShuffleWriterType::kSortShuffle;
   }
-  if (type == kRssSortShuffleName) {
+  if (typeString == kRssSortShuffleName) {
     return ShuffleWriterType::kRssSortShuffle;
   }
-  throw GlutenException("Unrecognized shuffle writer type: " + type);
+  throw GlutenException("Unrecognized shuffle writer type: " + typeString);
+}
+
+std::string ShuffleWriter::typeToString(ShuffleWriterType type) {
+  switch (type) {
+    case ShuffleWriterType::kHashShuffle:
+      return kHashShuffleName;
+    case ShuffleWriterType::kSortShuffle:
+      return kSortShuffleName;
+    case ShuffleWriterType::kRssSortShuffle:
+      return kRssSortShuffleName;
+  }
+  GLUTEN_UNREACHABLE();
 }
 
 int32_t ShuffleWriter::numPartitions() const {
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index 8c79829e00..8852f0527b 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -17,18 +17,11 @@
 
 #pragma once
 
-#include <numeric>
-#include <utility>
-
-#include "memory/ArrowMemoryPool.h"
 #include "memory/ColumnarBatch.h"
 #include "memory/Reclaimable.h"
 #include "shuffle/Options.h"
 #include "shuffle/PartitionWriter.h"
 #include "shuffle/Partitioner.h"
-#include "shuffle/Partitioning.h"
-#include "shuffle/ShuffleMemoryPool.h"
-#include "utils/Compression.h"
 
 namespace gluten {
 
@@ -38,7 +31,9 @@ class ShuffleWriter : public Reclaimable {
 
   static constexpr int64_t kMaxMemLimit = 1LL * 1024 * 1024 * 1024;
 
-  static ShuffleWriterType stringToType(const std::string& type);
+  static ShuffleWriterType stringToType(const std::string& typeString);
+
+  static std::string typeToString(ShuffleWriterType type);
 
   virtual arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t 
memLimit) = 0;
 
@@ -73,7 +68,7 @@ class ShuffleWriter : public Reclaimable {
  protected:
   ShuffleWriter(int32_t numPartitions, ShuffleWriterOptions options, 
arrow::MemoryPool* pool);
 
-  virtual ~ShuffleWriter() = default;
+  ~ShuffleWriter() override = default;
 
   int32_t numPartitions_;
 
diff --git a/cpp/core/shuffle/rss/RssClient.h b/cpp/core/shuffle/rss/RssClient.h
index dddccfa1ad..ba7380ae8d 100644
--- a/cpp/core/shuffle/rss/RssClient.h
+++ b/cpp/core/shuffle/rss/RssClient.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <cstdint>
+
 class RssClient {
  public:
   virtual ~RssClient() = default;
diff --git a/cpp/core/utils/Macros.h b/cpp/core/utils/Macros.h
index 8c6807c40b..74128cca35 100644
--- a/cpp/core/utils/Macros.h
+++ b/cpp/core/utils/Macros.h
@@ -106,3 +106,11 @@
   (time > 1e7 ? time / 1e6 : ((time > 1e4) ? time / 1e3 : time)) << (time > 
1e7 ? "ms" : (time > 1e4 ? "us" : "ns"))
 
 #define ROUND_TO_LINE(n, round) (((n) + (round)-1) & ~((round)-1))
+
+#if defined(__GNUC__) || __has_builtin(__builtin_unreachable)
+#define GLUTEN_UNREACHABLE() __builtin_unreachable()
+#elif defined(_MSC_VER)
+#define GLUTEN_UNREACHABLE() __assume(false)
+#else
+#define GLUTEN_UNREACHABLE() ((void)0)
+#endif
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 7f902e8078..71e30922d2 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -166,6 +166,8 @@ set(VELOX_SRCS
     udf/UdfLoader.cc
     utils/Common.cc
     utils/ConfigExtractor.cc
+    utils/LocalRssClient.cc
+    utils/TestAllocationListener.cc
     utils/VeloxArrowUtils.cc
     utils/VeloxBatchResizer.cc)
 
@@ -173,10 +175,6 @@ if(ENABLE_S3)
   find_package(ZLIB)
 endif()
 
-if(BUILD_TESTS OR BUILD_BENCHMARKS)
-  list(APPEND VELOX_SRCS utils/tests/MemoryPoolUtils.cc)
-endif()
-
 add_library(velox SHARED ${VELOX_SRCS})
 
 if(ENABLE_GLUTEN_VCPKG AND NOT CMAKE_SYSTEM_NAME MATCHES "Darwin")
diff --git a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc 
b/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
index f693bb8858..e32e6d7513 100644
--- a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
+++ b/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
@@ -261,8 +261,7 @@ int main(int argc, char** argv) {
   LOG(INFO) << "datafile = " << datafile;
   LOG(INFO) << "cpu = " << cpu;
 
-  auto backendConf = gluten::defaultConf();
-  gluten::initVeloxBackend(backendConf);
+  gluten::initVeloxBackend();
   memory::MemoryManager::testingSetInstance({});
 
   gluten::GoogleBenchmarkColumnarToRowCacheScanBenchmark bck(datafile);
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc 
b/cpp/velox/benchmarks/GenericBenchmark.cc
index b73840fbaf..e9c85987c6 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -20,9 +20,10 @@
 
 #include <arrow/c/bridge.h>
 #include <arrow/util/range.h>
+
 #include <benchmark/benchmark.h>
+
 #include <gflags/gflags.h>
-#include <operators/writer/ArrowWriter.h>
 
 #include "benchmarks/common/BenchmarkUtils.h"
 #include "compute/VeloxBackend.h"
@@ -35,10 +36,11 @@
 #include "shuffle/VeloxShuffleWriter.h"
 #include "shuffle/rss/RssPartitionWriter.h"
 #include "utils/Exception.h"
+#include "utils/LocalRssClient.h"
 #include "utils/StringUtil.h"
+#include "utils/TestAllocationListener.h"
 #include "utils/Timer.h"
 #include "utils/VeloxArrowUtils.h"
-#include "utils/tests/LocalRssClient.h"
 #include "velox/exec/PlanNodeStats.h"
 
 using namespace gluten;
@@ -170,11 +172,9 @@ PartitionWriterOptions createPartitionWriterOptions() {
   if (FLAGS_compression == "lz4") {
     partitionWriterOptions.codecBackend = CodecBackend::NONE;
     partitionWriterOptions.compressionType = arrow::Compression::LZ4_FRAME;
-    partitionWriterOptions.compressionTypeStr = "lz4";
   } else if (FLAGS_compression == "zstd") {
     partitionWriterOptions.codecBackend = CodecBackend::NONE;
     partitionWriterOptions.compressionType = arrow::Compression::ZSTD;
-    partitionWriterOptions.compressionTypeStr = "zstd";
   } else if (FLAGS_compression == "qat_gzip") {
     partitionWriterOptions.codecBackend = CodecBackend::QAT;
     partitionWriterOptions.compressionType = arrow::Compression::GZIP;
@@ -218,9 +218,9 @@ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(
   auto options = ShuffleWriterOptions{};
   options.partitioning = gluten::toPartitioning(FLAGS_partitioning);
   if (FLAGS_rss || FLAGS_shuffle_writer == "rss_sort") {
-    options.shuffleWriterType = gluten::kRssSortShuffle;
+    options.shuffleWriterType = gluten::ShuffleWriterType::kRssSortShuffle;
   } else if (FLAGS_shuffle_writer == "sort") {
-    options.shuffleWriterType = gluten::kSortShuffle;
+    options.shuffleWriterType = gluten::ShuffleWriterType::kSortShuffle;
   }
   auto shuffleWriter =
       runtime->createShuffleWriter(FLAGS_shuffle_partitions, 
std::move(partitionWriter), std::move(options));
@@ -257,7 +257,7 @@ void setCpu(::benchmark::State& state) {
 
 void runShuffle(
     Runtime* runtime,
-    BenchmarkAllocationListener* listener,
+    TestAllocationListener* listener,
     const std::shared_ptr<gluten::ResultIterator>& resultIter,
     WriterMetrics& writerMetrics,
     ReaderMetrics& readerMetrics,
@@ -292,7 +292,6 @@ void runShuffle(
     readerOptions.shuffleWriterType = 
shuffleWriter->options().shuffleWriterType;
     readerOptions.compressionType = partitionWriterOptions.compressionType;
     readerOptions.codecBackend = partitionWriterOptions.codecBackend;
-    readerOptions.compressionTypeStr = 
partitionWriterOptions.compressionTypeStr;
 
     std::shared_ptr<arrow::Schema> schema =
         gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct 
ArrowSchema*>(cSchema.get())));
@@ -389,7 +388,9 @@ auto BM_Generic = [](::benchmark::State& state,
                      FileReaderType readerType) {
   setCpu(state);
 
-  auto listener = 
std::make_unique<BenchmarkAllocationListener>(FLAGS_memory_limit);
+  auto listener = std::make_unique<TestAllocationListener>();
+  listener->updateLimit(FLAGS_memory_limit);
+
   auto* listenerPtr = listener.get();
   auto* memoryManager = MemoryManager::create(kVeloxBackendKind, 
std::move(listener));
   auto runtime = runtimeFactory(memoryManager);
@@ -513,7 +514,9 @@ auto BM_ShuffleWriteRead = [](::benchmark::State& state,
                               FileReaderType readerType) {
   setCpu(state);
 
-  auto listener = 
std::make_unique<BenchmarkAllocationListener>(FLAGS_memory_limit);
+  auto listener = std::make_unique<TestAllocationListener>();
+  listener->updateLimit(FLAGS_memory_limit);
+
   auto* listenerPtr = listener.get();
   auto* memoryManager = MemoryManager::create(kVeloxBackendKind, 
std::move(listener));
   auto runtime = runtimeFactory(memoryManager);
@@ -570,8 +573,8 @@ int main(int argc, char** argv) {
   ::benchmark::Initialize(&argc, argv);
 
   // Init Velox backend.
-  auto backendConf = gluten::defaultConf();
-  auto sessionConf = gluten::defaultConf();
+  std::unordered_map<std::string, std::string> backendConf{};
+  std::unordered_map<std::string, std::string> sessionConf{};
   backendConf.insert({gluten::kDebugModeEnabled, 
std::to_string(FLAGS_debug_mode)});
   backendConf.insert({gluten::kGlogVerboseLevel, std::to_string(FLAGS_v)});
   backendConf.insert({gluten::kGlogSeverityLevel, 
std::to_string(FLAGS_minloglevel)});
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc 
b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
index 6790e9235f..f31cbc0db5 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
@@ -15,7 +15,8 @@
  * limitations under the License.
  */
 
-#include "BenchmarkUtils.h"
+#include "benchmarks/common/BenchmarkUtils.h"
+
 #include "compute/VeloxBackend.h"
 #include "compute/VeloxRuntime.h"
 #include "config/VeloxConfig.h"
@@ -23,30 +24,19 @@
 #include "utils/StringUtil.h"
 #include "velox/dwio/common/Options.h"
 
+#include <benchmark/benchmark.h>
+
 DEFINE_int64(batch_size, 4096, "To set 
velox::core::QueryConfig::kPreferredOutputBatchSize.");
 DEFINE_int32(cpu, -1, "Run benchmark on specific CPU");
 DEFINE_int32(threads, 1, "The number of threads to run this benchmark");
 DEFINE_int32(iterations, 1, "The number of iterations to run this benchmark");
 
 namespace gluten {
-namespace {
-std::unordered_map<std::string, std::string> bmConfMap = defaultConf();
-} // namespace
-
-std::unordered_map<std::string, std::string> defaultConf() {
-  return {
-      {gluten::kSparkBatchSize, std::to_string(FLAGS_batch_size)},
-  };
-}
 
-void initVeloxBackend(std::unordered_map<std::string, std::string>& conf) {
+void initVeloxBackend(const std::unordered_map<std::string, std::string>& 
conf) {
   gluten::VeloxBackend::create(AllocationListener::noop(), conf);
 }
 
-void initVeloxBackend() {
-  initVeloxBackend(bmConfMap);
-}
-
 std::string getPlanFromFile(const std::string& type, const std::string& 
filePath) {
   // Read json file and resume the binary data.
   std::ifstream msgJson(filePath);
@@ -137,26 +127,4 @@ void setCpu(uint32_t cpuIndex) {
 #endif
 }
 
-void BenchmarkAllocationListener::allocationChanged(int64_t diff) {
-  if (diff > 0 && usedBytes_ + diff >= limit_) {
-    LOG(INFO) << fmt::format(
-        "reach hard limit {} when need {}, current used {}.",
-        facebook::velox::succinctBytes(limit_),
-        facebook::velox::succinctBytes(diff),
-        facebook::velox::succinctBytes(usedBytes_));
-    auto neededBytes = usedBytes_ + diff - limit_;
-    int64_t spilledBytes = 0;
-    if (iterator_) {
-      spilledBytes += iterator_->spillFixedSize(neededBytes);
-    }
-    if (spilledBytes < neededBytes && shuffleWriter_) {
-      int64_t reclaimed = 0;
-      GLUTEN_THROW_NOT_OK(shuffleWriter_->reclaimFixedSize(neededBytes - 
spilledBytes, &reclaimed));
-      spilledBytes += reclaimed;
-    }
-    LOG(INFO) << fmt::format("spill finish, got {}.", 
facebook::velox::succinctBytes(spilledBytes));
-  } else {
-    usedBytes_ += diff;
-  }
-}
 } // namespace gluten
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.h 
b/cpp/velox/benchmarks/common/BenchmarkUtils.h
index de3df96f89..3d603a38fc 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.h
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.h
@@ -17,7 +17,6 @@
 
 #pragma once
 
-#include <arrow/c/abi.h>
 #include <glog/logging.h>
 #include <cstdlib>
 #include <filesystem>
@@ -25,16 +24,12 @@
 #include <thread>
 #include <utility>
 
-#include "benchmark/benchmark.h"
 #include "substrait/SubstraitToVeloxPlan.h"
 
 #include "compute/ProtobufUtils.h"
 #include "memory/VeloxColumnarBatch.h"
 #include "memory/VeloxMemoryManager.h"
-#include "shuffle/Options.h"
-#include "shuffle/ShuffleWriter.h"
 #include "utils/Exception.h"
-#include "utils/VeloxArrowUtils.h"
 #include "velox/common/memory/Memory.h"
 
 DECLARE_int64(batch_size);
@@ -46,11 +41,8 @@ namespace gluten {
 
 std::unordered_map<std::string, std::string> defaultConf();
 
-/// Initialize the Velox backend with default value.
-void initVeloxBackend();
-
 /// Initialize the Velox backend.
-void initVeloxBackend(std::unordered_map<std::string, std::string>& conf);
+void initVeloxBackend(const std::unordered_map<std::string, std::string>& conf 
= {});
 
 // Get the location of a file generated by Java unittest.
 inline std::string getGeneratedFilePath(const std::string& fileName) {
@@ -97,24 +89,4 @@ bool endsWith(const std::string& data, const std::string& 
suffix);
 
 void setCpu(uint32_t cpuIndex);
 
-class BenchmarkAllocationListener final : public gluten::AllocationListener {
- public:
-  BenchmarkAllocationListener(uint64_t limit) : limit_(limit) {}
-
-  void setIterator(gluten::ResultIterator* iterator) {
-    iterator_ = iterator;
-  }
-
-  void setShuffleWriter(gluten::ShuffleWriter* shuffleWriter) {
-    shuffleWriter_ = shuffleWriter;
-  }
-
-  void allocationChanged(int64_t diff) override;
-
- private:
-  uint64_t usedBytes_{0L};
-  const uint64_t limit_{0L};
-  gluten::ResultIterator* iterator_{nullptr};
-  gluten::ShuffleWriter* shuffleWriter_{nullptr};
-};
 } // namespace gluten
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index a328e9b22d..e7bd7af585 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -295,23 +295,23 @@ std::shared_ptr<VeloxDataSource> 
VeloxRuntime::createDataSource(
 std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader(
     std::shared_ptr<arrow::Schema> schema,
     ShuffleReaderOptions options) {
-  auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema));
   auto codec = gluten::createArrowIpcCodec(options.compressionType, 
options.codecBackend);
-  auto ctxVeloxPool = memoryManager()->getLeafMemoryPool();
-  auto veloxCompressionType = 
facebook::velox::common::stringToCompressionKind(options.compressionTypeStr);
+  const auto veloxCompressionKind = 
arrowCompressionTypeToVelox(options.compressionType);
+  const auto rowType = 
facebook::velox::asRowType(gluten::fromArrowSchema(schema));
+
   auto deserializerFactory = 
std::make_unique<gluten::VeloxShuffleReaderDeserializerFactory>(
       schema,
       std::move(codec),
-      veloxCompressionType,
+      veloxCompressionKind,
       rowType,
       options.batchSize,
       options.readerBufferSize,
       options.deserializerBufferSize,
       memoryManager()->getArrowMemoryPool(),
-      ctxVeloxPool,
+      memoryManager()->getLeafMemoryPool(),
       options.shuffleWriterType);
-  auto reader = 
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
-  return reader;
+
+  return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
 }
 
 std::unique_ptr<ColumnarBatchSerializer> 
VeloxRuntime::createColumnarBatchSerializer(struct ArrowSchema* cSchema) {
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index e17ad5e2f7..e3075872e5 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -209,8 +209,7 @@ arrow::Status VeloxRssSortShuffleWriter::stop() {
 arrow::Status VeloxRssSortShuffleWriter::initFromRowVector(const 
facebook::velox::RowVector& rv) {
   if (!rowType_) {
     rowType_ = facebook::velox::asRowType(rv.type());
-    serdeOptions_ = {
-        false, 
facebook::velox::common::stringToCompressionKind(partitionWriter_->options().compressionTypeStr)};
+    serdeOptions_ = {false, 
arrowCompressionTypeToVelox(partitionWriter_->options().compressionType)};
     batch_ = 
std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), 
serde_.get());
     batch_->createStreamTree(rowType_, options_.bufferSize, &serdeOptions_);
   }
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc 
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 588a1aa808..dc0af91dc0 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -561,7 +561,7 @@ size_t 
VeloxRssSortShuffleReaderDeserializer::VeloxInputStream::remainingSize()
 VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
     const std::shared_ptr<arrow::Schema>& schema,
     const std::shared_ptr<arrow::util::Codec>& codec,
-    const facebook::velox::common::CompressionKind veloxCompressionType,
+    facebook::velox::common::CompressionKind veloxCompressionType,
     const RowTypePtr& rowType,
     int32_t batchSize,
     int64_t readerBufferSize,
@@ -615,9 +615,8 @@ std::unique_ptr<ColumnarBatchIterator> 
VeloxShuffleReaderDeserializerFactory::cr
     case ShuffleWriterType::kRssSortShuffle:
       return std::make_unique<VeloxRssSortShuffleReaderDeserializer>(
           veloxPool_, rowType_, batchSize_, veloxCompressionType_, 
deserializeTime_, std::move(in));
-    default:
-      throw gluten::GlutenException("Unsupported shuffle writer type: " + 
std::to_string(shuffleWriterType_));
   }
+  GLUTEN_UNREACHABLE();
 }
 
 arrow::MemoryPool* VeloxShuffleReaderDeserializerFactory::getPool() {
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h 
b/cpp/velox/shuffle/VeloxShuffleReader.h
index de486d5991..71f31618cf 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -146,7 +146,7 @@ class VeloxShuffleReaderDeserializerFactory {
   VeloxShuffleReaderDeserializerFactory(
       const std::shared_ptr<arrow::Schema>& schema,
       const std::shared_ptr<arrow::util::Codec>& codec,
-      const facebook::velox::common::CompressionKind veloxCompressionType,
+      facebook::velox::common::CompressionKind veloxCompressionType,
       const facebook::velox::RowTypePtr& rowType,
       int32_t batchSize,
       int64_t readerBufferSize,
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxShuffleWriter.cc
index 06b59cf3eb..a01615dc35 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc
@@ -41,7 +41,7 @@ arrow::Result<std::shared_ptr<VeloxShuffleWriter>> 
VeloxShuffleWriter::create(
       return VeloxRssSortShuffleWriter::create(
           numPartitions, std::move(partitionWriter), std::move(options), 
veloxPool, arrowPool);
     default:
-      return arrow::Status::Invalid("Unsupported shuffle writer type: ", 
std::to_string(type));
+      return arrow::Status::Invalid("Unsupported shuffle writer type: ", 
typeToString(type));
   }
 }
 
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index ba90e45076..a6d04822b6 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -35,6 +35,10 @@ endfunction()
 set(VELOX_TEST_COMMON_SRCS JsonToProtoConverter.cc FilePathGenerator.cc)
 
 add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc)
+
+add_velox_test(velox_shuffle_writer_spill_test SOURCES
+               VeloxShuffleWriterSpillTest.cc)
+
 # TODO: ORC is not well supported. add_velox_test(orc_test SOURCES OrcTest.cc)
 add_velox_test(
   velox_operators_test
diff --git a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc 
b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
new file mode 100644
index 0000000000..bdbdb604ca
--- /dev/null
+++ b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config/GlutenConfig.h"
+#include "shuffle/VeloxHashShuffleWriter.h"
+#include "tests/VeloxShuffleWriterTestBase.h"
+#include "utils/TestAllocationListener.h"
+#include "utils/TestUtils.h"
+
+using namespace facebook::velox;
+
+namespace gluten {
+
+namespace {
+void assertSpill(TestAllocationListener* listener, std::function<void()> 
block) {
+  const auto beforeSpill = listener->reclaimedBytes();
+  block();
+  const auto afterSpill = listener->reclaimedBytes();
+
+  ASSERT_GT(afterSpill, beforeSpill);
+}
+} // namespace
+
+// Fixture for the hash shuffle writer spill tests. The Velox backend is set up and torn down once per
+// test suite; the test data is rebuilt before every test.
+class VeloxHashShuffleWriterSpillTest : public VeloxShuffleWriterTestBase, public testing::Test {
+ protected:
+  static void SetUpTestSuite() {
+    setUpVeloxBackend();
+  }
+
+  static void TearDownTestSuite() {
+    tearDownVeloxBackend();
+  }
+
+  void SetUp() override {
+    setUpTestData();
+  }
+
+  // Creates a VeloxHashShuffleWriter on top of a local partition writer, using the fixture's current
+  // partitionWriterOptions_ and shuffleWriterOptions_. Note that shuffleWriterOptions_ is passed through
+  // std::move, so tests configure it before calling this.
+  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t numPartitions) override {
+    auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
+    auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
+
+    auto partitionWriter = createPartitionWriter(
+        PartitionWriterType::kLocal, numPartitions, dataFile_, localDirs_, partitionWriterOptions_, arrowPool);
+
+    GLUTEN_ASSIGN_OR_THROW(
+        auto shuffleWriter,
+        VeloxHashShuffleWriter::create(
+            numPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions_), veloxPool, arrowPool));
+
+    return shuffleWriter;
+  }
+
+  // Splits every vector into `shuffleWriter`, then asks the writer to reclaim its cached payload size
+  // (plus its partition buffer size when `shrink` is true). Returns the byte count reported as evicted.
+  int64_t splitRowVectorAndSpill(
+      VeloxShuffleWriter& shuffleWriter,
+      std::vector<facebook::velox::RowVectorPtr> vectors,
+      bool shrink) {
+    // Iterate by reference: copying each element would copy a shared pointer per iteration.
+    for (auto& vector : vectors) {
+      ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
+    }
+
+    auto targetEvicted = shuffleWriter.cachedPayloadSize();
+    if (shrink) {
+      targetEvicted += shuffleWriter.partitionBufferSize();
+    }
+    // Initialized so the returned value is well defined even if the writer does not set it.
+    int64_t evicted = 0;
+    ASSERT_NOT_OK(shuffleWriter.reclaimFixedSize(targetEvicted, &evicted));
+
+    return evicted;
+  }
+};
+
+TEST_F(VeloxHashShuffleWriterSpillTest, memoryLeak) { // Arrow pool must be empty after stop() and after reset().
+  shuffleWriterOptions_.bufferSize = 4;
+
+  auto shuffleWriter = createShuffleWriter(2);
+
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+  ASSERT_NOT_OK(shuffleWriter->stop());
+
+  const auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
+
+  ASSERT_EQ(arrowPool->bytes_allocated(), 0); // Nothing may remain allocated once stop() has returned.
+
+  shuffleWriter.reset(); // Drop the test's reference to the writer, then check the pool again.
+  ASSERT_EQ(arrowPool->bytes_allocated(), 0);
+}
+
+// With the listener limit set to 0 and throw-on-OOM enabled, the very first split is expected to throw
+// GlutenException.
+TEST_F(VeloxHashShuffleWriterSpillTest, spillFailWithOutOfMemory) {
+  shuffleWriterOptions_.bufferSize = 4;
+
+  auto shuffleWriter = createShuffleWriter(2);
+
+  listener_->updateLimit(0L);
+  listener_->setShuffleWriter(shuffleWriter.get());
+  listener_->setThrowIfOOM(true);
+
+  // Nothing has been split yet, so presumably there is no partition buffer to spill and the split must
+  // fail. EXPECT_THROW (rather than ASSERT_THROW) keeps the test running on failure, so listener_->reset()
+  // below is always reached and the listener is not left pointing at the destroyed writer.
+  EXPECT_THROW([&] { auto status = splitRowVector(*shuffleWriter, inputVector1_); }(), GlutenException);
+
+  listener_->reset();
+}
+
+TEST_F(VeloxHashShuffleWriterSpillTest, kInit) { // Reclaims with full, min-size, then empty partition buffers.
+  shuffleWriterOptions_.bufferSize = 4;
+
+  const int32_t numPartitions = 2;
+  auto shuffleWriter = createShuffleWriter(numPartitions);
+
+  // Case 1: request payload + buffer size, which should spill all partition buffers.
+  {
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+    auto bufferSize = shuffleWriter->partitionBufferSize();
+    auto payloadSize = shuffleWriter->cachedPayloadSize();
+    int64_t evicted;
+    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + bufferSize, 
&evicted));
+    ASSERT_EQ(evicted, payloadSize + bufferSize); // Exactly the requested size is reclaimed.
+    // No cached payload after evict.
+    ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
+    // All partition buffers should be evicted.
+    ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
+  }
+
+  // Case 2: request one byte more than the cached payload, which should shrink but not drop the buffers.
+  {
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+    auto bufferSize = shuffleWriter->partitionBufferSize();
+    auto payloadSize = shuffleWriter->cachedPayloadSize();
+    int64_t evicted;
+    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + 1, &evicted));
+    ASSERT_GT(evicted, payloadSize);
+    // No cached payload after evict.
+    ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
+    ASSERT_LE(shuffleWriter->partitionBufferSize(), bufferSize);
+    // Not all partition buffers were evicted.
+    ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
+  }
+
+  // Case 3: spill partition buffers that have already been emptied.
+  {
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+    // Clear the buffers so that the size after shrink will be 0.
+    for (auto pid = 0; pid < numPartitions; ++pid) {
+      ASSERT_NOT_OK(shuffleWriter->evictPartitionBuffers(pid, true));
+    }
+
+    auto bufferSize = shuffleWriter->partitionBufferSize();
+    auto payloadSize = shuffleWriter->cachedPayloadSize();
+    int64_t evicted;
+    // Evict payload and shrink min-size buffer.
+    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + 1, &evicted));
+    ASSERT_GT(evicted, payloadSize);
+    ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
+    // Evict empty partition buffers.
+    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(bufferSize, &evicted));
+    ASSERT_GT(evicted, 0);
+    ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
+    // Evict again. No reclaimable space.
+    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(1, &evicted));
+    ASSERT_EQ(evicted, 0);
+  }
+
+  ASSERT_NOT_OK(shuffleWriter->stop());
+}
+
+TEST_F(VeloxHashShuffleWriterSpillTest, kInitSingle) { // Single partitioning: reclaim buffer + cached payload.
+  shuffleWriterOptions_.partitioning = Partitioning::kSingle;
+  shuffleWriterOptions_.bufferSize = 4;
+
+  auto shuffleWriter = createShuffleWriter(2);
+
+  {
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+    auto payloadSize = shuffleWriter->partitionBufferSize() + 
shuffleWriter->cachedPayloadSize();
+    int64_t evicted;
+    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
+    ASSERT_GE(evicted, payloadSize); // At least the requested size must be reclaimed.
+    // No cached payload after evict.
+    ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
+  }
+
+  ASSERT_NOT_OK(shuffleWriter->stop());
+}
+
+TEST_F(VeloxHashShuffleWriterSpillTest, kSplit) { // Splitting must spill once the listener limit is capped.
+  shuffleWriterOptions_.bufferSize = 4;
+
+  auto shuffleWriter = createShuffleWriter(2);
+
+  listener_->setShuffleWriter(shuffleWriter.get());
+
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+  listener_->updateLimit(listener_->currentBytes()); // Cap the limit at the current usage.
+
+  assertSpill(listener_, [&]() {
+    // Trigger spill for the next split.
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+  });
+
+  ASSERT_NOT_OK(shuffleWriter->stop());
+
+  listener_->reset(); // Restore the listener for the following tests.
+}
+
+TEST_F(VeloxHashShuffleWriterSpillTest, kSplitSingle) { // Same as kSplit, with single partitioning.
+  shuffleWriterOptions_.partitioning = Partitioning::kSingle;
+
+  auto shuffleWriter = createShuffleWriter(1);
+
+  listener_->setShuffleWriter(shuffleWriter.get());
+
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+  // Cap the limit at the current usage to trigger a spill on the next split.
+  listener_->updateLimit(listener_->currentBytes());
+
+  assertSpill(listener_, [&]() { ASSERT_NOT_OK(splitRowVector(*shuffleWriter, 
inputVector1_)); });
+
+  ASSERT_NOT_OK(shuffleWriter->stop());
+
+  listener_->reset(); // Restore the listener for the following tests.
+}
+
+TEST_F(VeloxHashShuffleWriterSpillTest, kStop) { // stop() must spill when the listener limit is capped.
+  shuffleWriterOptions_.bufferSize = 4096;
+  // Force compression.
+  partitionWriterOptions_.compressionThreshold = 0;
+  partitionWriterOptions_.mergeThreshold = 0;
+
+  auto shuffleWriter = createShuffleWriter(2);
+
+  for (int i = 0; i < 10; ++i) {
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+  }
+  // Reclaim bytes to shrink the partition buffers.
+  int64_t reclaimed = 0;
+  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed));
+  ASSERT_GE(reclaimed, 2000); // At least the requested 2000 bytes must be reclaimed.
+
+  listener_->setShuffleWriter(shuffleWriter.get());
+  listener_->updateLimit(listener_->currentBytes()); // Cap the limit at the current usage.
+
+  // Trigger spill during stop.
+  assertSpill(listener_, [&] { ASSERT_NOT_OK(shuffleWriter->stop()); });
+
+  listener_->reset(); // Restore the listener for the following tests.
+}
+
+TEST_F(VeloxHashShuffleWriterSpillTest, kStopComplex) { // Like kStop, but splitting inputVectorComplex_.
+  shuffleWriterOptions_.bufferSize = 4096;
+
+  // Force compression.
+  partitionWriterOptions_.compressionThreshold = 0;
+  partitionWriterOptions_.mergeThreshold = 0;
+
+  auto shuffleWriter = createShuffleWriter(2);
+
+  for (int i = 0; i < 3; ++i) {
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorComplex_));
+  }
+
+  // Reclaim bytes to shrink the partition buffers.
+  int64_t reclaimed = 0;
+  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed));
+  ASSERT_GE(reclaimed, 2000); // At least the requested 2000 bytes must be reclaimed.
+
+  // Reclaim from PartitionWriter to free cached bytes.
+  auto payloadSize = shuffleWriter->cachedPayloadSize();
+  int64_t evicted;
+  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
+  ASSERT_EQ(evicted, payloadSize);
+
+  listener_->setShuffleWriter(shuffleWriter.get());
+  listener_->updateLimit(listener_->currentBytes()); // Cap the limit at the current usage.
+
+  // When evicting partitioning buffers in stop, spill will be triggered by 
complex types allocating extra memory.
+  assertSpill(listener_, [&] { ASSERT_NOT_OK(shuffleWriter->stop()); });
+
+  listener_->reset(); // Restore the listener for the following tests.
+}
+
+TEST_F(VeloxHashShuffleWriterSpillTest, evictPartitionBuffers) { // Payload is reclaimed first, buffers second.
+  shuffleWriterOptions_.bufferSize = 4;
+
+  auto shuffleWriter = createShuffleWriter(2);
+
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+  // First evict cached payloads.
+  int64_t evicted;
+  
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->cachedPayloadSize(),
 &evicted));
+  ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
+  ASSERT_GT(shuffleWriter->partitionBufferSize(), 0); // The partition buffers are still held at this point.
+
+  // Evict again. Because no cached payload to evict, it will try to evict all 
partition buffers.
+  
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->partitionBufferSize(),
 &evicted));
+  ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
+}
+
+TEST_F(VeloxHashShuffleWriterSpillTest, kUnevictableSingle) { // A second reclaim must report 0 bytes evicted.
+  shuffleWriterOptions_.partitioning = Partitioning::kSingle;
+
+  auto shuffleWriter = createShuffleWriter(1);
+
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
+
+  // First request partition buffer size + cached payload size.
+  int64_t evicted;
+  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(
+      shuffleWriter->partitionBufferSize() + 
shuffleWriter->cachedPayloadSize(), &evicted));
+  ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
+  ASSERT_GE(evicted, 0); // NOTE(review): only checks non-negativity; confirm no stricter bound is intended.
+
+  // Evict again. The evicted size should be 0.
+  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(1, &evicted));
+  ASSERT_EQ(evicted, 0);
+}
+
+TEST_F(VeloxHashShuffleWriterSpillTest, resizeBinaryBufferTriggerSpill) { // Second binary split must spill.
+  shuffleWriterOptions_.bufferReallocThreshold = 1;
+  partitionWriterOptions_.compressionType = 
arrow::Compression::type::UNCOMPRESSED;
+
+  auto shuffleWriter = createShuffleWriter(1);
+
+  // Split first input vector. Large average string length.
+  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary1_));
+
+  listener_->setShuffleWriter(shuffleWriter.get());
+  listener_->updateLimit(listener_->currentBytes()); // Cap the limit at the current usage.
+
+  assertSpill(listener_, [&] {
+    // Split second input vector. Large average string length.
+    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary2_));
+  });
+
+  ASSERT_NOT_OK(shuffleWriter->stop());
+
+  listener_->reset(); // Restore the listener for the following tests.
+}
+
+} // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc 
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index fac503ca70..405e702cd3 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -18,40 +18,53 @@
 #include <arrow/c/bridge.h>
 #include <arrow/io/api.h>
 
-#include "shuffle/LocalPartitionWriter.h"
+#include "config/GlutenConfig.h"
 #include "shuffle/VeloxHashShuffleWriter.h"
 #include "shuffle/VeloxRssSortShuffleWriter.h"
-#include "shuffle/VeloxShuffleWriter.h"
-#include "shuffle/rss/RssPartitionWriter.h"
+#include "shuffle/VeloxSortShuffleWriter.h"
+#include "tests/VeloxShuffleWriterTestBase.h"
+#include "utils/TestAllocationListener.h"
 #include "utils/TestUtils.h"
 #include "utils/VeloxArrowUtils.h"
-#include "utils/tests/MemoryPoolUtils.h"
-#include "utils/tests/VeloxShuffleWriterTestBase.h"
+
 #include "velox/vector/tests/utils/VectorTestBase.h"
 
-using namespace facebook;
 using namespace facebook::velox;
-using namespace arrow;
-using namespace arrow::ipc;
 
 namespace gluten {
+
 namespace {
+// One parameter combination for the parameterized shuffle writer tests.
+struct ShuffleTestParams {
+  ShuffleWriterType shuffleWriterType;
+  PartitionWriterType partitionWriterType;
+  arrow::Compression::type compressionType;
+  int32_t compressionThreshold{0};
+  int32_t mergeBufferSize{0};
+  int32_t diskWriteBufferSize{0};
+  bool useRadixSort{false};
+
+  // Renders every field as comma-separated "name = value" pairs. Each label matches the name of the
+  // field it prints (diskWriteBufferSize used to be printed under the label "compressionBufferSize").
+  std::string toString() const {
+    std::ostringstream out;
+    out << "shuffleWriterType = " << ShuffleWriter::typeToString(shuffleWriterType)
+        << ", partitionWriterType = " << PartitionWriter::typeToString(partitionWriterType)
+        << ", compressionType = " << arrow::util::Codec::GetCodecAsString(compressionType)
+        << ", compressionThreshold = " << compressionThreshold << ", mergeBufferSize = " << mergeBufferSize
+        << ", diskWriteBufferSize = " << diskWriteBufferSize
+        << ", useRadixSort = " << (useRadixSort ? "true" : "false");
+    return out.str();
+  }
+};
 
-facebook::velox::RowVectorPtr takeRows(
-    const std::vector<facebook::velox::RowVectorPtr>& sources,
-    const std::vector<std::vector<int32_t>>& indices) {
-  facebook::velox::RowVectorPtr copy = 
facebook::velox::RowVector::createEmpty(sources[0]->type(), sources[0]->pool());
+RowVectorPtr takeRows(const std::vector<RowVectorPtr>& sources, const 
std::vector<std::vector<int32_t>>& indices) {
+  auto copy = RowVector::createEmpty(sources[0]->type(), sources[0]->pool());
   for (size_t i = 0; i < sources.size(); ++i) {
     if (indices[i].empty()) {
-      // Take all rows;
       copy->append(sources[i].get());
       continue;
     }
     for (int32_t idx : indices[i]) {
       if (idx >= sources[i]->size()) {
-        throw GlutenException(
-            "Index out of bound: " + std::to_string(idx) + ". Source RowVector 
" + std::to_string(i) +
-            " num rows: " + std::to_string(sources[i]->size()));
+        throw GlutenException("Index out of bound: " + std::to_string(idx));
       }
       copy->append(sources[i]->slice(idx, 1).get());
     }
@@ -59,19 +72,23 @@ facebook::velox::RowVectorPtr takeRows(
   return copy;
 }
 
-std::vector<ShuffleTestParams> createShuffleTestParams() {
-  std::vector<ShuffleTestParams> params;
+std::vector<ShuffleTestParams> getTestParams() {
+  static std::vector<ShuffleTestParams> params{};
 
-  std::vector<arrow::Compression::type> compressions = {
-      arrow::Compression::UNCOMPRESSED, arrow::Compression::LZ4_FRAME, 
arrow::Compression::ZSTD};
+  if (!params.empty()) {
+    return params;
+  }
 
-  std::vector<int32_t> compressionThresholds = {-1, 0, 3, 4, 10, 4096};
-  std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};
+  const std::vector<arrow::Compression::type> compressions = {
+      arrow::Compression::UNCOMPRESSED, arrow::Compression::LZ4_FRAME, 
arrow::Compression::ZSTD};
+  const std::vector<int32_t> compressionThresholds = {-1, 0, 3, 4, 10, 4096};
+  const std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};
 
   for (const auto& compression : compressions) {
+    // Sort-based shuffle.
     for (const auto partitionWriterType : {PartitionWriterType::kLocal, 
PartitionWriterType::kRss}) {
       for (const auto diskWriteBufferSize : {4, 56, 32 * 1024}) {
-        for (auto useRadixSort : {true, false}) {
+        for (const auto useRadixSort : {true, false}) {
           params.push_back(ShuffleTestParams{
               .shuffleWriterType = ShuffleWriterType::kSortShuffle,
               .partitionWriterType = partitionWriterType,
@@ -81,28 +98,346 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
         }
       }
     }
-    params.push_back(ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, 
PartitionWriterType::kRss, compression});
 
+    // Rss sort-based shuffle.
+    params.push_back(ShuffleTestParams{
+        .shuffleWriterType = ShuffleWriterType::kRssSortShuffle,
+        .partitionWriterType = PartitionWriterType::kRss,
+        .compressionType = compression});
+
+    // Hash-based shuffle.
     for (const auto compressionThreshold : compressionThresholds) {
+      // Local.
       for (const auto mergeBufferSize : mergeBufferSizes) {
         params.push_back(ShuffleTestParams{
-            ShuffleWriterType::kHashShuffle,
-            PartitionWriterType::kLocal,
-            compression,
-            compressionThreshold,
-            mergeBufferSize});
+            .shuffleWriterType = ShuffleWriterType::kHashShuffle,
+            .partitionWriterType = PartitionWriterType::kLocal,
+            .compressionType = compression,
+            .compressionThreshold = compressionThreshold,
+            .mergeBufferSize = mergeBufferSize});
       }
+
+      // Rss.
       params.push_back(ShuffleTestParams{
-          ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, 
compression, compressionThreshold});
+          .shuffleWriterType = ShuffleWriterType::kHashShuffle,
+          .partitionWriterType = PartitionWriterType::kRss,
+          .compressionType = compression,
+          .compressionThreshold = compressionThreshold});
     }
   }
 
   return params;
 }
+} // namespace
 
-static const auto kShuffleWriteTestParams = createShuffleTestParams();
+class VeloxShuffleWriterTestEnvironment : public ::testing::Environment { // Velox backend set-up/tear-down hooks.
+ public:
+  void SetUp() override {
+    VeloxShuffleWriterTestBase::setUpVeloxBackend();
+  }
 
-} // namespace
+  void TearDown() override {
+    VeloxShuffleWriterTestBase::tearDownVeloxBackend();
+  }
+};
+
+// Parameterized base fixture for the shuffle writer tests; the parameter type is ShuffleTestParams.
+class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams>, public VeloxShuffleWriterTestBase {
+ public:
+  // Copies the current test parameter into shuffleWriterOptions_ and partitionWriterOptions_.
+  // Always returns OK.
+  arrow::Status initShuffleWriterOptions() {
+    ShuffleTestParams params = GetParam();
+
+    shuffleWriterOptions_.useRadixSort = params.useRadixSort;
+    shuffleWriterOptions_.diskWriteBufferSize = params.diskWriteBufferSize;
+
+    partitionWriterOptions_.compressionType = params.compressionType;
+    partitionWriterOptions_.compressionThreshold = params.compressionThreshold;
+    partitionWriterOptions_.mergeBufferSize = params.mergeBufferSize;
+
+    return arrow::Status::OK();
+  }
+
+ protected:
+  // Logs the active parameter combination and rebuilds the shared test data.
+  void SetUp() override {
+    std::cout << "Running test with param: " << GetParam().toString() << std::endl;
+    VeloxShuffleWriterTestBase::setUpTestData();
+  }
+
+  // Closes the data file if a test left it open.
+  void TearDown() override {
+    if (file_ != nullptr && !file_->closed()) {
+      GLUTEN_THROW_NOT_OK(file_->Close());
+    }
+  }
+
+  // Asserts that `fileName` exists on disk.
+  static void checkFileExists(const std::string& fileName) {
+    ASSERT_EQ(*arrow::internal::FileExists(*arrow::internal::PlatformFilename::FromString(fileName)), true);
+  }
+
+  // Converts the Velox type of `rowVector` to an Arrow schema.
+  std::shared_ptr<arrow::Schema> getArrowSchema(facebook::velox::RowVectorPtr& rowVector) {
+    return toArrowSchema(rowVector->type(), pool());
+  }
+
+  // Opens `fileName` for reading into file_, closing any previously opened file first.
+  void setReadableFile(const std::string& fileName) {
+    if (file_ != nullptr && !file_->closed()) {
+      GLUTEN_THROW_NOT_OK(file_->Close());
+    }
+    GLUTEN_ASSIGN_OR_THROW(file_, arrow::io::ReadableFile::Open(fileName))
+  }
+
+  // Reads every batch from `in` through a VeloxShuffleReader configured for the current shuffle writer
+  // type and appends the resulting row vectors to `vectors`.
+  // Throws GlutenException if the reader yields a batch that is not a VeloxColumnarBatch.
+  void getRowVectors(
+      arrow::Compression::type compressionType,
+      std::shared_ptr<arrow::Schema> schema,
+      std::vector<facebook::velox::RowVectorPtr>& vectors,
+      std::shared_ptr<arrow::io::InputStream> in) {
+    const auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema));
+    const auto veloxCompressionType = arrowCompressionTypeToVelox(compressionType);
+
+    auto codec = createArrowIpcCodec(compressionType, CodecBackend::NONE);
+
+    // Set batchSize to a large value so that all batches are merged by the reader.
+    auto deserializerFactory = std::make_unique<gluten::VeloxShuffleReaderDeserializerFactory>(
+        schema,
+        std::move(codec),
+        veloxCompressionType,
+        rowType,
+        kDefaultBatchSize,
+        kDefaultReadBufferSize,
+        kDefaultDeserializerBufferSize,
+        getDefaultMemoryManager()->getArrowMemoryPool(),
+        pool_,
+        GetParam().shuffleWriterType);
+
+    const auto reader = std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
+
+    const auto iter = reader->readStream(in);
+    while (iter->hasNext()) {
+      // Check the downcast: dereferencing a failed dynamic_pointer_cast would crash the test binary
+      // instead of reporting a failure.
+      const auto batch = std::dynamic_pointer_cast<VeloxColumnarBatch>(iter->next());
+      if (batch == nullptr) {
+        throw GlutenException("Shuffle reader returned a batch that is not a VeloxColumnarBatch.");
+      }
+      vectors.emplace_back(batch->getRowVector());
+    }
+  }
+
+  // NOTE(review): VeloxShuffleWriterTestBase presumably has its own listener_; confirm this declaration
+  // is not shadowing it unintentionally.
+  inline static TestAllocationListener* listener_{nullptr};
+
+  // Data file currently opened for read-back; see setReadableFile().
+  std::shared_ptr<arrow::io::ReadableFile> file_;
+};
+
+// Fixture for tests that write everything into a single partition.
+class SinglePartitioningShuffleWriter : public VeloxShuffleWriterTest {
+ protected:
+  // Splits all inputs, stops the writer, then reads the single partition back from the data file and
+  // checks that the deserialized vectors equal the inputs one by one.
+  void testShuffleWrite(VeloxShuffleWriter& shuffleWriter, std::vector<facebook::velox::RowVectorPtr> inputs) {
+    for (auto& input : inputs) {
+      ASSERT_NOT_OK(splitRowVector(shuffleWriter, input));
+    }
+    ASSERT_NOT_OK(shuffleWriter.stop());
+
+    // The data file must have been written.
+    checkFileExists(dataFile_);
+
+    // Exactly one output partition is expected.
+    const auto& partitionLengths = shuffleWriter.partitionLengths();
+    ASSERT_EQ(partitionLengths.size(), 1);
+
+    const auto arrowSchema = getArrowSchema(inputs[0]);
+
+    std::vector<facebook::velox::RowVectorPtr> outputs;
+    setReadableFile(dataFile_);
+    GLUTEN_ASSIGN_OR_THROW(auto stream, arrow::io::RandomAccessFile::GetStream(file_, 0, partitionLengths[0]));
+    getRowVectors(partitionWriterOptions_.compressionType, arrowSchema, outputs, stream);
+
+    ASSERT_EQ(outputs.size(), inputs.size());
+    for (size_t idx = 0; idx < outputs.size(); ++idx) {
+      facebook::velox::test::assertEqualVectors(inputs[idx], outputs[idx]);
+    }
+  }
+
+  // Creates a shuffle writer of the parameterized type with single partitioning and one partition.
+  // The requested partition count is ignored.
+  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t) override {
+    auto* arrowMemoryPool = getDefaultMemoryManager()->getArrowMemoryPool();
+    auto leafPool = getDefaultMemoryManager()->getLeafMemoryPool();
+
+    shuffleWriterOptions_.partitioning = Partitioning::kSingle;
+    shuffleWriterOptions_.bufferSize = 10;
+
+    const auto& params = GetParam();
+    auto partitionWriter = createPartitionWriter(
+        params.partitionWriterType, 1, dataFile_, localDirs_, partitionWriterOptions_, arrowMemoryPool);
+
+    GLUTEN_ASSIGN_OR_THROW(
+        auto writer,
+        VeloxShuffleWriter::create(
+            params.shuffleWriterType,
+            1,
+            std::move(partitionWriter),
+            std::move(shuffleWriterOptions_),
+            leafPool,
+            arrowMemoryPool));
+
+    return writer;
+  }
+};
+
+// Base fixture for tests that write several partitions and verify each one by reading it back from
+// the data file.
+class MultiplePartitioningShuffleWriter : public VeloxShuffleWriterTest {
+ protected:
+  // Stops `shuffleWriter`, then verifies the data file: it must exist, contain `expectPartitionLength`
+  // partitions whose lengths add up to the file size, and each partition must deserialize to the
+  // vectors in `expectedVectors[pid]` (an empty entry means the partition must have length 0).
+  void shuffleWriteReadMultiBlocks(
+      VeloxShuffleWriter& shuffleWriter,
+      int32_t expectPartitionLength,
+      facebook::velox::TypePtr dataType,
+      std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) { /* blockId = pid, rowVector in block */
+    ASSERT_NOT_OK(shuffleWriter.stop());
+    // Verify the data file.
+    checkFileExists(dataFile_);
+    // Verify the number of output partitions.
+    const auto& lengths = shuffleWriter.partitionLengths();
+    ASSERT_EQ(lengths.size(), expectPartitionLength);
+    // Seed with a 64-bit zero: an `int` seed would make std::accumulate sum (and truncate) in `int`.
+    const int64_t lengthSum = std::accumulate(lengths.begin(), lengths.end(), int64_t{0});
+    auto schema = toArrowSchema(dataType, pool());
+    setReadableFile(dataFile_);
+    ASSERT_EQ(*file_->GetSize(), lengthSum);
+    // The file size equals the sum of the partition lengths (asserted above), so partition i starts at
+    // the sum of all preceding lengths. Using lengths[i - 1] as the offset is only right for i <= 1.
+    int64_t offset = 0;
+    for (int32_t i = 0; i < expectPartitionLength; i++) {
+      if (expectedVectors[i].empty()) {
+        ASSERT_EQ(lengths[i], 0);
+      } else {
+        std::vector<facebook::velox::RowVectorPtr> deserializedVectors;
+        GLUTEN_ASSIGN_OR_THROW(auto in, arrow::io::RandomAccessFile::GetStream(file_, offset, lengths[i]));
+        getRowVectors(partitionWriterOptions_.compressionType, schema, deserializedVectors, in);
+        ASSERT_EQ(expectedVectors[i].size(), deserializedVectors.size());
+        for (size_t j = 0; j < expectedVectors[i].size(); j++) {
+          facebook::velox::test::assertEqualVectors(expectedVectors[i][j], deserializedVectors[j]);
+        }
+      }
+      offset += lengths[i];
+    }
+  }
+
+  // Splits all `vectors` into `shuffleWriter`, then verifies the written partitions with
+  // shuffleWriteReadMultiBlocks().
+  void testShuffleWriteMultiBlocks(
+      VeloxShuffleWriter& shuffleWriter,
+      std::vector<facebook::velox::RowVectorPtr> vectors,
+      int32_t expectPartitionLength,
+      facebook::velox::TypePtr dataType,
+      std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) {
+    for (auto& vector : vectors) {
+      ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
+    }
+    shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength, dataType, expectedVectors);
+  }
+};
+
+// Fixture for hash partitioning tests. The hash inputs are built by putting an int32 column in front
+// of children1_ / children2_.
+class HashPartitioningShuffleWriter : public MultiplePartitioningShuffleWriter {
+ protected:
+  void SetUp() override {
+    MultiplePartitioningShuffleWriter::SetUp();
+
+    // Prepend the extra int32 column, then wrap the columns into a row vector.
+    children1_.insert(children1_.begin(), makeFlatVector<int32_t>({1, 2, 2, 2, 2, 1, 1, 1, 2, 1}));
+    hashInputVector1_ = makeRowVector(children1_);
+
+    children2_.insert(children2_.begin(), makeFlatVector<int32_t>({2, 2}));
+    hashInputVector2_ = makeRowVector(children2_);
+  }
+
+  // Creates a shuffle writer of the parameterized type with hash partitioning and a buffer size of 4.
+  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t numPartitions) override {
+    auto* arrowMemoryPool = getDefaultMemoryManager()->getArrowMemoryPool();
+    auto leafPool = getDefaultMemoryManager()->getLeafMemoryPool();
+
+    shuffleWriterOptions_.partitioning = Partitioning::kHash;
+    shuffleWriterOptions_.bufferSize = 4;
+
+    const auto& params = GetParam();
+    auto partitionWriter = createPartitionWriter(
+        params.partitionWriterType, numPartitions, dataFile_, localDirs_, partitionWriterOptions_, arrowMemoryPool);
+
+    GLUTEN_ASSIGN_OR_THROW(
+        auto writer,
+        VeloxShuffleWriter::create(
+            params.shuffleWriterType,
+            numPartitions,
+            std::move(partitionWriter),
+            std::move(shuffleWriterOptions_),
+            leafPool,
+            arrowMemoryPool));
+
+    return writer;
+  }
+
+  std::vector<uint32_t> hashPartitionIds_{1, 2};
+
+  facebook::velox::RowVectorPtr hashInputVector1_;
+  facebook::velox::RowVectorPtr hashInputVector2_;
+};
+
+// Fixture for range partitioning tests. Inputs are composite batches made of a `pid` row vector and a
+// row vector holding the children of one of the shared input vectors.
+class RangePartitioningShuffleWriter : public MultiplePartitioningShuffleWriter {
+ protected:
+  void SetUp() override {
+    MultiplePartitioningShuffleWriter::SetUp();
+
+    // Composes a batch from a single-column int32 row vector of `pids` and the children of `input`.
+    const auto composeWithPids = [this](const std::vector<int32_t>& pids, const facebook::velox::RowVectorPtr& input) {
+      auto pidVector = makeRowVector({makeFlatVector<int32_t>(pids)});
+      auto rangeVector = makeRowVector(input->children());
+      return VeloxColumnarBatch::compose(
+          pool(),
+          {std::make_shared<VeloxColumnarBatch>(pidVector), std::make_shared<VeloxColumnarBatch>(rangeVector)});
+    };
+
+    compositeBatch1_ = composeWithPids({0, 1, 0, 1, 0, 1, 0, 1, 0, 1}, inputVector1_);
+    compositeBatch2_ = composeWithPids({0, 1}, inputVector2_);
+  }
+
+  // Creates a shuffle writer of the parameterized type with range partitioning and a buffer size of 4.
+  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t numPartitions) override {
+    auto* arrowMemoryPool = getDefaultMemoryManager()->getArrowMemoryPool();
+    auto leafPool = getDefaultMemoryManager()->getLeafMemoryPool();
+
+    shuffleWriterOptions_.partitioning = Partitioning::kRange;
+    shuffleWriterOptions_.bufferSize = 4;
+
+    const auto& params = GetParam();
+    auto partitionWriter = createPartitionWriter(
+        params.partitionWriterType, numPartitions, dataFile_, localDirs_, partitionWriterOptions_, arrowMemoryPool);
+
+    GLUTEN_ASSIGN_OR_THROW(
+        auto writer,
+        VeloxShuffleWriter::create(
+            params.shuffleWriterType,
+            numPartitions,
+            std::move(partitionWriter),
+            std::move(shuffleWriterOptions_),
+            leafPool,
+            arrowMemoryPool));
+
+    return writer;
+  }
+
+  // Writes every batch with ShuffleWriter::kMinMemLimit, then verifies the written partitions with
+  // shuffleWriteReadMultiBlocks().
+  void testShuffleWriteMultiBlocks(
+      VeloxShuffleWriter& shuffleWriter,
+      std::vector<std::shared_ptr<ColumnarBatch>> batches,
+      int32_t expectPartitionLength,
+      facebook::velox::TypePtr dataType,
+      std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) { /* blockId = pid, rowVector in block */
+    for (auto& columnarBatch : batches) {
+      ASSERT_NOT_OK(shuffleWriter.write(columnarBatch, ShuffleWriter::kMinMemLimit));
+    }
+    shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength, dataType, expectedVectors);
+  }
+
+  std::shared_ptr<ColumnarBatch> compositeBatch1_;
+  std::shared_ptr<ColumnarBatch> compositeBatch2_;
+};
+
+// Fixture for round-robin partitioning tests.
+class RoundRobinPartitioningShuffleWriter : public MultiplePartitioningShuffleWriter {
+ protected:
+  // Creates a shuffle writer of the parameterized type with round-robin partitioning and a buffer
+  // size of 4.
+  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t numPartitions) override {
+    auto* arrowMemoryPool = getDefaultMemoryManager()->getArrowMemoryPool();
+    auto leafPool = getDefaultMemoryManager()->getLeafMemoryPool();
+
+    shuffleWriterOptions_.partitioning = Partitioning::kRoundRobin;
+    shuffleWriterOptions_.bufferSize = 4;
+
+    const auto& params = GetParam();
+    auto partitionWriter = createPartitionWriter(
+        params.partitionWriterType, numPartitions, dataFile_, localDirs_, partitionWriterOptions_, arrowMemoryPool);
+
+    GLUTEN_ASSIGN_OR_THROW(
+        auto writer,
+        VeloxShuffleWriter::create(
+            params.shuffleWriterType,
+            numPartitions,
+            std::move(partitionWriter),
+            std::move(shuffleWriterOptions_),
+            leafPool,
+            arrowMemoryPool));
+
+    return writer;
+  }
+};
 
 TEST_P(SinglePartitioningShuffleWriter, single) {
   if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
@@ -111,30 +446,30 @@ TEST_P(SinglePartitioningShuffleWriter, single) {
   // Split 1 RowVector.
   {
     ASSERT_NOT_OK(initShuffleWriterOptions());
-    auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+    auto shuffleWriter = createShuffleWriter(1);
     testShuffleWrite(*shuffleWriter, {inputVector1_});
   }
   // Split > 1 RowVector.
   {
     ASSERT_NOT_OK(initShuffleWriterOptions());
-    auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+    auto shuffleWriter = createShuffleWriter(1);
     auto resultBlock = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{}, {}, {}});
     testShuffleWrite(*shuffleWriter, {resultBlock});
   }
   // Split null RowVector.
   {
     ASSERT_NOT_OK(initShuffleWriterOptions());
-    auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+    auto shuffleWriter = createShuffleWriter(1);
     auto vector = makeRowVector({
         makeNullableFlatVector<int32_t>({std::nullopt}),
-        makeNullableFlatVector<velox::StringView>({std::nullopt}),
+        makeNullableFlatVector<StringView>({std::nullopt}),
     });
     testShuffleWrite(*shuffleWriter, {vector});
   }
   // Other types.
   {
     ASSERT_NOT_OK(initShuffleWriterOptions());
-    auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+    auto shuffleWriter = createShuffleWriter(1);
     auto vector = makeRowVector({
         makeNullableFlatVector<int32_t>({std::nullopt, 1}),
         makeNullableFlatVector<StringView>({std::nullopt, "10"}),
@@ -145,7 +480,7 @@ TEST_P(SinglePartitioningShuffleWriter, single) {
         makeNullableFlatVector<int32_t>({std::nullopt, 1}),
         makeRowVector({
             makeFlatVector<int32_t>({1, 3}),
-            makeNullableFlatVector<velox::StringView>({std::nullopt, "de"}),
+            makeNullableFlatVector<StringView>({std::nullopt, "de"}),
         }),
         makeNullableFlatVector<StringView>({std::nullopt, "10 I'm not inline 
string"}),
         makeArrayVector<int64_t>({
@@ -160,12 +495,12 @@ TEST_P(SinglePartitioningShuffleWriter, single) {
 
 TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
-  auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+  auto shuffleWriter = createShuffleWriter(2);
   auto vector = makeRowVector({
       makeFlatVector<int32_t>({1, 2, 1, 2}),
       makeNullableFlatVector<int8_t>({1, 2, 3, std::nullopt}),
       makeFlatVector<int64_t>({1, 2, 3, 4}),
-      makeFlatVector<velox::StringView>({"nn", "re", "fr", "juiu"}),
+      makeFlatVector<StringView>({"nn", "re", "fr", "juiu"}),
       makeFlatVector<int64_t>({232, 34567235, 1212, 4567}, DECIMAL(12, 4)),
       makeFlatVector<int128_t>({232, 34567235, 1212, 4567}, DECIMAL(20, 4)),
       makeFlatVector<int32_t>(
@@ -188,7 +523,7 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
   auto firstBlock = makeRowVector({
       makeNullableFlatVector<int8_t>({2, std::nullopt}),
       makeFlatVector<int64_t>({2, 4}),
-      makeFlatVector<velox::StringView>({"re", "juiu"}),
+      makeFlatVector<StringView>({"re", "juiu"}),
       makeFlatVector<int64_t>({34567235, 4567}, DECIMAL(12, 4)),
       makeFlatVector<int128_t>({34567235, 4567}, DECIMAL(20, 4)),
       makeFlatVector<int32_t>({1, 1}, DATE()),
@@ -198,7 +533,7 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
   auto secondBlock = makeRowVector({
       makeNullableFlatVector<int8_t>({1, 3}),
       makeFlatVector<int64_t>({1, 3}),
-      makeFlatVector<velox::StringView>({"nn", "fr"}),
+      makeFlatVector<StringView>({"nn", "fr"}),
       makeFlatVector<int64_t>({232, 1212}, DECIMAL(12, 4)),
       makeFlatVector<int128_t>({232, 1212}, DECIMAL(20, 4)),
       makeNullableFlatVector<int32_t>({std::nullopt, 0}, DATE()),
@@ -210,7 +545,7 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
 
 TEST_P(HashPartitioningShuffleWriter, hashPart1VectorComplexType) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
-  auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+  auto shuffleWriter = createShuffleWriter(2);
   auto children = childrenComplex_;
   children.insert((children.begin()), makeFlatVector<int32_t>({1, 2}));
   auto vector = makeRowVector(children);
@@ -222,7 +557,7 @@ TEST_P(HashPartitioningShuffleWriter, 
hashPart1VectorComplexType) {
 
 TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
-  auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+  auto shuffleWriter = createShuffleWriter(2);
 
   auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{1, 2, 3, 4, 8}, {0, 1}, {1, 2, 3, 4, 8}});
   auto blockPid1 = takeRows({inputVector1_}, {{0, 5, 6, 7, 9, 0, 5, 6, 7, 9}});
@@ -238,10 +573,10 @@ TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
 TEST_P(HashPartitioningShuffleWriter, hashLargeVectors) {
   const int32_t expectedMaxBatchSize = 8;
   ASSERT_NOT_OK(initShuffleWriterOptions());
-  auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+  auto shuffleWriter = createShuffleWriter(2);
   // calculate maxBatchSize_
   ASSERT_NOT_OK(splitRowVector(*shuffleWriter, hashInputVector1_));
-  if (GetParam().shuffleWriterType == kHashShuffle) {
+  if (GetParam().shuffleWriterType == ShuffleWriterType::kHashShuffle) {
     VELOX_CHECK_EQ(shuffleWriter->maxBatchSize(), expectedMaxBatchSize);
   }
 
@@ -253,9 +588,9 @@ TEST_P(HashPartitioningShuffleWriter, hashLargeVectors) {
       *shuffleWriter, {hashInputVector2_, hashInputVector1_}, 2, 
inputVector1_->type(), {{blockPid2}, {blockPid1}});
 }
 
-TEST_P(RangePartitioningShuffleWriter, rangePartition) {
+TEST_P(RangePartitioningShuffleWriter, range) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
-  auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+  auto shuffleWriter = createShuffleWriter(2);
 
   auto blockPid1 = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{0, 2, 4, 6, 8}, {0}, {0, 2, 4, 6, 8}});
   auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{1, 3, 5, 7, 9}, {1}, {1, 3, 5, 7, 9}});
@@ -270,7 +605,7 @@ TEST_P(RangePartitioningShuffleWriter, rangePartition) {
 
 TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
-  auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+  auto shuffleWriter = createShuffleWriter(2);
 
   auto blockPid1 = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{0, 2, 4, 6, 8}, {0}, {0, 2, 4, 6, 8}});
   auto blockPid2 = takeRows({inputVector1_, inputVector2_, inputVector1_}, 
{{1, 3, 5, 7, 9}, {1}, {1, 3, 5, 7, 9}});
@@ -284,12 +619,12 @@ TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) {
 }
 
 TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceRealloc) {
-  if (GetParam().shuffleWriterType != kHashShuffle) {
+  if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
     return;
   }
   ASSERT_NOT_OK(initShuffleWriterOptions());
   shuffleWriterOptions_.bufferReallocThreshold = 0; // Force re-alloc on 
buffer size changed.
-  auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+  auto shuffleWriter = createShuffleWriter(2);
 
   // First spilt no null.
   auto inputNoNull = inputVectorNoNull_;
@@ -302,8 +637,8 @@ TEST_P(RoundRobinPartitioningShuffleWriter, 
preAllocForceRealloc) {
       makeNullableFlatVector<int64_t>({0, 1}),
       makeNullableFlatVector<float>({0, 0.142857}),
       makeNullableFlatVector<bool>({false, true}),
-      makeNullableFlatVector<velox::StringView>({"", "alice"}),
-      makeNullableFlatVector<velox::StringView>({"alice", ""}),
+      makeNullableFlatVector<StringView>({"", "alice"}),
+      makeNullableFlatVector<StringView>({"alice", ""}),
   };
 
   auto inputHasNull = makeRowVector(intHasNull);
@@ -342,12 +677,12 @@ TEST_P(RoundRobinPartitioningShuffleWriter, 
preAllocForceRealloc) {
 }
 
 TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceReuse) {
-  if (GetParam().shuffleWriterType != kHashShuffle) {
+  if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
     return;
   }
   ASSERT_NOT_OK(initShuffleWriterOptions());
   shuffleWriterOptions_.bufferReallocThreshold = 1; // Force re-alloc on 
buffer size changed.
-  auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+  auto shuffleWriter = createShuffleWriter(2);
 
   // First spilt no null.
   auto inputNoNull = inputVectorNoNull_;
@@ -361,8 +696,8 @@ TEST_P(RoundRobinPartitioningShuffleWriter, 
preAllocForceReuse) {
       makeNullableFlatVector<int64_t>({0, 1}),
       makeNullableFlatVector<float>({0, 0.142857}),
       makeNullableFlatVector<bool>({false, true}),
-      makeNullableFlatVector<velox::StringView>({std::nullopt, std::nullopt}),
-      makeNullableFlatVector<velox::StringView>({std::nullopt, std::nullopt}),
+      makeNullableFlatVector<StringView>({std::nullopt, std::nullopt}),
+      makeNullableFlatVector<StringView>({std::nullopt, std::nullopt}),
   };
   auto inputStringHasNull = makeRowVector(stringHasNull);
 
@@ -377,11 +712,11 @@ TEST_P(RoundRobinPartitioningShuffleWriter, 
preAllocForceReuse) {
 }
 
 TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) {
-  if (GetParam().shuffleWriterType != kHashShuffle) {
+  if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
     return;
   }
   ASSERT_NOT_OK(initShuffleWriterOptions());
-  auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+  auto shuffleWriter = createShuffleWriter(2);
 
   ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
 
@@ -414,11 +749,11 @@ TEST_P(RoundRobinPartitioningShuffleWriter, 
spillVerifyResult) {
 }
 
 TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) {
-  if (GetParam().shuffleWriterType != kSortShuffle) {
+  if (GetParam().shuffleWriterType != ShuffleWriterType::kSortShuffle) {
     return;
   }
   ASSERT_NOT_OK(initShuffleWriterOptions());
-  auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
+  auto shuffleWriter = createShuffleWriter(2);
 
   // Set memLimit to 0 to force allocate a new buffer for each row.
   ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_, 0));
@@ -428,307 +763,30 @@ TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) 
{
   shuffleWriteReadMultiBlocks(*shuffleWriter, 2, inputVector1_->type(), 
{{blockPid1}, {blockPid2}});
 }
 
-TEST_F(VeloxHashShuffleWriterMemoryTest, memoryLeak) {
-  ASSERT_NOT_OK(initShuffleWriterOptions());
-  std::shared_ptr<arrow::MemoryPool> pool = 
std::make_shared<LimitedMemoryPool>();
-  shuffleWriterOptions_.bufferSize = 4;
-
-  auto shuffleWriter = createShuffleWriter(pool.get());
-
-  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
-  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
-  ASSERT_NOT_OK(shuffleWriter->stop());
-
-  ASSERT_TRUE(pool->bytes_allocated() == 0);
-  shuffleWriter.reset();
-  ASSERT_TRUE(pool->bytes_allocated() == 0);
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, spillFailWithOutOfMemory) {
-  ASSERT_NOT_OK(initShuffleWriterOptions());
-  std::shared_ptr<arrow::MemoryPool> pool = 
std::make_shared<LimitedMemoryPool>(0);
-  shuffleWriterOptions_.bufferSize = 4;
-
-  auto shuffleWriter = createShuffleWriter(pool.get());
-
-  auto status = splitRowVector(*shuffleWriter, inputVector1_);
-
-  // Should return OOM status because there's no partition buffer to spill.
-  ASSERT_TRUE(status.IsOutOfMemory());
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kInit) {
-  ASSERT_NOT_OK(initShuffleWriterOptions());
-  shuffleWriterOptions_.bufferSize = 4;
-  auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
-
-  // Test spill all partition buffers.
-  {
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
-    auto bufferSize = shuffleWriter->partitionBufferSize();
-    auto payloadSize = shuffleWriter->cachedPayloadSize();
-    int64_t evicted;
-    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + bufferSize, 
&evicted));
-    ASSERT_EQ(evicted, payloadSize + bufferSize);
-    // No cached payload after evict.
-    ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
-    // All partition buffers should be evicted.
-    ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
-  }
-
-  // Test spill minimum-size partition buffers.
-  {
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
-    auto bufferSize = shuffleWriter->partitionBufferSize();
-    auto payloadSize = shuffleWriter->cachedPayloadSize();
-    int64_t evicted;
-    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + 1, &evicted));
-    ASSERT_GT(evicted, payloadSize);
-    // No cached payload after evict.
-    ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
-    ASSERT_LE(shuffleWriter->partitionBufferSize(), bufferSize);
-    // Not all partition buffers was evicted.
-    ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
-  }
-
-  // Test spill empty partition buffers.
-  {
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
-    // Clear buffers then the size after shrink will be 0.
-    for (auto pid = 0; pid < kDefaultShufflePartitions; ++pid) {
-      ASSERT_NOT_OK(shuffleWriter->evictPartitionBuffers(pid, true));
-    }
-
-    auto bufferSize = shuffleWriter->partitionBufferSize();
-    auto payloadSize = shuffleWriter->cachedPayloadSize();
-    int64_t evicted;
-    // Evict payload and shrink min-size buffer.
-    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + 1, &evicted));
-    ASSERT_GT(evicted, payloadSize);
-    ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
-    // Evict empty partition buffers.
-    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(bufferSize, &evicted));
-    ASSERT_GT(evicted, 0);
-    ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
-    // Evict again. No reclaimable space.
-    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(1, &evicted));
-    ASSERT_EQ(evicted, 0);
-  }
-
-  ASSERT_NOT_OK(shuffleWriter->stop());
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kInitSingle) {
-  ASSERT_NOT_OK(initShuffleWriterOptions());
-  shuffleWriterOptions_.partitioning = Partitioning::kSingle;
-  shuffleWriterOptions_.bufferSize = 4;
-  auto shuffleWriter = createShuffleWriter(defaultArrowMemoryPool().get());
-
-  {
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
-    auto payloadSize = shuffleWriter->partitionBufferSize() + 
shuffleWriter->cachedPayloadSize();
-    int64_t evicted;
-    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
-    ASSERT_GE(evicted, payloadSize);
-    // No cached payload after evict.
-    ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
-  }
-
-  ASSERT_NOT_OK(shuffleWriter->stop());
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kSplit) {
-  ASSERT_NOT_OK(initShuffleWriterOptions());
-  shuffleWriterOptions_.bufferSize = 4;
-  auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
-  auto shuffleWriter = createShuffleWriter(&pool);
-
-  pool.setEvictable(shuffleWriter.get());
-
-  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
-  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
-  // Trigger spill for the next split.
-  ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] {
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-  }));
-
-  ASSERT_NOT_OK(shuffleWriter->stop());
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kSplitSingle) {
-  ASSERT_NOT_OK(initShuffleWriterOptions());
-  shuffleWriterOptions_.partitioning = Partitioning::kSingle;
-  auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
-
-  auto shuffleWriter = createShuffleWriter(1, &pool);
-
-  pool.setEvictable(shuffleWriter.get());
-
-  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
-  // Trigger spill for the next split.
-  ASSERT_TRUE(pool.checkEvict(
-      shuffleWriter->cachedPayloadSize(), [&] { 
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_)); }));
-
-  ASSERT_NOT_OK(shuffleWriter->stop());
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kStop) {
-  for (const auto partitioning : {Partitioning::kSingle, 
Partitioning::kRoundRobin}) {
-    ASSERT_NOT_OK(initShuffleWriterOptions());
-    shuffleWriterOptions_.bufferSize = 4096;
-    // Force compression.
-    partitionWriterOptions_.compressionThreshold = 0;
-    partitionWriterOptions_.mergeThreshold = 0;
-    auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
-    auto shuffleWriter = createShuffleWriter(&pool);
-
-    pool.setEvictable(shuffleWriter.get());
-
-    for (int i = 0; i < 10; ++i) {
-      ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-    }
-    // Reclaim bytes to shrink partition buffer.
-    int64_t reclaimed = 0;
-    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed));
-    ASSERT_TRUE(reclaimed >= 2000);
-
-    // Trigger spill during stop.
-    ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { 
ASSERT_NOT_OK(shuffleWriter->stop()); }));
-  }
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kStopComplex) {
-  ASSERT_NOT_OK(initShuffleWriterOptions());
-  shuffleWriterOptions_.bufferSize = 4096;
-  // Force compression.
-  partitionWriterOptions_.compressionThreshold = 0;
-  partitionWriterOptions_.mergeThreshold = 0;
-  auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
-  auto shuffleWriter = createShuffleWriter(&pool);
-
-  pool.setEvictable(shuffleWriter.get());
-  for (int i = 0; i < 3; ++i) {
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorComplex_));
-  }
-  // Reclaim bytes to shrink partition buffer.
-  int64_t reclaimed = 0;
-  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed));
-  ASSERT_TRUE(reclaimed >= 2000);
-
-  // Reclaim from PartitionWriter to free cached bytes.
-  auto payloadSize = shuffleWriter->cachedPayloadSize();
-  int64_t evicted;
-  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
-  ASSERT_EQ(evicted, payloadSize);
-
-  // When evicting partitioning buffers in stop, spill will be triggered by 
complex types allocating extra memory.
-  ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { 
ASSERT_NOT_OK(shuffleWriter->stop()); }));
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, evictPartitionBuffers) {
-  ASSERT_NOT_OK(initShuffleWriterOptions());
-  shuffleWriterOptions_.bufferSize = 4;
-  auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
-  auto shuffleWriter = createShuffleWriter(&pool);
-
-  pool.setEvictable(shuffleWriter.get());
-
-  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
-  // First evict cached payloads.
-  int64_t evicted;
-  
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->cachedPayloadSize(),
 &evicted));
-  ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
-  ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
-  // Set limited capacity.
-  pool.setCapacity(0);
-  // Evict again. Because no cached payload to evict, it will try to evict all 
partition buffers.
-  
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->partitionBufferSize(),
 &evicted));
-  ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, kUnevictableSingle) {
-  ASSERT_NOT_OK(initShuffleWriterOptions());
-  shuffleWriterOptions_.partitioning = Partitioning::kSingle;
-  auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get());
-  auto shuffleWriter = createShuffleWriter(&pool);
-
-  pool.setEvictable(shuffleWriter.get());
-
-  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-
-  // First evict cached payloads.
-  int64_t evicted;
-  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(
-      shuffleWriter->partitionBufferSize() + 
shuffleWriter->cachedPayloadSize(), &evicted));
-  ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
-  ASSERT_GE(evicted, 0);
-
-  // Evict again. The evicted size should be 0.
-  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(1, &evicted));
-  ASSERT_EQ(evicted, 0);
-}
-
-TEST_F(VeloxHashShuffleWriterMemoryTest, resizeBinaryBufferTriggerSpill) {
-  ASSERT_NOT_OK(initShuffleWriterOptions());
-  shuffleWriterOptions_.bufferReallocThreshold = 1;
-  auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
-  auto shuffleWriter = createShuffleWriter(&pool);
-
-  pool.setEvictable(shuffleWriter.get());
-
-  // Split first input vector. Large average string length.
-  ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary1_));
-
-  // Evict cached payloads.
-  int64_t evicted;
-  
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->cachedPayloadSize(),
 &evicted));
-  ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
-  // Set limited capacity.
-  ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] {
-    // Split second input vector. Large average string length.
-    ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary2_));
-  }));
-  ASSERT_NOT_OK(shuffleWriter->stop());
-}
-
 INSTANTIATE_TEST_SUITE_P(
-    VeloxShuffleWriteParam,
+    SinglePartitioningShuffleWriterGroup,
     SinglePartitioningShuffleWriter,
-    ::testing::ValuesIn(kShuffleWriteTestParams));
+    ::testing::ValuesIn(getTestParams()));
 
 INSTANTIATE_TEST_SUITE_P(
-    VeloxShuffleWriteParam,
+    RoundRobinPartitioningShuffleWriterGroup,
     RoundRobinPartitioningShuffleWriter,
-    ::testing::ValuesIn(kShuffleWriteTestParams));
+    ::testing::ValuesIn(getTestParams()));
 
 INSTANTIATE_TEST_SUITE_P(
-    VeloxShuffleWriteParam,
+    HashPartitioningShuffleWriterGroup,
     HashPartitioningShuffleWriter,
-    ::testing::ValuesIn(kShuffleWriteTestParams));
+    ::testing::ValuesIn(getTestParams()));
 
 INSTANTIATE_TEST_SUITE_P(
-    VeloxShuffleWriteParam,
+    RangePartitioningShuffleWriterGroup,
     RangePartitioningShuffleWriter,
-    ::testing::ValuesIn(kShuffleWriteTestParams));
+    ::testing::ValuesIn(getTestParams()));
 
 } // namespace gluten
+
+// Test binary entry point: initializes GoogleTest from the command line,
+// registers the shuffle-writer global environment and runs every test.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  // The environment is handed to GoogleTest, which owns registered environments
+  // (per the GoogleTest documentation); it must not be deleted here.
+  auto* shuffleTestEnvironment = new gluten::VeloxShuffleWriterTestEnvironment;
+  ::testing::AddGlobalTestEnvironment(shuffleTestEnvironment);
+  const int exitCode = RUN_ALL_TESTS();
+  return exitCode;
+}
diff --git a/cpp/velox/tests/VeloxShuffleWriterTestBase.h 
b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
new file mode 100644
index 0000000000..913bd8e487
--- /dev/null
+++ b/cpp/velox/tests/VeloxShuffleWriterTestBase.h
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <arrow/result.h>
+
+#include <gtest/gtest.h>
+
+#include <compute/VeloxBackend.h>
+#include "memory/VeloxColumnarBatch.h"
+#include "shuffle/LocalPartitionWriter.h"
+#include "shuffle/PartitionWriter.h"
+#include "shuffle/VeloxShuffleReader.h"
+#include "shuffle/rss/RssPartitionWriter.h"
+#include "utils/LocalRssClient.h"
+#include "utils/TestAllocationListener.h"
+#include "utils/VeloxArrowUtils.h"
+#include "velox/type/Type.h"
+
+#include "velox/vector/tests/VectorTestUtils.h"
+
+namespace gluten {
+
+namespace {
+// Returns a string of exactly `length` characters built by repeating a fixed
+// seed string and truncating the final repetition as needed.
+std::string makeString(uint32_t length) {
+  static const std::string kLargeStringOf128Bytes =
+      "thisisalaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
+      "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaargestringlengthmorethan16bytes";
+  const auto seedLength = kLargeStringOf128Bytes.length();
+  std::string out;
+  // Whole copies of the seed first ...
+  for (auto fullCopies = length / seedLength; fullCopies != 0; --fullCopies) {
+    out += kLargeStringOf128Bytes;
+  }
+  // ... then the leading part of the seed that makes up the remainder.
+  const auto tailLength = length % seedLength;
+  if (tailLength != 0) {
+    out.append(kLargeStringOf128Bytes, 0, tailLength);
+  }
+  return out;
+}
+
+// Builds the PartitionWriter used by the shuffle tests.
+//
+// For PartitionWriterType::kRss an RssPartitionWriter backed by a
+// LocalRssClient on `dataFile` is returned (`localDirs` is not used in that
+// case); every other type yields a LocalPartitionWriter over `dataFile` and
+// `localDirs`.
+std::unique_ptr<PartitionWriter> createPartitionWriter(
+    PartitionWriterType partitionWriterType,
+    uint32_t numPartitions,
+    const std::string& dataFile,
+    const std::vector<std::string>& localDirs,
+    const PartitionWriterOptions& options,
+    arrow::MemoryPool* pool) {
+  if (partitionWriterType != PartitionWriterType::kRss) {
+    return std::make_unique<LocalPartitionWriter>(numPartitions, options, pool, dataFile, localDirs);
+  }
+  return std::make_unique<RssPartitionWriter>(
+      numPartitions, options, pool, std::make_unique<LocalRssClient>(dataFile));
+}
+} // namespace
+
+// Common base for the Velox shuffle writer test fixtures.
+//
+// On top of the vector-building helpers inherited from VectorTestBase it holds
+// the writer/partition-writer options, the temporary directories and data file
+// used by a test, and a set of canned input vectors. Concrete fixtures provide
+// createShuffleWriter().
+class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase {
+ public:
+  virtual ~VeloxShuffleWriterTestBase() = default;
+
+  // Creates the VeloxBackend with a TestAllocationListener and a configuration
+  // that sets kMemoryReservationBlockSize to "1". The listener is moved into
+  // VeloxBackend::create(); a raw pointer to it is kept in listener_.
+  static void setUpVeloxBackend() {
+    auto listener = std::make_unique<TestAllocationListener>();
+    listener_ = listener.get();
+
+    std::unordered_map<std::string, std::string> conf{{kMemoryReservationBlockSize, "1"}};
+
+    VeloxBackend::create(std::move(listener), conf);
+  }
+
+  // Tears down the backend created by setUpVeloxBackend().
+  static void tearDownVeloxBackend() {
+    VeloxBackend::get()->tearDown();
+  }
+
+ protected:
+  // Creates the temporary dirs/data file and fills every children*_ and
+  // inputVector*_ member. Throws (GLUTEN_THROW_NOT_OK) if the temporary
+  // locations cannot be created.
+  void setUpTestData() {
+    GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFile());
+
+    // Set up test data.
+    // 10 rows mixing nulls and values across fixed-width, string and UNKNOWN columns.
+    children1_ = {
+        makeNullableFlatVector<int8_t>({1, 2, 3, std::nullopt, 4, std::nullopt, 5, 6, std::nullopt, 7}),
+        makeNullableFlatVector<int8_t>({1, -1, std::nullopt, std::nullopt, -2, 2, std::nullopt, std::nullopt, 3, -3}),
+        makeNullableFlatVector<int32_t>({1, 2, 3, 4, std::nullopt, 5, 6, 7, 8, std::nullopt}),
+        makeNullableFlatVector<int64_t>(
+            {std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt,
+             std::nullopt}),
+        makeNullableFlatVector<float>(
+            {-0.1234567,
+             std::nullopt,
+             0.1234567,
+             std::nullopt,
+             -0.142857,
+             std::nullopt,
+             0.142857,
+             0.285714,
+             0.428617,
+             std::nullopt}),
+        makeNullableFlatVector<bool>(
+            {std::nullopt, true, false, std::nullopt, true, true, false, true, std::nullopt, std::nullopt}),
+        makeFlatVector<facebook::velox::StringView>(
+            {"alice0", "bob1", "alice2", "bob3", "Alice4", "Bob5", "AlicE6", "boB7", "ALICE8", "BOB9"}),
+        makeNullableFlatVector<facebook::velox::StringView>(
+            {"alice", "bob", std::nullopt, std::nullopt, "Alice", "Bob", std::nullopt, "alicE", std::nullopt, "boB"}),
+        facebook::velox::BaseVector::create(facebook::velox::UNKNOWN(), 10, pool())};
+
+    // 2 rows with the same column types as children1_, including one long string.
+    children2_ = {
+        makeNullableFlatVector<int8_t>({std::nullopt, std::nullopt}),
+        makeFlatVector<int8_t>({1, -1}),
+        makeNullableFlatVector<int32_t>({100, std::nullopt}),
+        makeFlatVector<int64_t>({1, 1}),
+        makeFlatVector<float>({0.142857, -0.142857}),
+        makeFlatVector<bool>({true, false}),
+        makeFlatVector<facebook::velox::StringView>(
+            {"bob",
+             "alicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealice"}),
+        makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, std::nullopt}),
+        facebook::velox::BaseVector::create(facebook::velox::UNKNOWN(), 2, pool())};
+
+    // 2 rows without any null value (and without the UNKNOWN column).
+    childrenNoNull_ = {
+        makeFlatVector<int8_t>({0, 1}),
+        makeFlatVector<int8_t>({0, -1}),
+        makeFlatVector<int32_t>({0, 100}),
+        makeFlatVector<int64_t>({0, 1}),
+        makeFlatVector<float>({0, 0.142857}),
+        makeFlatVector<bool>({false, true}),
+        makeFlatVector<facebook::velox::StringView>({"", "alice"}),
+        makeFlatVector<facebook::velox::StringView>({"alice", ""}),
+    };
+
+    // 1024 rows; the first string column repeats largeString1_ = makeString(1024).
+    largeString1_ = makeString(1024);
+    int32_t numRows = 1024;
+    childrenLargeBinary1_ = {
+        makeFlatVector<int8_t>(std::vector<int8_t>(numRows, 0)),
+        makeFlatVector<int8_t>(std::vector<int8_t>(numRows, 0)),
+        makeFlatVector<int32_t>(std::vector<int32_t>(numRows, 0)),
+        makeFlatVector<int64_t>(std::vector<int64_t>(numRows, 0)),
+        makeFlatVector<float>(std::vector<float>(numRows, 0)),
+        makeFlatVector<bool>(std::vector<bool>(numRows, true)),
+        makeNullableFlatVector<facebook::velox::StringView>(
+            std::vector<std::optional<facebook::velox::StringView>>(numRows, largeString1_.c_str())),
+        makeNullableFlatVector<facebook::velox::StringView>(
+            std::vector<std::optional<facebook::velox::StringView>>(numRows, std::nullopt)),
+    };
+
+    // 2048 rows; the first string column repeats largeString2_ = makeString(4096).
+    largeString2_ = makeString(4096);
+    numRows = 2048;
+    childrenLargeBinary2_ = {
+        makeFlatVector<int8_t>(std::vector<int8_t>(numRows, 0)),
+        makeFlatVector<int8_t>(std::vector<int8_t>(numRows, 0)),
+        makeFlatVector<int32_t>(std::vector<int32_t>(numRows, 0)),
+        makeFlatVector<int64_t>(std::vector<int64_t>(numRows, 0)),
+        makeFlatVector<float>(std::vector<float>(numRows, 0)),
+        makeFlatVector<bool>(std::vector<bool>(numRows, true)),
+        makeNullableFlatVector<facebook::velox::StringView>(
+            std::vector<std::optional<facebook::velox::StringView>>(numRows, largeString2_.c_str())),
+        makeNullableFlatVector<facebook::velox::StringView>(
+            std::vector<std::optional<facebook::velox::StringView>>(numRows, std::nullopt)),
+    };
+
+    // 2 rows of nested types: row, array and map columns next to flat ones.
+    childrenComplex_ = {
+        makeNullableFlatVector<int32_t>({std::nullopt, 1}),
+        makeRowVector({
+            makeFlatVector<int32_t>({1, 3}),
+            makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, "de"}),
+        }),
+        makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, "10 I'm not inline string"}),
+        makeArrayVector<int64_t>({
+            {1, 2, 3, 4, 5},
+            {1, 2, 3},
+        }),
+        makeMapVector<int32_t, facebook::velox::StringView>(
+            {{{1, "str1000"}, {2, "str2000"}}, {{3, "str3000"}, {4, "str4000"}}}),
+    };
+
+    inputVector1_ = makeRowVector(children1_);
+    inputVector2_ = makeRowVector(children2_);
+    inputVectorNoNull_ = makeRowVector(childrenNoNull_);
+    inputVectorLargeBinary1_ = makeRowVector(childrenLargeBinary1_);
+    inputVectorLargeBinary2_ = makeRowVector(childrenLargeBinary2_);
+    inputVectorComplex_ = makeRowVector(childrenComplex_);
+  }
+
+  // Wraps `vector` in a VeloxColumnarBatch and writes it through `shuffleWriter`
+  // with the given `memLimit`; returns the status reported by write().
+  arrow::Status splitRowVector(
+      VeloxShuffleWriter& shuffleWriter,
+      facebook::velox::RowVectorPtr vector,
+      int64_t memLimit = ShuffleWriter::kMinMemLimit) {
+    // `vector` is a by-value parameter, so it can be moved into the batch.
+    std::shared_ptr<ColumnarBatch> cb = std::make_shared<VeloxColumnarBatch>(std::move(vector));
+    return shuffleWriter.write(cb, memLimit);
+  }
+
+  // Creates two temporary local dirs, appends their paths to localDirs_, and
+  // creates the shuffle data file inside the first one. Returns a non-OK status
+  // if any of the temporary locations cannot be created.
+  arrow::Status setLocalDirsAndDataFile() {
+    static const std::string kTestLocalDirsPrefix = "columnar-shuffle-test-";
+
+    // Create first tmp dir and create data file.
+    // To prevent tmpDirs from being deleted in the dtor, we need to store them.
+    tmpDirs_.emplace_back();
+    ARROW_ASSIGN_OR_RAISE(tmpDirs_.back(), arrow::internal::TemporaryDir::Make(kTestLocalDirsPrefix))
+    ARROW_ASSIGN_OR_RAISE(dataFile_, createTempShuffleFile(tmpDirs_.back()->path().ToString()));
+    localDirs_.push_back(tmpDirs_.back()->path().ToString());
+
+    // Create second tmp dir.
+    tmpDirs_.emplace_back();
+    ARROW_ASSIGN_OR_RAISE(tmpDirs_.back(), arrow::internal::TemporaryDir::Make(kTestLocalDirsPrefix))
+    localDirs_.push_back(tmpDirs_.back()->path().ToString());
+    return arrow::Status::OK();
+  }
+
+  // Implemented by concrete fixtures to build the writer under test.
+  virtual std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t numPartitions) = 0;
+
+  // Listener handed to the backend in setUpVeloxBackend(); not owned here.
+  inline static TestAllocationListener* listener_{nullptr};
+
+  ShuffleWriterOptions shuffleWriterOptions_{};
+  PartitionWriterOptions partitionWriterOptions_{};
+
+  // Temporary directories (kept alive for the fixture's lifetime), the shuffle
+  // data file path, and the directory paths as strings.
+  std::vector<std::unique_ptr<arrow::internal::TemporaryDir>> tmpDirs_;
+  std::string dataFile_;
+  std::vector<std::string> localDirs_;
+
+  // Column vectors from which the inputVector*_ rows below are assembled.
+  std::vector<facebook::velox::VectorPtr> children1_;
+  std::vector<facebook::velox::VectorPtr> children2_;
+  std::vector<facebook::velox::VectorPtr> childrenNoNull_;
+  std::vector<facebook::velox::VectorPtr> childrenLargeBinary1_;
+  std::vector<facebook::velox::VectorPtr> childrenLargeBinary2_;
+  std::vector<facebook::velox::VectorPtr> childrenComplex_;
+
+  facebook::velox::RowVectorPtr inputVector1_;
+  facebook::velox::RowVectorPtr inputVector2_;
+  facebook::velox::RowVectorPtr inputVectorNoNull_;
+  // Strings used to fill the large-binary columns.
+  std::string largeString1_;
+  std::string largeString2_;
+  facebook::velox::RowVectorPtr inputVectorLargeBinary1_;
+  facebook::velox::RowVectorPtr inputVectorLargeBinary2_;
+  facebook::velox::RowVectorPtr inputVectorComplex_;
+};
+
+} // namespace gluten
diff --git a/cpp/velox/utils/LocalRssClient.cc 
b/cpp/velox/utils/LocalRssClient.cc
new file mode 100644
index 0000000000..3ef3c61444
--- /dev/null
+++ b/cpp/velox/utils/LocalRssClient.cc
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/LocalRssClient.h"
+#include "utils/Common.h"
+
+#include <arrow/io/file.h>
+
+namespace gluten {
+
+/// Appends `size` bytes starting at `bytes` to the in-memory buffer kept for
+/// `partitionId`, creating that buffer the first time the partition is seen.
+/// Nothing is written to the data file here; that happens in stop().
+///
+/// Returns `size`, narrowed to the int32_t return type of the interface.
+int32_t LocalRssClient::pushPartitionData(int32_t partitionId, const char* bytes, int64_t size) {
+  auto iter = partitionBufferMap_.find(partitionId);
+  if (iter == partitionBufferMap_.end()) {
+    // Register the partition only after its buffer exists. If the allocation
+    // fails, the map never refers to the empty slot left at the back of
+    // buffers_, so later pushes and stop() cannot dereference a null buffer.
+    buffers_.emplace_back();
+    GLUTEN_ASSIGN_OR_THROW(buffers_.back(), arrow::AllocateResizableBuffer(0));
+    iter = partitionBufferMap_.emplace(partitionId, buffers_.size() - 1).first;
+  }
+
+  auto& buffer = buffers_[iter->second];
+  const auto newSize = buffer->size() + size;
+
+  if (buffer->capacity() < newSize) {
+    GLUTEN_THROW_NOT_OK(buffer->Reserve(newSize));
+  }
+
+  // Copy into the reserved tail first, then publish the new logical size.
+  fastCopy(buffer->mutable_data() + buffer->size(), bytes, size);
+  GLUTEN_THROW_NOT_OK(buffer->Resize(newSize));
+
+  return static_cast<int32_t>(size);
+}
+
+/// Writes every buffered partition to `dataFile_`, following the key order of
+/// the partition map, then discards all in-memory state.
+void LocalRssClient::stop() {
+  std::shared_ptr<arrow::io::FileOutputStream> fileStream;
+  GLUTEN_ASSIGN_OR_THROW(fileStream, arrow::io::FileOutputStream::Open(dataFile_));
+
+  for (const auto& entry : partitionBufferMap_) {
+    // entry.second is the index of this partition's buffer inside buffers_.
+    const auto& partitionBuffer = buffers_[entry.second];
+    GLUTEN_THROW_NOT_OK(fileStream->Write(partitionBuffer->data(), partitionBuffer->size()));
+    GLUTEN_THROW_NOT_OK(fileStream->Flush());
+  }
+  GLUTEN_THROW_NOT_OK(fileStream->Close());
+
+  partitionBufferMap_.clear();
+  buffers_.clear();
+}
+
+} // namespace gluten
\ No newline at end of file
diff --git a/cpp/core/shuffle/rss/RssClient.h b/cpp/velox/utils/LocalRssClient.h
similarity index 57%
copy from cpp/core/shuffle/rss/RssClient.h
copy to cpp/velox/utils/LocalRssClient.h
index dddccfa1ad..e14c9fd01f 100644
--- a/cpp/core/shuffle/rss/RssClient.h
+++ b/cpp/velox/utils/LocalRssClient.h
@@ -17,11 +17,28 @@
 
 #pragma once
 
-class RssClient {
+#include "shuffle/rss/RssClient.h"
+#include "utils/Common.h"
+#include "utils/Macros.h"
+
+#include <arrow/buffer.h>
+
+#include <map>
+
+namespace gluten {
+
+/// A local implementation of the RssClient interface for testing purposes.
+class LocalRssClient : public RssClient {
  public:
-  virtual ~RssClient() = default;
+  LocalRssClient(std::string dataFile) : dataFile_(dataFile) {}
+
+  int32_t pushPartitionData(int32_t partitionId, const char* bytes, int64_t 
size) override;
 
-  virtual int32_t pushPartitionData(int32_t partitionId, const char* bytes, 
int64_t size) = 0;
+  void stop() override;
 
-  virtual void stop() = 0;
+ private:
+  std::string dataFile_;
+  std::vector<std::unique_ptr<arrow::ResizableBuffer>> buffers_;
+  std::map<uint32_t, uint32_t> partitionBufferMap_;
 };
+} // namespace gluten
diff --git a/cpp/velox/utils/TestAllocationListener.cc 
b/cpp/velox/utils/TestAllocationListener.cc
new file mode 100644
index 0000000000..835ec263eb
--- /dev/null
+++ b/cpp/velox/utils/TestAllocationListener.cc
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/TestAllocationListener.h"
+#include "velox/common/base/SuccinctPrinter.h"
+
+#include <fmt/format.h>
+
+namespace gluten {
+
+// Records an allocation change of `diff` bytes. When a positive `diff` would
+// bring the tracked usage to or past the limit, the registered iterator is
+// asked to spill first and the registered shuffle writer is asked to reclaim
+// whatever is still missing.
+void TestAllocationListener::allocationChanged(int64_t diff) {
+  const bool hitsLimit = diff > 0 && usedBytes_ + diff >= limit_;
+  if (!hitsLimit) {
+    usedBytes_ += diff;
+    return;
+  }
+
+  LOG(INFO) << fmt::format(
+      "reach hard limit {} when need {}, current used {}.",
+      facebook::velox::succinctBytes(limit_),
+      facebook::velox::succinctBytes(diff),
+      facebook::velox::succinctBytes(usedBytes_));
+
+  // Amount by which the requested total exceeds the limit.
+  const auto excess = usedBytes_ + diff - limit_;
+  int64_t freed = 0;
+  if (iterator_) {
+    freed += iterator_->spillFixedSize(excess);
+  }
+  if (freed < excess && shuffleWriter_) {
+    int64_t writerFreed = 0;
+    GLUTEN_THROW_NOT_OK(shuffleWriter_->reclaimFixedSize(excess - freed, &writerFreed));
+    freed += writerFreed;
+  }
+  reclaimedBytes_ += freed;
+  LOG(INFO) << fmt::format("spill finish, got {}.", facebook::velox::succinctBytes(freed));
+
+  if (freed < excess && throwIfOOM_) {
+    // Thrown before usedBytes_ is updated, so the failed request is not counted.
+    throw GlutenException(fmt::format(
+        "Failed to reclaim {} bytes. Actual bytes reclaimed: {}",
+        facebook::velox::succinctBytes(excess),
+        facebook::velox::succinctBytes(freed)));
+  }
+
+  usedBytes_ += diff;
+}
+
+// Returns the number of bytes currently tracked as in use.
+int64_t TestAllocationListener::currentBytes() {
+  return usedBytes_;
+}
+
+// Returns the total number of bytes reclaimed by allocationChanged() since
+// construction or the last reset().
+int64_t TestAllocationListener::reclaimedBytes() const {
+  return reclaimedBytes_;
+}
+
+// Puts the listener back into its freshly constructed state: counters zeroed,
+// limit at its maximum, no iterator or shuffle writer registered, and no
+// throwing on failed reclaim.
+void TestAllocationListener::reset() {
+  usedBytes_ = 0;
+  reclaimedBytes_ = 0;
+  limit_ = std::numeric_limits<uint64_t>::max();
+  iterator_ = nullptr;
+  shuffleWriter_ = nullptr;
+  throwIfOOM_ = false;
+}
+} // namespace gluten
diff --git a/cpp/velox/utils/TestAllocationListener.h 
b/cpp/velox/utils/TestAllocationListener.h
new file mode 100644
index 0000000000..9a94709aa3
--- /dev/null
+++ b/cpp/velox/utils/TestAllocationListener.h
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "compute/ResultIterator.h"
+#include "memory/AllocationListener.h"
+#include "shuffle/ShuffleWriter.h"
+
+namespace gluten {
+
+// AllocationListener with a configurable byte limit, used by tests and benchmarks.
+// When a positive allocation change reaches the limit, allocationChanged() asks the
+// registered iterator and shuffle writer (if set) to free memory.
+class TestAllocationListener final : public AllocationListener {
+ public:
+  TestAllocationListener() = default;
+
+  // If true, allocationChanged() throws when fewer bytes than needed were reclaimed.
+  void setThrowIfOOM(bool throwIfOOM) {
+    throwIfOOM_ = throwIfOOM;
+  }
+
+  // Sets the byte limit checked by allocationChanged().
+  void updateLimit(uint64_t limit) {
+    limit_ = limit;
+  }
+
+  // Registers the iterator that is asked to spill first when the limit is hit.
+  // Stored as a raw pointer; pass nullptr to unregister.
+  void setIterator(ResultIterator* iterator) {
+    iterator_ = iterator;
+  }
+
+  // Registers the shuffle writer that is asked to reclaim what the iterator did
+  // not free. Stored as a raw pointer; pass nullptr to unregister.
+  void setShuffleWriter(ShuffleWriter* shuffleWriter) {
+    shuffleWriter_ = shuffleWriter;
+  }
+
+  // Records an allocation change of `diff` bytes; a negative value lowers the
+  // tracked usage.
+  void allocationChanged(int64_t diff) override;
+
+  // Bytes currently tracked as in use.
+  int64_t currentBytes() override;
+
+  // Total bytes reclaimed through allocationChanged() since the last reset().
+  int64_t reclaimedBytes() const;
+
+  // Restores all fields to their initial values.
+  void reset();
+
+ private:
+  bool throwIfOOM_{false};
+
+  uint64_t usedBytes_{0L};
+  uint64_t reclaimedBytes_{0L};
+
+  // Defaults to the maximum value, i.e. effectively no limit.
+  uint64_t limit_{std::numeric_limits<uint64_t>::max()};
+  ResultIterator* iterator_{nullptr};
+  ShuffleWriter* shuffleWriter_{nullptr};
+};
+
+} // namespace gluten
diff --git a/cpp/velox/utils/VeloxArrowUtils.cc 
b/cpp/velox/utils/VeloxArrowUtils.cc
index f26b49a476..7c95835625 100644
--- a/cpp/velox/utils/VeloxArrowUtils.cc
+++ b/cpp/velox/utils/VeloxArrowUtils.cc
@@ -57,6 +57,25 @@ arrow::Result<std::shared_ptr<ColumnarBatch>> 
recordBatch2VeloxColumnarBatch(con
   return 
std::make_shared<VeloxColumnarBatch>(std::dynamic_pointer_cast<velox::RowVector>(vp));
 }
 
+// Maps an Arrow compression type onto the Velox CompressionKind for the same
+// codec. Any type without a case below is reported through VELOX_UNSUPPORTED.
+facebook::velox::common::CompressionKind arrowCompressionTypeToVelox(arrow::Compression::type type) {
+  using facebook::velox::common::CompressionKind;
+
+  switch (type) {
+    case arrow::Compression::UNCOMPRESSED:
+      return CompressionKind::CompressionKind_NONE;
+    case arrow::Compression::LZ4_FRAME:
+      return CompressionKind::CompressionKind_LZ4;
+    case arrow::Compression::ZSTD:
+      return CompressionKind::CompressionKind_ZSTD;
+    case arrow::Compression::GZIP:
+      return CompressionKind::CompressionKind_GZIP;
+    case arrow::Compression::SNAPPY:
+      return CompressionKind::CompressionKind_SNAPPY;
+    case arrow::Compression::LZO:
+      return CompressionKind::CompressionKind_LZO;
+    default:
+      VELOX_UNSUPPORTED("Unsupported arrow compression type {}", arrow::util::Codec::GetCodecAsString(type));
+  }
+}
+
 arrow::Result<std::shared_ptr<arrow::Buffer>> toArrowBuffer(
     facebook::velox::BufferPtr buffer,
     arrow::MemoryPool* pool) {
diff --git a/cpp/velox/utils/VeloxArrowUtils.h 
b/cpp/velox/utils/VeloxArrowUtils.h
index 3fb350266d..bacfbbd2b8 100644
--- a/cpp/velox/utils/VeloxArrowUtils.h
+++ b/cpp/velox/utils/VeloxArrowUtils.h
@@ -19,14 +19,18 @@
 
 #pragma once
 
-#include <arrow/memory_pool.h>
-#include <arrow/type.h>
 #include "memory/ColumnarBatch.h"
+
 #include "velox/buffer/Buffer.h"
+#include "velox/common/compression/Compression.h"
 #include "velox/common/memory/MemoryPool.h"
 #include "velox/type/Type.h"
 #include "velox/vector/arrow/Bridge.h"
 
+#include <arrow/memory_pool.h>
+#include <arrow/type.h>
+#include <arrow/util/compression.h>
+
 namespace gluten {
 
 class ArrowUtils {
@@ -56,4 +60,5 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> 
toArrowBuffer(facebook::velox::Buf
  */
 arrow::Result<std::shared_ptr<ColumnarBatch>> 
recordBatch2VeloxColumnarBatch(const arrow::RecordBatch& rb);
 
+/// Converts an Arrow compression type to the corresponding Velox CompressionKind.
+facebook::velox::common::CompressionKind 
arrowCompressionTypeToVelox(arrow::Compression::type type);
 } // namespace gluten
diff --git a/cpp/velox/utils/tests/LocalRssClient.h 
b/cpp/velox/utils/tests/LocalRssClient.h
deleted file mode 100644
index ff2f89ef7f..0000000000
--- a/cpp/velox/utils/tests/LocalRssClient.h
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <arrow/buffer.h>
-#include <arrow/util/io_util.h>
-#include <map>
-#include "shuffle/rss/RssClient.h"
-#include "utils/Common.h"
-#include "utils/Macros.h"
-
-namespace gluten {
-
-class LocalRssClient : public RssClient {
- public:
-  LocalRssClient(std::string dataFile) : dataFile_(dataFile) {}
-
-  int32_t pushPartitionData(int32_t partitionId, const char* bytes, int64_t 
size) {
-    auto idx = -1;
-    auto maybeIdx = partitionIdx_.find(partitionId);
-    if (maybeIdx == partitionIdx_.end()) {
-      idx = partitionIdx_.size();
-      partitionIdx_[partitionId] = idx;
-      auto buffer = arrow::AllocateResizableBuffer(0).ValueOrDie();
-      partitionBuffers_.push_back(std::move(buffer));
-    } else {
-      idx = maybeIdx->second;
-    }
-
-    auto& buffer = partitionBuffers_[idx];
-    auto newSize = buffer->size() + size;
-    if (buffer->capacity() < newSize) {
-      GLUTEN_THROW_NOT_OK(buffer->Reserve(newSize));
-    }
-    memcpy(buffer->mutable_data() + buffer->size(), bytes, size);
-    GLUTEN_THROW_NOT_OK(buffer->Resize(newSize));
-    return size;
-  }
-
-  void stop() {
-    std::shared_ptr<arrow::io::FileOutputStream> fout;
-    GLUTEN_ASSIGN_OR_THROW(fout, arrow::io::FileOutputStream::Open(dataFile_));
-
-    for (auto item : partitionIdx_) {
-      auto idx = item.second;
-      GLUTEN_THROW_NOT_OK(fout->Write(partitionBuffers_[idx]->data(), 
partitionBuffers_[idx]->size()));
-      GLUTEN_THROW_NOT_OK(fout->Flush());
-    }
-    GLUTEN_THROW_NOT_OK(fout->Close());
-    partitionBuffers_.clear();
-    partitionIdx_.clear();
-  }
-
- private:
-  std::string dataFile_;
-  std::vector<std::unique_ptr<arrow::ResizableBuffer>> partitionBuffers_;
-  std::map<uint32_t, uint32_t> partitionIdx_;
-};
-
-} // namespace gluten
diff --git a/cpp/velox/utils/tests/MemoryPoolUtils.cc 
b/cpp/velox/utils/tests/MemoryPoolUtils.cc
deleted file mode 100644
index 2d4e19511a..0000000000
--- a/cpp/velox/utils/tests/MemoryPoolUtils.cc
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "utils/tests/MemoryPoolUtils.h"
-
-namespace gluten {
-
-arrow::Status LimitedMemoryPool::Allocate(int64_t size, int64_t alignment, 
uint8_t** out) {
-  if (bytes_allocated() + size > capacity_) {
-    return arrow::Status::OutOfMemory("malloc of size ", size, " failed");
-  }
-  RETURN_NOT_OK(pool_->Allocate(size, alignment, out));
-  stats_.UpdateAllocatedBytes(size);
-  return arrow::Status::OK();
-}
-
-arrow::Status LimitedMemoryPool::Reallocate(int64_t oldSize, int64_t newSize, 
int64_t alignment, uint8_t** ptr) {
-  if (newSize > capacity_) {
-    return arrow::Status::OutOfMemory("malloc of size ", newSize, " failed");
-  }
-  RETURN_NOT_OK(pool_->Reallocate(oldSize, newSize, alignment, ptr));
-  stats_.UpdateAllocatedBytes(newSize - oldSize);
-  return arrow::Status::OK();
-}
-
-void LimitedMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) 
{
-  pool_->Free(buffer, size, alignment);
-  stats_.UpdateAllocatedBytes(-size);
-}
-
-int64_t LimitedMemoryPool::bytes_allocated() const {
-  return stats_.bytes_allocated();
-}
-
-int64_t LimitedMemoryPool::max_memory() const {
-  return pool_->max_memory();
-}
-
-int64_t LimitedMemoryPool::total_bytes_allocated() const {
-  return pool_->total_bytes_allocated();
-}
-
-int64_t LimitedMemoryPool::num_allocations() const {
-  throw pool_->num_allocations();
-}
-
-std::string LimitedMemoryPool::backend_name() const {
-  return pool_->backend_name();
-}
-
-bool SelfEvictedMemoryPool::checkEvict(int64_t newCapacity, 
std::function<void()> block) {
-  bytesEvicted_ = 0;
-  auto capacity = capacity_;
-  // Limit the capacity to trigger evict.
-  setCapacity(newCapacity);
-
-  block();
-
-  capacity_ = capacity;
-  return bytesEvicted_ > 0;
-}
-
-void SelfEvictedMemoryPool::setCapacity(int64_t capacity) {
-  if (capacity < bytes_allocated()) {
-    capacity_ = bytes_allocated();
-  } else {
-    capacity_ = capacity;
-  }
-}
-
-int64_t SelfEvictedMemoryPool::capacity() const {
-  return capacity_;
-}
-
-void SelfEvictedMemoryPool::setEvictable(Reclaimable* evictable) {
-  evictable_ = evictable;
-}
-
-arrow::Status SelfEvictedMemoryPool::Allocate(int64_t size, int64_t alignment, 
uint8_t** out) {
-  RETURN_NOT_OK(ensureCapacity(size));
-  return pool_->Allocate(size, alignment, out);
-}
-
-arrow::Status SelfEvictedMemoryPool::Reallocate(int64_t oldSize, int64_t 
newSize, int64_t alignment, uint8_t** ptr) {
-  if (newSize > oldSize) {
-    RETURN_NOT_OK(ensureCapacity(newSize - oldSize));
-  }
-  return pool_->Reallocate(oldSize, newSize, alignment, ptr);
-}
-
-void SelfEvictedMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t 
alignment) {
-  return pool_->Free(buffer, size, alignment);
-}
-
-int64_t SelfEvictedMemoryPool::bytes_allocated() const {
-  return pool_->bytes_allocated();
-}
-
-int64_t SelfEvictedMemoryPool::max_memory() const {
-  return pool_->max_memory();
-}
-
-std::string SelfEvictedMemoryPool::backend_name() const {
-  return pool_->backend_name();
-}
-
-int64_t SelfEvictedMemoryPool::total_bytes_allocated() const {
-  return pool_->total_bytes_allocated();
-}
-
-int64_t SelfEvictedMemoryPool::num_allocations() const {
-  throw pool_->num_allocations();
-}
-
-arrow::Status SelfEvictedMemoryPool::ensureCapacity(int64_t size) {
-  VELOX_CHECK_NOT_NULL(evictable_);
-  DLOG(INFO) << "Size: " << size << ", capacity_: " << capacity_ << ", bytes 
allocated: " << pool_->bytes_allocated();
-  if (size > capacity_ - pool_->bytes_allocated()) {
-    // Self evict.
-    int64_t actual;
-    RETURN_NOT_OK(evictable_->reclaimFixedSize(size, &actual));
-    if (size > capacity_ - pool_->bytes_allocated()) {
-      if (failIfOOM_) {
-        return arrow::Status::OutOfMemory(
-            "Failed to allocate after evict. Capacity: ",
-            capacity_,
-            ", Requested: ",
-            size,
-            ", Evicted: ",
-            actual,
-            ", Allocated: ",
-            pool_->bytes_allocated());
-      } else {
-        capacity_ = size + pool_->bytes_allocated();
-      }
-    }
-    bytesEvicted_ += actual;
-  }
-  return arrow::Status::OK();
-}
-
-} // namespace gluten
diff --git a/cpp/velox/utils/tests/MemoryPoolUtils.h 
b/cpp/velox/utils/tests/MemoryPoolUtils.h
deleted file mode 100644
index 5fdf880be6..0000000000
--- a/cpp/velox/utils/tests/MemoryPoolUtils.h
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <arrow/memory_pool.h>
-#include "memory/Reclaimable.h"
-#include "utils/Exception.h"
-#include "velox/common/base/Exceptions.h"
-
-namespace gluten {
-
-// arrow::MemoryPool instance with limited capacity, used by tests and 
benchmarks.
-class LimitedMemoryPool final : public arrow::MemoryPool {
- public:
-  explicit LimitedMemoryPool() : 
capacity_(std::numeric_limits<int64_t>::max()) {}
-  explicit LimitedMemoryPool(int64_t capacity) : capacity_(capacity) {}
-
-  arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out) 
override;
-
-  arrow::Status Reallocate(int64_t oldSize, int64_t newSize, int64_t 
alignment, uint8_t** ptr) override;
-
-  void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;
-
-  int64_t bytes_allocated() const override;
-
-  int64_t max_memory() const override;
-
-  int64_t total_bytes_allocated() const override;
-
-  int64_t num_allocations() const override;
-
-  std::string backend_name() const override;
-
- private:
-  arrow::MemoryPool* pool_ = arrow::default_memory_pool();
-  int64_t capacity_;
-  arrow::internal::MemoryPoolStats stats_;
-};
-
-// arrow::MemoryPool instance with limited capacity and can be evictable on 
OOM, used by tests and benchmarks.
-class SelfEvictedMemoryPool : public arrow::MemoryPool {
- public:
-  explicit SelfEvictedMemoryPool(arrow::MemoryPool* pool, bool failIfOOM = 
true) : pool_(pool), failIfOOM_(failIfOOM) {}
-
-  bool checkEvict(int64_t newCapacity, std::function<void()> block);
-
-  void setCapacity(int64_t capacity);
-
-  int64_t capacity() const;
-
-  void setEvictable(Reclaimable* evictable);
-
-  arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out) 
override;
-
-  arrow::Status Reallocate(int64_t oldSize, int64_t newSize, int64_t 
alignment, uint8_t** ptr) override;
-
-  void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;
-
-  int64_t bytes_allocated() const override;
-
-  int64_t max_memory() const override;
-
-  std::string backend_name() const override;
-
-  int64_t total_bytes_allocated() const override;
-
-  int64_t num_allocations() const override;
-
- private:
-  arrow::Status ensureCapacity(int64_t size);
-
-  arrow::MemoryPool* pool_;
-  bool failIfOOM_;
-
-  Reclaimable* evictable_;
-  int64_t capacity_{std::numeric_limits<int64_t>::max()};
-
-  int64_t bytesEvicted_{0};
-};
-
-} // namespace gluten
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h 
b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
deleted file mode 100644
index 4a69c9a7e1..0000000000
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ /dev/null
@@ -1,592 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <arrow/record_batch.h>
-#include <arrow/result.h>
-#include <arrow/util/compression.h>
-#include <gtest/gtest.h>
-#include "LocalRssClient.h"
-#include "memory/VeloxColumnarBatch.h"
-#include "shuffle/PartitionWriter.h"
-#include "shuffle/VeloxShuffleReader.h"
-#include "utils/Compression.h"
-#include "velox/type/Type.h"
-#include "velox/vector/tests/VectorTestUtils.h"
-
-namespace gluten {
-
-namespace {
-std::string makeString(uint32_t length) {
-  static const std::string kLargeStringOf128Bytes =
-      "thisisalaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
-      "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaargestringlengthmorethan16bytes";
-  std::string res{};
-  auto repeats = length / kLargeStringOf128Bytes.length();
-  while (repeats--) {
-    res.append(kLargeStringOf128Bytes);
-  }
-  if (auto remains = length % kLargeStringOf128Bytes.length()) {
-    res.append(kLargeStringOf128Bytes.substr(0, remains));
-  }
-  return res;
-}
-
-std::unique_ptr<PartitionWriter> createPartitionWriter(
-    PartitionWriterType partitionWriterType,
-    uint32_t numPartitions,
-    const std::string& dataFile,
-    const std::vector<std::string>& localDirs,
-    const PartitionWriterOptions& options,
-    arrow::MemoryPool* pool) {
-  if (partitionWriterType == PartitionWriterType::kRss) {
-    auto rssClient = std::make_unique<LocalRssClient>(dataFile);
-    return std::make_unique<RssPartitionWriter>(numPartitions, options, pool, 
std::move(rssClient));
-  }
-  return std::make_unique<LocalPartitionWriter>(numPartitions, options, pool, 
dataFile, localDirs);
-}
-} // namespace
-
-struct ShuffleTestParams {
-  ShuffleWriterType shuffleWriterType;
-  PartitionWriterType partitionWriterType;
-  arrow::Compression::type compressionType;
-  int32_t compressionThreshold{0};
-  int32_t mergeBufferSize{0};
-  int32_t diskWriteBufferSize{0};
-  bool useRadixSort{false};
-
-  std::string toString() const {
-    std::ostringstream out;
-    out << "shuffleWriterType = " << shuffleWriterType << ", 
partitionWriterType = " << partitionWriterType
-        << ", compressionType = " << compressionType << ", 
compressionThreshold = " << compressionThreshold
-        << ", mergeBufferSize = " << mergeBufferSize << ", 
compressionBufferSize = " << diskWriteBufferSize
-        << ", useRadixSort = " << (useRadixSort ? "true" : "false");
-    return out.str();
-  }
-};
-
-class VeloxShuffleWriterTestBase : public 
facebook::velox::test::VectorTestBase {
- public:
-  virtual arrow::Status initShuffleWriterOptions() {
-    RETURN_NOT_OK(setLocalDirsAndDataFile());
-    return arrow::Status::OK();
-  }
-
- protected:
-  void setUp() {
-    if 
(!isRegisteredNamedVectorSerde(facebook::velox::VectorSerde::Kind::kPresto)) {
-      // RSS shuffle serde.
-      
facebook::velox::serializer::presto::PrestoVectorSerde::registerNamedVectorSerde();
-    }
-    // Set up test data.
-    children1_ = {
-        makeNullableFlatVector<int8_t>({1, 2, 3, std::nullopt, 4, 
std::nullopt, 5, 6, std::nullopt, 7}),
-        makeNullableFlatVector<int8_t>({1, -1, std::nullopt, std::nullopt, -2, 
2, std::nullopt, std::nullopt, 3, -3}),
-        makeNullableFlatVector<int32_t>({1, 2, 3, 4, std::nullopt, 5, 6, 7, 8, 
std::nullopt}),
-        makeNullableFlatVector<int64_t>(
-            {std::nullopt,
-             std::nullopt,
-             std::nullopt,
-             std::nullopt,
-             std::nullopt,
-             std::nullopt,
-             std::nullopt,
-             std::nullopt,
-             std::nullopt,
-             std::nullopt}),
-        makeNullableFlatVector<float>(
-            {-0.1234567,
-             std::nullopt,
-             0.1234567,
-             std::nullopt,
-             -0.142857,
-             std::nullopt,
-             0.142857,
-             0.285714,
-             0.428617,
-             std::nullopt}),
-        makeNullableFlatVector<bool>(
-            {std::nullopt, true, false, std::nullopt, true, true, false, true, 
std::nullopt, std::nullopt}),
-        makeFlatVector<facebook::velox::StringView>(
-            {"alice0", "bob1", "alice2", "bob3", "Alice4", "Bob5", "AlicE6", 
"boB7", "ALICE8", "BOB9"}),
-        makeNullableFlatVector<facebook::velox::StringView>(
-            {"alice", "bob", std::nullopt, std::nullopt, "Alice", "Bob", 
std::nullopt, "alicE", std::nullopt, "boB"}),
-        facebook::velox::BaseVector::create(facebook::velox::UNKNOWN(), 10, 
pool())};
-
-    children2_ = {
-        makeNullableFlatVector<int8_t>({std::nullopt, std::nullopt}),
-        makeFlatVector<int8_t>({1, -1}),
-        makeNullableFlatVector<int32_t>({100, std::nullopt}),
-        makeFlatVector<int64_t>({1, 1}),
-        makeFlatVector<float>({0.142857, -0.142857}),
-        makeFlatVector<bool>({true, false}),
-        makeFlatVector<facebook::velox::StringView>(
-            {"bob",
-             
"alicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealicealice"}),
-        makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, 
std::nullopt}),
-        facebook::velox::BaseVector::create(facebook::velox::UNKNOWN(), 2, 
pool())};
-
-    childrenNoNull_ = {
-        makeFlatVector<int8_t>({0, 1}),
-        makeFlatVector<int8_t>({0, -1}),
-        makeFlatVector<int32_t>({0, 100}),
-        makeFlatVector<int64_t>({0, 1}),
-        makeFlatVector<float>({0, 0.142857}),
-        makeFlatVector<bool>({false, true}),
-        makeFlatVector<facebook::velox::StringView>({"", "alice"}),
-        makeFlatVector<facebook::velox::StringView>({"alice", ""}),
-    };
-
-    largeString1_ = makeString(1024);
-    childrenLargeBinary1_ = {
-        makeFlatVector<int8_t>(std::vector<int8_t>(4096, 0)),
-        makeFlatVector<int8_t>(std::vector<int8_t>(4096, 0)),
-        makeFlatVector<int32_t>(std::vector<int32_t>(4096, 0)),
-        makeFlatVector<int64_t>(std::vector<int64_t>(4096, 0)),
-        makeFlatVector<float>(std::vector<float>(4096, 0)),
-        makeFlatVector<bool>(std::vector<bool>(4096, true)),
-        makeNullableFlatVector<facebook::velox::StringView>(
-            std::vector<std::optional<facebook::velox::StringView>>(4096, 
largeString1_.c_str())),
-        makeNullableFlatVector<facebook::velox::StringView>(
-            std::vector<std::optional<facebook::velox::StringView>>(4096, 
std::nullopt)),
-    };
-    largeString2_ = makeString(4096);
-    auto vectorToSpill = childrenLargeBinary2_ = {
-        makeFlatVector<int8_t>(std::vector<int8_t>(2048, 0)),
-        makeFlatVector<int8_t>(std::vector<int8_t>(2048, 0)),
-        makeFlatVector<int32_t>(std::vector<int32_t>(2048, 0)),
-        makeFlatVector<int64_t>(std::vector<int64_t>(2048, 0)),
-        makeFlatVector<float>(std::vector<float>(2048, 0)),
-        makeFlatVector<bool>(std::vector<bool>(2048, true)),
-        makeNullableFlatVector<facebook::velox::StringView>(
-            std::vector<std::optional<facebook::velox::StringView>>(2048, 
largeString2_.c_str())),
-        makeNullableFlatVector<facebook::velox::StringView>(
-            std::vector<std::optional<facebook::velox::StringView>>(2048, 
std::nullopt)),
-    };
-    childrenComplex_ = {
-        makeNullableFlatVector<int32_t>({std::nullopt, 1}),
-        makeRowVector({
-            makeFlatVector<int32_t>({1, 3}),
-            makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, 
"de"}),
-        }),
-        makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, "10 
I'm not inline string"}),
-        makeArrayVector<int64_t>({
-            {1, 2, 3, 4, 5},
-            {1, 2, 3},
-        }),
-        makeMapVector<int32_t, facebook::velox::StringView>(
-            {{{1, "str1000"}, {2, "str2000"}}, {{3, "str3000"}, {4, 
"str4000"}}}),
-    };
-
-    inputVector1_ = makeRowVector(children1_);
-    inputVector2_ = makeRowVector(children2_);
-    inputVectorNoNull_ = makeRowVector(childrenNoNull_);
-    inputVectorLargeBinary1_ = makeRowVector(childrenLargeBinary1_);
-    inputVectorLargeBinary2_ = makeRowVector(childrenLargeBinary2_);
-    inputVectorComplex_ = makeRowVector(childrenComplex_);
-  }
-
-  arrow::Status splitRowVector(
-      VeloxShuffleWriter& shuffleWriter,
-      facebook::velox::RowVectorPtr vector,
-      int64_t memLimit = ShuffleWriter::kMinMemLimit) {
-    std::shared_ptr<ColumnarBatch> cb = 
std::make_shared<VeloxColumnarBatch>(vector);
-    return shuffleWriter.write(cb, memLimit);
-  }
-
-  // Create multiple local dirs and join with comma.
-  arrow::Status setLocalDirsAndDataFile() {
-    static const std::string kTestLocalDirsPrefix = "columnar-shuffle-test-";
-
-    // Create first tmp dir and create data file.
-    // To prevent tmpDirs from being deleted in the dtor, we need to store them.
-    tmpDirs_.emplace_back();
-    ARROW_ASSIGN_OR_RAISE(tmpDirs_.back(), 
arrow::internal::TemporaryDir::Make(kTestLocalDirsPrefix))
-    ARROW_ASSIGN_OR_RAISE(dataFile_, 
createTempShuffleFile(tmpDirs_.back()->path().ToString()));
-    localDirs_.push_back(tmpDirs_.back()->path().ToString());
-
-    // Create second tmp dir.
-    tmpDirs_.emplace_back();
-    ARROW_ASSIGN_OR_RAISE(tmpDirs_.back(), 
arrow::internal::TemporaryDir::Make(kTestLocalDirsPrefix))
-    localDirs_.push_back(tmpDirs_.back()->path().ToString());
-    return arrow::Status::OK();
-  }
-
-  virtual std::shared_ptr<VeloxShuffleWriter> 
createShuffleWriter(arrow::MemoryPool* arrowPool) = 0;
-
-  ShuffleWriterOptions shuffleWriterOptions_{};
-  PartitionWriterOptions partitionWriterOptions_{};
-
-  std::vector<std::unique_ptr<arrow::internal::TemporaryDir>> tmpDirs_;
-  std::string dataFile_;
-  std::vector<std::string> localDirs_;
-
-  std::vector<facebook::velox::VectorPtr> children1_;
-  std::vector<facebook::velox::VectorPtr> children2_;
-  std::vector<facebook::velox::VectorPtr> childrenNoNull_;
-  std::vector<facebook::velox::VectorPtr> childrenLargeBinary1_;
-  std::vector<facebook::velox::VectorPtr> childrenLargeBinary2_;
-  std::vector<facebook::velox::VectorPtr> childrenComplex_;
-
-  facebook::velox::RowVectorPtr inputVector1_;
-  facebook::velox::RowVectorPtr inputVector2_;
-  facebook::velox::RowVectorPtr inputVectorNoNull_;
-  std::string largeString1_;
-  std::string largeString2_;
-  facebook::velox::RowVectorPtr inputVectorLargeBinary1_;
-  facebook::velox::RowVectorPtr inputVectorLargeBinary2_;
-  facebook::velox::RowVectorPtr inputVectorComplex_;
-};
-
-class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams>, public VeloxShuffleWriterTestBase {
- public:
-  arrow::Status initShuffleWriterOptions() override {
-    RETURN_NOT_OK(VeloxShuffleWriterTestBase::initShuffleWriterOptions());
-
-    ShuffleTestParams params = GetParam();
-    shuffleWriterOptions_.useRadixSort = params.useRadixSort;
-    shuffleWriterOptions_.diskWriteBufferSize = params.diskWriteBufferSize;
-    partitionWriterOptions_.compressionType = params.compressionType;
-    switch (partitionWriterOptions_.compressionType) {
-      case arrow::Compression::UNCOMPRESSED:
-        partitionWriterOptions_.compressionTypeStr = "none";
-        break;
-      case arrow::Compression::LZ4_FRAME:
-        partitionWriterOptions_.compressionTypeStr = "lz4";
-        break;
-      case arrow::Compression::ZSTD:
-        partitionWriterOptions_.compressionTypeStr = "zstd";
-        break;
-      default:
-        break;
-    };
-    partitionWriterOptions_.compressionThreshold = params.compressionThreshold;
-    partitionWriterOptions_.mergeBufferSize = params.mergeBufferSize;
-    return arrow::Status::OK();
-  }
-
-  std::shared_ptr<VeloxShuffleWriter> createSpecificShuffleWriter(
-      arrow::MemoryPool* arrowPool,
-      std::unique_ptr<PartitionWriter> partitionWriter,
-      ShuffleWriterOptions shuffleWriterOptions,
-      uint32_t numPartitions,
-      int32_t bufferSize) {
-    if (shuffleWriterOptions.shuffleWriterType == 
ShuffleWriterType::kHashShuffle) {
-      shuffleWriterOptions.bufferSize = bufferSize;
-    }
-    GLUTEN_ASSIGN_OR_THROW(
-        auto shuffleWriter,
-        VeloxShuffleWriter::create(
-            GetParam().shuffleWriterType,
-            numPartitions,
-            std::move(partitionWriter),
-            std::move(shuffleWriterOptions),
-            pool_,
-            arrowPool));
-    return shuffleWriter;
-  }
-
- protected:
-  static void SetUpTestCase() {
-    facebook::velox::memory::MemoryManager::testingSetInstance({});
-  }
-
-  virtual void SetUp() override {
-    std::cout << "Running test with param: " << GetParam().toString() << 
std::endl;
-    VeloxShuffleWriterTestBase::setUp();
-  }
-
-  void TearDown() override {
-    if (file_ != nullptr && !file_->closed()) {
-      GLUTEN_THROW_NOT_OK(file_->Close());
-    }
-  }
-
-  static void checkFileExists(const std::string& fileName) {
-    ASSERT_EQ(*arrow::internal::FileExists(*arrow::internal::PlatformFilename::FromString(fileName)), true);
-  }
-
-  std::shared_ptr<arrow::Schema> getArrowSchema(facebook::velox::RowVectorPtr& 
rowVector) {
-    return toArrowSchema(rowVector->type(), pool());
-  }
-
-  void setReadableFile(const std::string& fileName) {
-    if (file_ != nullptr && !file_->closed()) {
-      GLUTEN_THROW_NOT_OK(file_->Close());
-    }
-    GLUTEN_ASSIGN_OR_THROW(file_, arrow::io::ReadableFile::Open(fileName))
-  }
-
-  void getRowVectors(
-      arrow::Compression::type compressionType,
-      std::shared_ptr<arrow::Schema> schema,
-      std::vector<facebook::velox::RowVectorPtr>& vectors,
-      std::shared_ptr<arrow::io::InputStream> in) {
-    ShuffleReaderOptions options;
-    options.compressionType = compressionType;
-    auto codec = createArrowIpcCodec(options.compressionType, 
CodecBackend::NONE);
-    auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema));
-    switch (options.compressionType) {
-      case arrow::Compression::type::UNCOMPRESSED:
-        options.compressionTypeStr = "none";
-        break;
-      case arrow::Compression::type::LZ4_FRAME:
-        options.compressionTypeStr = "lz4";
-        break;
-      case arrow::Compression::type::ZSTD:
-        options.compressionTypeStr = "zstd";
-        break;
-      default:
-        break;
-    };
-    auto veloxCompressionType = 
facebook::velox::common::stringToCompressionKind(options.compressionTypeStr);
-    if (!facebook::velox::isRegisteredVectorSerde()) {
-      facebook::velox::serializer::presto::PrestoVectorSerde::registerVectorSerde();
-    }
-    // Set batchSize to a large value to make all batches are merged by reader.
-    auto deserializerFactory = 
std::make_unique<gluten::VeloxShuffleReaderDeserializerFactory>(
-        schema,
-        std::move(codec),
-        veloxCompressionType,
-        rowType,
-        kDefaultBatchSize,
-        kDefaultReadBufferSize,
-        kDefaultDeserializerBufferSize,
-        defaultArrowMemoryPool().get(),
-        pool_,
-        GetParam().shuffleWriterType);
-    auto reader = 
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
-    auto iter = reader->readStream(in);
-    while (iter->hasNext()) {
-      auto vector = 
std::dynamic_pointer_cast<VeloxColumnarBatch>(iter->next())->getRowVector();
-      vectors.emplace_back(vector);
-    }
-  }
-
-  std::shared_ptr<arrow::io::ReadableFile> file_;
-};
-
-class SinglePartitioningShuffleWriter : public VeloxShuffleWriterTest {
- protected:
-  void testShuffleWrite(VeloxShuffleWriter& shuffleWriter, 
std::vector<facebook::velox::RowVectorPtr> vectors) {
-    for (auto& vector : vectors) {
-      ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
-    }
-    ASSERT_NOT_OK(shuffleWriter.stop());
-    // verify data file
-    checkFileExists(dataFile_);
-    // verify output temporary files
-    const auto& lengths = shuffleWriter.partitionLengths();
-    ASSERT_EQ(lengths.size(), 1);
-
-    auto schema = getArrowSchema(vectors[0]);
-    std::vector<facebook::velox::RowVectorPtr> deserializedVectors;
-    setReadableFile(dataFile_);
-    GLUTEN_ASSIGN_OR_THROW(auto in, 
arrow::io::RandomAccessFile::GetStream(file_, 0, lengths[0]));
-    getRowVectors(partitionWriterOptions_.compressionType, schema, 
deserializedVectors, in);
-
-    ASSERT_EQ(deserializedVectors.size(), vectors.size());
-    for (int32_t i = 0; i < deserializedVectors.size(); i++) {
-      facebook::velox::test::assertEqualVectors(vectors[i], 
deserializedVectors[i]);
-    }
-  }
-
-  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool* 
arrowPool) override {
-    shuffleWriterOptions_.partitioning = Partitioning::kSingle;
-    static const uint32_t kNumPartitions = 1;
-    auto partitionWriter = createPartitionWriter(
-        GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
-    std::shared_ptr<VeloxShuffleWriter> shuffleWriter = 
createSpecificShuffleWriter(
-        arrowPool, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), kNumPartitions, 10);
-    return shuffleWriter;
-  }
-};
-
-class MultiplePartitioningShuffleWriter : public VeloxShuffleWriterTest {
- protected:
-  void shuffleWriteReadMultiBlocks(
-      VeloxShuffleWriter& shuffleWriter,
-      int32_t expectPartitionLength,
-      facebook::velox::TypePtr dataType,
-      std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) { /* blockId = pid, rowVector in block */
-    ASSERT_NOT_OK(shuffleWriter.stop());
-    // verify data file
-    checkFileExists(dataFile_);
-    // verify output temporary files
-    const auto& lengths = shuffleWriter.partitionLengths();
-    ASSERT_EQ(lengths.size(), expectPartitionLength);
-    int64_t lengthSum = std::accumulate(lengths.begin(), lengths.end(), 0);
-    auto schema = toArrowSchema(dataType, pool());
-    setReadableFile(dataFile_);
-    ASSERT_EQ(*file_->GetSize(), lengthSum);
-    for (int32_t i = 0; i < expectPartitionLength; i++) {
-      if (expectedVectors[i].size() == 0) {
-        ASSERT_EQ(lengths[i], 0);
-      } else {
-        std::vector<facebook::velox::RowVectorPtr> deserializedVectors;
-        GLUTEN_ASSIGN_OR_THROW(
-            auto in, arrow::io::RandomAccessFile::GetStream(file_, i == 0 ? 0 : lengths[i - 1], lengths[i]));
-        getRowVectors(partitionWriterOptions_.compressionType, schema, 
deserializedVectors, in);
-        ASSERT_EQ(expectedVectors[i].size(), deserializedVectors.size());
-        for (int32_t j = 0; j < expectedVectors[i].size(); j++) {
-          facebook::velox::test::assertEqualVectors(expectedVectors[i][j], 
deserializedVectors[j]);
-        }
-      }
-    }
-  }
-
-  void testShuffleWriteMultiBlocks(
-      VeloxShuffleWriter& shuffleWriter,
-      std::vector<facebook::velox::RowVectorPtr> vectors,
-      int32_t expectPartitionLength,
-      facebook::velox::TypePtr dataType,
-      std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) 
{
-    for (auto& vector : vectors) {
-      ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
-    }
-    shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength, 
dataType, expectedVectors);
-  }
-};
-
-class HashPartitioningShuffleWriter : public MultiplePartitioningShuffleWriter 
{
- protected:
-  void SetUp() override {
-    MultiplePartitioningShuffleWriter::SetUp();
-
-    children1_.insert((children1_.begin()), makeFlatVector<int32_t>({1, 2, 2, 
2, 2, 1, 1, 1, 2, 1}));
-    hashInputVector1_ = makeRowVector(children1_);
-    children2_.insert((children2_.begin()), makeFlatVector<int32_t>({2, 2}));
-    hashInputVector2_ = makeRowVector(children2_);
-  }
-
-  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool* 
arrowPool) override {
-    shuffleWriterOptions_.partitioning = Partitioning::kHash;
-    static const uint32_t kNumPartitions = 2;
-    auto partitionWriter = createPartitionWriter(
-        GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
-    std::shared_ptr<VeloxShuffleWriter> shuffleWriter = 
createSpecificShuffleWriter(
-        arrowPool, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), kNumPartitions, 4);
-    return shuffleWriter;
-  }
-
-  std::vector<uint32_t> hashPartitionIds_{1, 2};
-
-  facebook::velox::RowVectorPtr hashInputVector1_;
-  facebook::velox::RowVectorPtr hashInputVector2_;
-};
-
-class RangePartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter {
- protected:
-  void SetUp() override {
-    MultiplePartitioningShuffleWriter::SetUp();
-
-    auto pid1 = makeRowVector({makeFlatVector<int32_t>({0, 1, 0, 1, 0, 1, 0, 
1, 0, 1})});
-    auto rangeVector1 = makeRowVector(inputVector1_->children());
-    compositeBatch1_ = VeloxColumnarBatch::compose(
-        pool(), {std::make_shared<VeloxColumnarBatch>(pid1), 
std::make_shared<VeloxColumnarBatch>(rangeVector1)});
-
-    auto pid2 = makeRowVector({makeFlatVector<int32_t>({0, 1})});
-    auto rangeVector2 = makeRowVector(inputVector2_->children());
-    compositeBatch2_ = VeloxColumnarBatch::compose(
-        pool(), {std::make_shared<VeloxColumnarBatch>(pid2), 
std::make_shared<VeloxColumnarBatch>(rangeVector2)});
-  }
-
-  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool* 
arrowPool) override {
-    shuffleWriterOptions_.partitioning = Partitioning::kRange;
-    static const uint32_t kNumPartitions = 2;
-    auto partitionWriter = createPartitionWriter(
-        GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
-    std::shared_ptr<VeloxShuffleWriter> shuffleWriter = 
createSpecificShuffleWriter(
-        arrowPool, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), kNumPartitions, 4);
-    return shuffleWriter;
-  }
-
-  void testShuffleWriteMultiBlocks(
-      VeloxShuffleWriter& shuffleWriter,
-      std::vector<std::shared_ptr<ColumnarBatch>> batches,
-      int32_t expectPartitionLength,
-      facebook::velox::TypePtr dataType,
-      std::vector<std::vector<facebook::velox::RowVectorPtr>> expectedVectors) 
{ /* blockId = pid, rowVector in block */
-    for (auto& batch : batches) {
-      ASSERT_NOT_OK(shuffleWriter.write(batch, ShuffleWriter::kMinMemLimit));
-    }
-    shuffleWriteReadMultiBlocks(shuffleWriter, expectPartitionLength, 
dataType, expectedVectors);
-  }
-
-  std::shared_ptr<ColumnarBatch> compositeBatch1_;
-  std::shared_ptr<ColumnarBatch> compositeBatch2_;
-};
-
-class RoundRobinPartitioningShuffleWriter : public 
MultiplePartitioningShuffleWriter {
- protected:
-  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool* 
arrowPool) override {
-    static const uint32_t kNumPartitions = 2;
-    auto partitionWriter = createPartitionWriter(
-        GetParam().partitionWriterType, kNumPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
-    std::shared_ptr<VeloxShuffleWriter> shuffleWriter = 
createSpecificShuffleWriter(
-        arrowPool, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), kNumPartitions, 4);
-    return shuffleWriter;
-  }
-};
-
-class VeloxHashShuffleWriterMemoryTest : public VeloxShuffleWriterTestBase, 
public testing::Test {
- protected:
-  static void SetUpTestCase() {
-    facebook::velox::memory::MemoryManager::testingSetInstance({});
-  }
-  void SetUp() override {
-    VeloxShuffleWriterTestBase::setUp();
-  }
-
-  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t 
numPartitions, arrow::MemoryPool* arrowPool) {
-    auto partitionWriter = createPartitionWriter(
-        PartitionWriterType::kLocal, numPartitions, dataFile_, localDirs_, 
partitionWriterOptions_, arrowPool);
-    GLUTEN_ASSIGN_OR_THROW(
-        auto shuffleWriter,
-        VeloxHashShuffleWriter::create(
-            numPartitions, std::move(partitionWriter), 
std::move(shuffleWriterOptions_), pool_, arrowPool));
-    return shuffleWriter;
-  }
-
-  std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(arrow::MemoryPool* 
arrowPool) override {
-    return createShuffleWriter(kDefaultShufflePartitions, arrowPool);
-  }
-
-  int64_t splitRowVectorAndSpill(
-      VeloxShuffleWriter& shuffleWriter,
-      std::vector<facebook::velox::RowVectorPtr> vectors,
-      bool shrink) {
-    for (auto vector : vectors) {
-      ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
-    }
-
-    auto targetEvicted = shuffleWriter.cachedPayloadSize();
-    if (shrink) {
-      targetEvicted += shuffleWriter.partitionBufferSize();
-    }
-    int64_t evicted;
-    ASSERT_NOT_OK(shuffleWriter.reclaimFixedSize(targetEvicted, &evicted));
-
-    return evicted;
-  };
-
-  static constexpr uint32_t kDefaultShufflePartitions = 2;
-};
-
-} // namespace gluten


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to