This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b8087c85da [GLUTEN-9836][VL] Remove ShuffleMemoryPool and support creating arrow pool instances (#9869)
b8087c85da is described below

commit b8087c85da7cabe57a25a09aea66c83dc83e89ed
Author: Rong Ma <[email protected]>
AuthorDate: Sun Jun 8 15:25:19 2025 +0100

    [GLUTEN-9836][VL] Remove ShuffleMemoryPool and support creating arrow pool instances (#9869)
---
 cpp/core/CMakeLists.txt                            |  1 -
 cpp/core/jni/JniWrapper.cc                         |  6 +-
 cpp/core/memory/ArrowMemoryPool.cc                 | 17 +++--
 cpp/core/memory/ArrowMemoryPool.h                  | 17 +++--
 cpp/core/memory/MemoryManager.h                    |  8 ++-
 cpp/core/shuffle/PartitionWriter.h                 |  5 +-
 cpp/core/shuffle/ShuffleMemoryPool.cc              | 72 ----------------------
 cpp/core/shuffle/ShuffleMemoryPool.h               | 48 ---------------
 cpp/core/shuffle/ShuffleWriter.cc                  |  8 +--
 cpp/core/shuffle/ShuffleWriter.h                   |  6 +-
 cpp/velox/compute/VeloxRuntime.cc                  |  9 +--
 cpp/velox/memory/VeloxMemoryManager.cc             | 62 ++++++++++++++++---
 cpp/velox/memory/VeloxMemoryManager.h              | 19 ++++--
 cpp/velox/shuffle/VeloxHashShuffleWriter.cc        |  5 +-
 cpp/velox/shuffle/VeloxHashShuffleWriter.h         |  8 +--
 cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc     |  7 +--
 cpp/velox/shuffle/VeloxRssSortShuffleWriter.h      |  8 +--
 cpp/velox/shuffle/VeloxShuffleWriter.cc            |  9 ++-
 cpp/velox/shuffle/VeloxShuffleWriter.h             | 14 ++---
 cpp/velox/shuffle/VeloxSortShuffleWriter.cc        | 12 ++--
 cpp/velox/shuffle/VeloxSortShuffleWriter.h         |  6 +-
 cpp/velox/tests/RuntimeTest.cc                     |  5 +-
 .../tests/VeloxColumnarBatchSerializerTest.cc      | 15 +++--
 cpp/velox/tests/VeloxRowToColumnarTest.cc          |  4 +-
 cpp/velox/tests/VeloxShuffleWriterSpillTest.cc     |  7 +--
 cpp/velox/tests/VeloxShuffleWriterTest.cc          | 29 +++------
 26 files changed, 168 insertions(+), 239 deletions(-)

diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt
index 9caacf277c..b164f2a953 100644
--- a/cpp/core/CMakeLists.txt
+++ b/cpp/core/CMakeLists.txt
@@ -137,7 +137,6 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
     shuffle/rss/RssPartitionWriter.cc
     shuffle/RandomPartitioner.cc
     shuffle/RoundRobinPartitioner.cc
-    shuffle/ShuffleMemoryPool.cc
     shuffle/ShuffleWriter.cc
     shuffle/SinglePartitioner.cc
     shuffle/Spill.cc
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 1321b6041f..2fde8ca4cd 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -144,10 +144,14 @@ class InternalMemoryManager : public MemoryManager {
  public:
   InternalMemoryManager(const std::string& kind) : MemoryManager(kind) {}
 
-  arrow::MemoryPool* getArrowMemoryPool() override {
+  arrow::MemoryPool* defaultArrowMemoryPool() override {
     throw GlutenException("Not implemented");
   }
 
+  std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const std::string& 
name) override {
+    throw GlutenException("Not yet implemented");
+  }
+
   const MemoryUsageStats collectMemoryUsageStats() const override {
     return MemoryUsageStats();
   }
diff --git a/cpp/core/memory/ArrowMemoryPool.cc 
b/cpp/core/memory/ArrowMemoryPool.cc
index e1ecae142b..83a511cf8f 100644
--- a/cpp/core/memory/ArrowMemoryPool.cc
+++ b/cpp/core/memory/ArrowMemoryPool.cc
@@ -16,14 +16,14 @@
  */
 
 #include "ArrowMemoryPool.h"
-#include "arrow/type_fwd.h"
 #include "utils/Exception.h"
 
 namespace gluten {
 
-std::shared_ptr<arrow::MemoryPool> defaultArrowMemoryPool() {
-  static auto staticPool = 
std::make_shared<ArrowMemoryPool>(defaultMemoryAllocator().get());
-  return staticPool;
+ArrowMemoryPool::~ArrowMemoryPool() {
+  if (releaser_ != nullptr) {
+    releaser_(this);
+  }
 }
 
 arrow::Status ArrowMemoryPool::Allocate(int64_t size, int64_t alignment, 
uint8_t** out) {
@@ -45,10 +45,13 @@ void ArrowMemoryPool::Free(uint8_t* buffer, int64_t size, 
int64_t alignment) {
 }
 
 int64_t ArrowMemoryPool::bytes_allocated() const {
-  // fixme use self accountant
   return allocator_->getBytes();
 }
 
+int64_t ArrowMemoryPool::max_memory() const {
+  return allocator_->peakBytes();
+}
+
 int64_t ArrowMemoryPool::total_bytes_allocated() const {
   throw GlutenException("Not implement");
 }
@@ -61,4 +64,8 @@ std::string ArrowMemoryPool::backend_name() const {
   return "gluten arrow allocator";
 }
 
+MemoryAllocator* ArrowMemoryPool::allocator() const {
+  return allocator_.get();
+}
+
 } // namespace gluten
diff --git a/cpp/core/memory/ArrowMemoryPool.h 
b/cpp/core/memory/ArrowMemoryPool.h
index 85152d119c..9f3592ceec 100644
--- a/cpp/core/memory/ArrowMemoryPool.h
+++ b/cpp/core/memory/ArrowMemoryPool.h
@@ -23,12 +23,16 @@
 
 namespace gluten {
 
-/// This pool was not tracked by Spark, should only used in test.
-std::shared_ptr<arrow::MemoryPool> defaultArrowMemoryPool();
+using ArrowMemoryPoolReleaser = std::function<void(arrow::MemoryPool*)>;
 
+/// This pool was not tracked by Spark, should only used in test.
 class ArrowMemoryPool final : public arrow::MemoryPool {
  public:
-  explicit ArrowMemoryPool(MemoryAllocator* allocator) : allocator_(allocator) 
{}
+  explicit ArrowMemoryPool(AllocationListener* listener, 
ArrowMemoryPoolReleaser releaser = nullptr)
+      : 
allocator_(std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(),
 listener)),
+        releaser_(std::move(releaser)) {}
+
+  ~ArrowMemoryPool() override;
 
   arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out) 
override;
 
@@ -38,14 +42,19 @@ class ArrowMemoryPool final : public arrow::MemoryPool {
 
   int64_t bytes_allocated() const override;
 
+  int64_t max_memory() const override;
+
   int64_t total_bytes_allocated() const override;
 
   int64_t num_allocations() const override;
 
   std::string backend_name() const override;
 
+  MemoryAllocator* allocator() const;
+
  private:
-  MemoryAllocator* allocator_;
+  std::unique_ptr<MemoryAllocator> allocator_;
+  ArrowMemoryPoolReleaser releaser_;
 };
 
 } // namespace gluten
diff --git a/cpp/core/memory/MemoryManager.h b/cpp/core/memory/MemoryManager.h
index 5473eb74e9..cb7512d23c 100644
--- a/cpp/core/memory/MemoryManager.h
+++ b/cpp/core/memory/MemoryManager.h
@@ -39,7 +39,13 @@ class MemoryManager {
     return kind_;
   }
 
-  virtual arrow::MemoryPool* getArrowMemoryPool() = 0;
+  // Get the default Arrow memory pool for this memory manager. This memory 
pool is held by the memory manager.
+  virtual arrow::MemoryPool* defaultArrowMemoryPool() = 0;
+
+  // Create a new Arrow memory pool with the given name. The caller is 
responsible for managing the lifetime of the
+  // returned memory pool. Memory manager only holds the weak reference to the 
memory pool for collecting memory usage.
+  // If the name is already used by an existing memory pool, the creation will 
fail.
+  virtual std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const 
std::string& name) = 0;
 
   virtual const MemoryUsageStats collectMemoryUsageStats() const = 0;
 
diff --git a/cpp/core/shuffle/PartitionWriter.h 
b/cpp/core/shuffle/PartitionWriter.h
index ece16c95a9..73e05ed287 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -17,7 +17,6 @@
 
 #pragma once
 
-#include "ShuffleMemoryPool.h"
 #include "memory/MemoryManager.h"
 #include "memory/Reclaimable.h"
 #include "shuffle/Options.h"
@@ -34,7 +33,7 @@ class PartitionWriter : public Reclaimable {
  public:
   PartitionWriter(uint32_t numPartitions, PartitionWriterOptions options, 
MemoryManager* memoryManager)
       : numPartitions_(numPartitions), options_(std::move(options)), 
memoryManager_(memoryManager) {
-    payloadPool_ = 
std::make_unique<ShuffleMemoryPool>(memoryManager->getArrowMemoryPool());
+    payloadPool_ = 
memoryManager->createArrowMemoryPool("PartitionWriter.cached_payload");
     codec_ = createArrowIpcCodec(options_.compressionType, 
options_.codecBackend, options_.compressionLevel);
   }
 
@@ -79,7 +78,7 @@ class PartitionWriter : public Reclaimable {
 
   // Memory Pool used to track memory allocation of partition payloads.
   // The actual allocation is delegated to options_.memoryPool.
-  std::unique_ptr<ShuffleMemoryPool> payloadPool_;
+  std::shared_ptr<arrow::MemoryPool> payloadPool_;
 
   std::unique_ptr<arrow::util::Codec> codec_;
 
diff --git a/cpp/core/shuffle/ShuffleMemoryPool.cc 
b/cpp/core/shuffle/ShuffleMemoryPool.cc
deleted file mode 100644
index a1fa26bce9..0000000000
--- a/cpp/core/shuffle/ShuffleMemoryPool.cc
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "shuffle/ShuffleMemoryPool.h"
-
-namespace gluten {
-gluten::ShuffleMemoryPool::ShuffleMemoryPool(arrow::MemoryPool* pool) : 
pool_(pool) {}
-
-arrow::Status ShuffleMemoryPool::Allocate(int64_t size, int64_t alignment, 
uint8_t** out) {
-  auto before = pool_->bytes_allocated();
-  auto status = pool_->Allocate(size, alignment, out);
-  if (status.ok()) {
-    bytesAllocated_ += (pool_->bytes_allocated() - before);
-    if (bytesAllocated_ > peakBytesAllocated_) {
-      peakBytesAllocated_ = bytesAllocated_;
-    }
-  }
-  return status;
-}
-
-arrow::Status ShuffleMemoryPool::Reallocate(int64_t old_size, int64_t 
new_size, int64_t alignment, uint8_t** ptr) {
-  auto before = pool_->bytes_allocated();
-  auto status = pool_->Reallocate(old_size, new_size, alignment, ptr);
-  if (status.ok()) {
-    bytesAllocated_ += (pool_->bytes_allocated() - before);
-    if (bytesAllocated_ > peakBytesAllocated_) {
-      peakBytesAllocated_ = bytesAllocated_;
-    }
-  }
-  return status;
-}
-
-void ShuffleMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) 
{
-  auto before = pool_->bytes_allocated();
-  pool_->Free(buffer, size, alignment);
-  bytesAllocated_ += (pool_->bytes_allocated() - before);
-}
-
-int64_t ShuffleMemoryPool::bytes_allocated() const {
-  return bytesAllocated_;
-}
-
-int64_t ShuffleMemoryPool::max_memory() const {
-  return peakBytesAllocated_;
-}
-
-std::string ShuffleMemoryPool::backend_name() const {
-  return pool_->backend_name();
-}
-
-int64_t ShuffleMemoryPool::total_bytes_allocated() const {
-  return pool_->total_bytes_allocated();
-}
-
-int64_t ShuffleMemoryPool::num_allocations() const {
-  throw pool_->num_allocations();
-}
-} // namespace gluten
diff --git a/cpp/core/shuffle/ShuffleMemoryPool.h 
b/cpp/core/shuffle/ShuffleMemoryPool.h
deleted file mode 100644
index e7101295ce..0000000000
--- a/cpp/core/shuffle/ShuffleMemoryPool.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <arrow/memory_pool.h>
-
-#pragma once
-
-namespace gluten {
-class ShuffleMemoryPool : public arrow::MemoryPool {
- public:
-  ShuffleMemoryPool(arrow::MemoryPool* pool);
-
-  arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out) 
override;
-
-  arrow::Status Reallocate(int64_t old_size, int64_t new_size, int64_t 
alignment, uint8_t** ptr) override;
-
-  void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;
-
-  int64_t bytes_allocated() const override;
-
-  int64_t max_memory() const override;
-
-  std::string backend_name() const override;
-
-  int64_t total_bytes_allocated() const override;
-
-  int64_t num_allocations() const override;
-
- private:
-  arrow::MemoryPool* pool_;
-  uint64_t bytesAllocated_ = 0;
-  uint64_t peakBytesAllocated_ = 0;
-};
-} // namespace gluten
diff --git a/cpp/core/shuffle/ShuffleWriter.cc 
b/cpp/core/shuffle/ShuffleWriter.cc
index 13d7c30fec..192de798d8 100644
--- a/cpp/core/shuffle/ShuffleWriter.cc
+++ b/cpp/core/shuffle/ShuffleWriter.cc
@@ -83,10 +83,6 @@ int64_t ShuffleWriter::totalCompressTime() const {
   return metrics_.totalCompressTime;
 }
 
-int64_t ShuffleWriter::peakBytesAllocated() const {
-  return pool_->max_memory();
-}
-
 int64_t ShuffleWriter::totalSortTime() const {
   return 0;
 }
@@ -103,6 +99,6 @@ const std::vector<int64_t>& 
ShuffleWriter::rawPartitionLengths() const {
   return metrics_.rawPartitionLengths;
 }
 
-ShuffleWriter::ShuffleWriter(int32_t numPartitions, ShuffleWriterOptions 
options, arrow::MemoryPool* pool)
-    : numPartitions_(numPartitions), options_(std::move(options)), pool_(pool) 
{}
+ShuffleWriter::ShuffleWriter(int32_t numPartitions, ShuffleWriterOptions 
options)
+    : numPartitions_(numPartitions), options_(std::move(options)) {}
 } // namespace gluten
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index 8852f0527b..fea4708f97 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -55,7 +55,7 @@ class ShuffleWriter : public Reclaimable {
 
   int64_t totalCompressTime() const;
 
-  virtual int64_t peakBytesAllocated() const;
+  virtual int64_t peakBytesAllocated() const = 0;
 
   virtual int64_t totalSortTime() const;
 
@@ -66,7 +66,7 @@ class ShuffleWriter : public Reclaimable {
   const std::vector<int64_t>& rawPartitionLengths() const;
 
  protected:
-  ShuffleWriter(int32_t numPartitions, ShuffleWriterOptions options, 
arrow::MemoryPool* pool);
+  ShuffleWriter(int32_t numPartitions, ShuffleWriterOptions options);
 
   ~ShuffleWriter() override = default;
 
@@ -74,8 +74,6 @@ class ShuffleWriter : public Reclaimable {
 
   ShuffleWriterOptions options_;
 
-  arrow::MemoryPool* pool_;
-
   ShuffleWriterMetrics metrics_{};
 };
 
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index 5ab32d5aa9..f9f3e127e5 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -217,8 +217,6 @@ std::shared_ptr<ShuffleWriter> 
VeloxRuntime::createShuffleWriter(
     int numPartitions,
     std::unique_ptr<PartitionWriter> partitionWriter,
     ShuffleWriterOptions options) {
-  auto veloxPool = memoryManager()->getLeafMemoryPool();
-  auto arrowPool = memoryManager()->getArrowMemoryPool();
   GLUTEN_ASSIGN_OR_THROW(
       std::shared_ptr<ShuffleWriter> shuffleWriter,
       VeloxShuffleWriter::create(
@@ -226,8 +224,7 @@ std::shared_ptr<ShuffleWriter> 
VeloxRuntime::createShuffleWriter(
           numPartitions,
           std::move(partitionWriter),
           std::move(options),
-          veloxPool,
-          arrowPool));
+          memoryManager()));
   return shuffleWriter;
 }
 
@@ -287,7 +284,7 @@ std::shared_ptr<ShuffleReader> 
VeloxRuntime::createShuffleReader(
       options.batchSize,
       options.readerBufferSize,
       options.deserializerBufferSize,
-      memoryManager()->getArrowMemoryPool(),
+      memoryManager()->defaultArrowMemoryPool(),
       memoryManager()->getLeafMemoryPool(),
       options.shuffleWriterType);
 
@@ -295,7 +292,7 @@ std::shared_ptr<ShuffleReader> 
VeloxRuntime::createShuffleReader(
 }
 
 std::unique_ptr<ColumnarBatchSerializer> 
VeloxRuntime::createColumnarBatchSerializer(struct ArrowSchema* cSchema) {
-  auto arrowPool = memoryManager()->getArrowMemoryPool();
+  auto arrowPool = memoryManager()->defaultArrowMemoryPool();
   auto veloxPool = memoryManager()->getLeafMemoryPool();
   return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool, 
cSchema);
 }
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc 
b/cpp/velox/memory/VeloxMemoryManager.cc
index 563bb881d3..4dc09d415e 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -225,8 +225,9 @@ VeloxMemoryManager::VeloxMemoryManager(
   auto reservationBlockSize =
       backendConf.get<uint64_t>(kMemoryReservationBlockSize, 
kMemoryReservationBlockSizeDefault);
   blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), 
reservationBlockSize);
-  listenableAlloc_ = 
std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(), 
blockListener_.get());
-  arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
+  defaultArrowPool_ = std::make_shared<ArrowMemoryPool>(blockListener_.get());
+  arrowPools_.emplace("default", defaultArrowPool_);
+
   auto checkUsageLeak = backendConf.get<bool>(kCheckUsageLeak, 
kCheckUsageLeakDefault);
 
   ArbitratorFactoryRegister afr(listener_.get());
@@ -261,10 +262,32 @@ MemoryUsageStats collectVeloxMemoryUsageStats(const 
velox::memory::MemoryPool* p
   return stats;
 }
 
-MemoryUsageStats collectGlutenAllocatorMemoryUsageStats(const MemoryAllocator* 
allocator) {
+MemoryUsageStats collectGlutenAllocatorMemoryUsageStats(
+    const std::unordered_map<std::string, std::weak_ptr<ArrowMemoryPool>>& 
arrowPools) {
   MemoryUsageStats stats;
-  stats.set_current(allocator->getBytes());
-  stats.set_peak(allocator->peakBytes());
+  int64_t totalBytes = 0;
+  int64_t peakBytes = 0;
+
+  for (const auto& [name, ptr] : arrowPools) {
+    auto pool = ptr.lock();
+    if (pool == nullptr) {
+      continue;
+    }
+
+    MemoryUsageStats poolStats;
+    const auto allocated = pool->bytes_allocated();
+    const auto peak = pool->max_memory();
+    poolStats.set_current(allocated);
+    poolStats.set_peak(peak);
+
+    stats.mutable_children()->emplace(name, poolStats);
+
+    totalBytes += allocated;
+    peakBytes = std::max(peakBytes, peak);
+  }
+
+  stats.set_current(totalBytes);
+  stats.set_peak(peakBytes);
   return stats;
 }
 
@@ -281,12 +304,26 @@ int64_t 
shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm, velox::memory::M
 }
 } // namespace
 
+std::shared_ptr<arrow::MemoryPool> 
VeloxMemoryManager::createArrowMemoryPool(const std::string& name) {
+  std::lock_guard<std::mutex> l(mutex_);
+  VELOX_CHECK_EQ(arrowPools_.count(name), 0, "Arrow memory pool {} already 
exists", name);
+  auto pool = std::make_shared<ArrowMemoryPool>(
+      blockListener_.get(), [this, name](arrow::MemoryPool* pool) { 
this->dropMemoryPool(name); });
+  arrowPools_.emplace(name, pool);
+  return pool;
+}
+
+void VeloxMemoryManager::dropMemoryPool(const std::string& name) {
+  std::lock_guard<std::mutex> l(mutex_);
+  const auto ret = arrowPools_.erase(name);
+  VELOX_CHECK_EQ(ret, 1, "Child memory pool {} doesn't exist", name);
+}
+
 const MemoryUsageStats VeloxMemoryManager::collectMemoryUsageStats() const {
   MemoryUsageStats stats;
   stats.set_current(listener_->currentBytes());
   stats.set_peak(listener_->peakBytes());
-  stats.mutable_children()->emplace(
-      "gluten::MemoryAllocator", 
collectGlutenAllocatorMemoryUsageStats(listenableAlloc_.get()));
+  stats.mutable_children()->emplace("gluten::MemoryAllocator", 
collectGlutenAllocatorMemoryUsageStats(arrowPools_));
   stats.mutable_children()->emplace(
       veloxAggregatePool_->name(), 
collectVeloxMemoryUsageStats(veloxAggregatePool_.get()));
   return stats;
@@ -367,10 +404,17 @@ bool VeloxMemoryManager::tryDestructSafe() {
   veloxMemoryManager_.reset();
 
   // Applies similar rule for Arrow memory pool.
-  if (arrowPool_ && arrowPool_->bytes_allocated() != 0) {
+  if (!arrowPools_.empty() && std::any_of(arrowPools_.begin(), 
arrowPools_.end(), [&](const auto& entry) {
+        auto pool = entry.second.lock();
+        if (pool == nullptr) {
+          return false;
+        }
+        return pool->bytes_allocated() != 0;
+      })) {
+    VLOG(2) << "Attempt to destruct VeloxMemoryManager failed because there 
are still outstanding Arrow memory pools.";
     return false;
   }
-  arrowPool_.reset();
+  arrowPools_.clear();
 
   // Successfully destructed.
   return true;
diff --git a/cpp/velox/memory/VeloxMemoryManager.h 
b/cpp/velox/memory/VeloxMemoryManager.h
index c2d79a9131..28c0f79614 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "memory/AllocationListener.h"
+#include "memory/ArrowMemoryPool.h"
 #include "memory/MemoryAllocator.h"
 #include "memory/MemoryManager.h"
 #include "velox/common/memory/Memory.h"
@@ -78,10 +79,12 @@ class VeloxMemoryManager final : public MemoryManager {
     return veloxMemoryManager_.get();
   }
 
-  arrow::MemoryPool* getArrowMemoryPool() override {
-    return arrowPool_.get();
+  arrow::MemoryPool* defaultArrowMemoryPool() override {
+    return defaultArrowPool_.get();
   }
 
+  std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const std::string& 
name) override;
+
   const MemoryUsageStats collectMemoryUsageStats() const override;
 
   const int64_t shrink(int64_t size) override;
@@ -90,7 +93,7 @@ class VeloxMemoryManager final : public MemoryManager {
 
   /// Test only
   MemoryAllocator* allocator() const {
-    return listenableAlloc_.get();
+    return defaultArrowPool_->allocator();
   }
 
   AllocationListener* getListener() const {
@@ -100,20 +103,24 @@ class VeloxMemoryManager final : public MemoryManager {
  private:
   bool tryDestructSafe();
 
+  void dropMemoryPool(const std::string& name);
+
 #ifdef GLUTEN_ENABLE_HBM
   std::unique_ptr<VeloxMemoryAllocator> wrappedAlloc_;
 #endif
 
-  // This is a listenable allocator used for arrow.
-  std::unique_ptr<MemoryAllocator> listenableAlloc_;
   std::unique_ptr<AllocationListener> listener_;
   std::unique_ptr<AllocationListener> blockListener_;
-  std::unique_ptr<arrow::MemoryPool> arrowPool_;
+
+  std::shared_ptr<ArrowMemoryPool> defaultArrowPool_;
+  std::unordered_map<std::string, std::weak_ptr<ArrowMemoryPool>> arrowPools_;
 
   std::unique_ptr<facebook::velox::memory::MemoryManager> veloxMemoryManager_;
   std::shared_ptr<facebook::velox::memory::MemoryPool> veloxAggregatePool_;
   std::shared_ptr<facebook::velox::memory::MemoryPool> veloxLeafPool_;
   std::vector<std::shared_ptr<facebook::velox::memory::MemoryPool>> 
heldVeloxPools_;
+
+  std::mutex mutex_;
 };
 
 VeloxMemoryManager* getDefaultMemoryManager();
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index cdc0a7653c..2098332484 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -157,10 +157,9 @@ arrow::Result<std::shared_ptr<VeloxShuffleWriter>> 
VeloxHashShuffleWriter::creat
     uint32_t numPartitions,
     std::unique_ptr<PartitionWriter> partitionWriter,
     ShuffleWriterOptions options,
-    std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-    arrow::MemoryPool* arrowPool) {
+    MemoryManager* memoryManager) {
   std::shared_ptr<VeloxHashShuffleWriter> res(
-      new VeloxHashShuffleWriter(numPartitions, std::move(partitionWriter), 
std::move(options), veloxPool, arrowPool));
+      new VeloxHashShuffleWriter(numPartitions, std::move(partitionWriter), 
std::move(options), memoryManager));
   RETURN_NOT_OK(res->init());
   return res;
 } // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.h 
b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
index 5f45c9065c..e8f37d89f6 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
@@ -126,8 +126,7 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
       uint32_t numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
       ShuffleWriterOptions options,
-      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-      arrow::MemoryPool* arrowPool);
+      MemoryManager* memoryManager);
 
   arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) 
override;
 
@@ -194,9 +193,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
       uint32_t numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
       ShuffleWriterOptions options,
-      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-      arrow::MemoryPool* pool)
-      : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), 
std::move(options), std::move(veloxPool), pool) {}
+      MemoryManager* memoryManager)
+      : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), 
std::move(options), memoryManager) {}
 
   arrow::Status init();
 
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index 66d02aa68e..bb495ddf35 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -33,10 +33,9 @@ arrow::Result<std::shared_ptr<VeloxShuffleWriter>> 
VeloxRssSortShuffleWriter::cr
     uint32_t numPartitions,
     std::unique_ptr<PartitionWriter> partitionWriter,
     ShuffleWriterOptions options,
-    std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-    arrow::MemoryPool* arrowPool) {
-  std::shared_ptr<VeloxRssSortShuffleWriter> res(new VeloxRssSortShuffleWriter(
-      numPartitions, std::move(partitionWriter), std::move(options), 
veloxPool, arrowPool));
+    MemoryManager* memoryManager) {
+  std::shared_ptr<VeloxRssSortShuffleWriter> res(
+      new VeloxRssSortShuffleWriter(numPartitions, std::move(partitionWriter), 
std::move(options), memoryManager));
   RETURN_NOT_OK(res->init());
   return res;
 } // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h 
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
index 822951ff96..a9b1ae1fd1 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
@@ -55,8 +55,7 @@ class VeloxRssSortShuffleWriter final : public 
VeloxShuffleWriter {
       uint32_t numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
       ShuffleWriterOptions options,
-      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-      arrow::MemoryPool* arrowPool);
+      MemoryManager* memoryManager);
 
   arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) 
override;
 
@@ -71,9 +70,8 @@ class VeloxRssSortShuffleWriter final : public 
VeloxShuffleWriter {
       uint32_t numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
       ShuffleWriterOptions options,
-      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-      arrow::MemoryPool* pool)
-      : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), 
std::move(options), std::move(veloxPool), pool) {}
+      MemoryManager* memoryManager)
+      : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), 
std::move(options), memoryManager) {}
 
   arrow::Status init();
 
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxShuffleWriter.cc
index a01615dc35..66fa2be2db 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc
@@ -27,19 +27,18 @@ arrow::Result<std::shared_ptr<VeloxShuffleWriter>> 
VeloxShuffleWriter::create(
     uint32_t numPartitions,
     std::unique_ptr<PartitionWriter> partitionWriter,
     ShuffleWriterOptions options,
-    std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-    arrow::MemoryPool* arrowPool) {
+    MemoryManager* memoryManager) {
   std::shared_ptr<VeloxShuffleWriter> shuffleWriter;
   switch (type) {
     case ShuffleWriterType::kHashShuffle:
       return VeloxHashShuffleWriter::create(
-          numPartitions, std::move(partitionWriter), std::move(options), 
veloxPool, arrowPool);
+          numPartitions, std::move(partitionWriter), std::move(options), 
memoryManager);
     case ShuffleWriterType::kSortShuffle:
       return VeloxSortShuffleWriter::create(
-          numPartitions, std::move(partitionWriter), std::move(options), 
veloxPool, arrowPool);
+          numPartitions, std::move(partitionWriter), std::move(options), 
memoryManager);
     case ShuffleWriterType::kRssSortShuffle:
       return VeloxRssSortShuffleWriter::create(
-          numPartitions, std::move(partitionWriter), std::move(options), 
veloxPool, arrowPool);
+          numPartitions, std::move(partitionWriter), std::move(options), 
memoryManager);
     default:
       return arrow::Status::Invalid("Unsupported shuffle writer type: ", 
typeToString(type));
   }
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h 
b/cpp/velox/shuffle/VeloxShuffleWriter.h
index 334dc8333d..db1ab54b91 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -54,8 +54,7 @@ class VeloxShuffleWriter : public ShuffleWriter {
       uint32_t numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
       ShuffleWriterOptions options,
-      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-      arrow::MemoryPool* arrowPool);
+      MemoryManager* memoryManager);
 
   facebook::velox::RowVectorPtr getStrippedRowVector(const 
facebook::velox::RowVector& rv) {
     // get new row type
@@ -121,11 +120,10 @@ class VeloxShuffleWriter : public ShuffleWriter {
       uint32_t numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
       ShuffleWriterOptions options,
-      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-      arrow::MemoryPool* pool)
-      : ShuffleWriter(numPartitions, std::move(options), pool),
-        partitionBufferPool_(std::make_unique<ShuffleMemoryPool>(pool)),
-        veloxPool_(std::move(veloxPool)),
+      MemoryManager* memoryManager)
+      : ShuffleWriter(numPartitions, std::move(options)),
+        
partitionBufferPool_(memoryManager->createArrowMemoryPool("VeloxShuffleWriter.partitionBufferPool")),
+        
veloxPool_(dynamic_cast<VeloxMemoryManager*>(memoryManager)->getLeafMemoryPool()),
         partitionWriter_(std::move(partitionWriter)) {
     partitioner_ = Partitioner::make(options_.partitioning, numPartitions_, 
options_.startPartitionId);
     arenas_.resize(numPartitions);
@@ -136,7 +134,7 @@ class VeloxShuffleWriter : public ShuffleWriter {
 
   // Memory Pool used to track memory usage of partition buffers.
   // The actual allocation is delegated to options_.memoryPool.
-  std::unique_ptr<ShuffleMemoryPool> partitionBufferPool_;
+  std::shared_ptr<arrow::MemoryPool> partitionBufferPool_;
 
   std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
 
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 83dc41fdb9..7936c9c4e3 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -51,10 +51,9 @@ arrow::Result<std::shared_ptr<VeloxShuffleWriter>> VeloxSortShuffleWriter::creat
     uint32_t numPartitions,
     std::unique_ptr<PartitionWriter> partitionWriter,
     ShuffleWriterOptions options,
-    std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-    arrow::MemoryPool* arrowPool) {
-  std::shared_ptr<VeloxSortShuffleWriter> writer(new VeloxSortShuffleWriter(
-      numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), arrowPool));
+    MemoryManager* memoryManager) {
+  std::shared_ptr<VeloxSortShuffleWriter> writer(
+      new VeloxSortShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), memoryManager));
   RETURN_NOT_OK(writer->init());
   return writer;
 }
@@ -63,9 +62,8 @@ VeloxSortShuffleWriter::VeloxSortShuffleWriter(
     uint32_t numPartitions,
     std::unique_ptr<PartitionWriter> partitionWriter,
     ShuffleWriterOptions options,
-    std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-    arrow::MemoryPool* pool)
-    : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), std::move(veloxPool), pool) {}
+    MemoryManager* memoryManager)
+    : VeloxShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), memoryManager) {}
 
 arrow::Status VeloxSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) {
   ARROW_ASSIGN_OR_RAISE(auto rv, getPeeledRowVector(cb));
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index cb8b3ba557..0580d3d810 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -35,8 +35,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
       uint32_t numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
       ShuffleWriterOptions options,
-      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-      arrow::MemoryPool* arrowPool);
+      MemoryManager* memoryManager);
 
   arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) override;
 
@@ -55,8 +54,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
       uint32_t numPartitions,
       std::unique_ptr<PartitionWriter> partitionWriter,
       ShuffleWriterOptions options,
-      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
-      arrow::MemoryPool* pool);
+      MemoryManager* memoryManager);
 
   arrow::Status init();
 
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 44d70667d6..f611a060f0 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -27,7 +27,10 @@ class DummyMemoryManager final : public MemoryManager {
  public:
   DummyMemoryManager(const std::string& kind) : MemoryManager(kind){};
 
-  arrow::MemoryPool* getArrowMemoryPool() override {
+  arrow::MemoryPool* defaultArrowMemoryPool() override {
+    throw GlutenException("Not yet implemented");
+  }
+  std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const std::string& name) override {
     throw GlutenException("Not yet implemented");
   }
   const MemoryUsageStats collectMemoryUsageStats() const override {
diff --git a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
index cfc0017ae7..19c340effa 100644
--- a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
+++ b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
@@ -17,10 +17,12 @@
 
 #include <gtest/gtest.h>
 
+#include "compute/VeloxBackend.h"
 #include "memory/ArrowMemoryPool.h"
 #include "memory/VeloxColumnarBatch.h"
 #include "operators/serializer/VeloxColumnarBatchSerializer.h"
 #include "utils/VeloxArrowUtils.h"
+
 #include "velox/vector/arrow/Bridge.h"
 #include "velox/vector/tests/utils/VectorTestBase.h"
 
@@ -32,14 +34,19 @@ namespace gluten {
 
 class VeloxColumnarBatchSerializerTest : public ::testing::Test, public test::VectorTestBase {
  protected:
-  static void SetUpTestCase() {
+  static void SetUpTestSuite() {
+    VeloxBackend::create(AllocationListener::noop(), {});
     memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{});
   }
 
-  std::shared_ptr<arrow::MemoryPool> arrowPool_ = defaultArrowMemoryPool();
+  static void TearDownTestSuite() {
+    VeloxBackend::get()->tearDown();
+  }
 };
 
 TEST_F(VeloxColumnarBatchSerializerTest, serialize) {
+  auto* arrowPool = getDefaultMemoryManager()->defaultArrowMemoryPool();
+
   std::vector<VectorPtr> children = {
       makeNullableFlatVector<int8_t>({1, 2, 3, std::nullopt, 4}),
       makeNullableFlatVector<int8_t>({1, -1, std::nullopt, std::nullopt, -2}),
@@ -54,12 +61,12 @@ TEST_F(VeloxColumnarBatchSerializerTest, serialize) {
   };
   auto vector = makeRowVector(children);
   auto batch = std::make_shared<VeloxColumnarBatch>(vector);
-  auto serializer = std::make_shared<VeloxColumnarBatchSerializer>(arrowPool_.get(), pool_, nullptr);
+  auto serializer = std::make_shared<VeloxColumnarBatchSerializer>(arrowPool, pool_, nullptr);
   auto buffer = serializer->serializeColumnarBatches({batch});
 
   ArrowSchema cSchema;
   exportToArrow(vector, cSchema, ArrowUtils::getBridgeOptions());
-  auto deserializer = std::make_shared<VeloxColumnarBatchSerializer>(arrowPool_.get(), pool_, &cSchema);
+  auto deserializer = std::make_shared<VeloxColumnarBatchSerializer>(arrowPool, pool_, &cSchema);
   auto deserialized = deserializer->deserialize(const_cast<uint8_t*>(buffer->data()), buffer->size());
   auto deserializedVector = std::dynamic_pointer_cast<VeloxColumnarBatch>(deserialized)->getRowVector();
   test::assertEqualVectors(vector, deserializedVector);
diff --git a/cpp/velox/tests/VeloxRowToColumnarTest.cc b/cpp/velox/tests/VeloxRowToColumnarTest.cc
index 32660c8945..2bda948f59 100644
--- a/cpp/velox/tests/VeloxRowToColumnarTest.cc
+++ b/cpp/velox/tests/VeloxRowToColumnarTest.cc
@@ -43,7 +43,7 @@ class VeloxRowToColumnarTest : public ::testing::Test, public test::VectorTestBa
     uint8_t* address = columnarToRowConverter->getBufferAddress();
     auto lengthVec = columnarToRowConverter->getLengths();
 
-    int64_t lengthArr[lengthVec.size()];
+    std::vector<int64_t> lengthArr(lengthVec.size());
     for (int i = 0; i < lengthVec.size(); i++) {
       lengthArr[i] = lengthVec[i];
     }
@@ -52,7 +52,7 @@ class VeloxRowToColumnarTest : public ::testing::Test, public test::VectorTestBa
     toArrowSchema(vector->type(), pool(), &cSchema);
     auto rowToColumnarConverter = std::make_shared<VeloxRowToColumnarConverter>(&cSchema, pool_);
 
-    auto cb = rowToColumnarConverter->convert(numRows, lengthArr, address);
+    auto cb = rowToColumnarConverter->convert(numRows, lengthArr.data(), address);
     auto vp = std::dynamic_pointer_cast<VeloxColumnarBatch>(cb)->getRowVector();
     velox::test::assertEqualVectors(vector, vp);
   }
diff --git a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
index 156196fc2f..d4f4704525 100644
--- a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
@@ -50,16 +50,13 @@ class VeloxHashShuffleWriterSpillTest : public VeloxShuffleWriterTestBase, publi
   }
 
   std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t numPartitions) override {
-    auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
-    auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
-
     auto partitionWriter = createPartitionWriter(
         PartitionWriterType::kLocal, numPartitions, dataFile_, localDirs_, partitionWriterOptions_);
 
     GLUTEN_ASSIGN_OR_THROW(
         auto shuffleWriter,
         VeloxHashShuffleWriter::create(
-            numPartitions, std::move(partitionWriter), shuffleWriterOptions_, veloxPool, arrowPool));
+            numPartitions, std::move(partitionWriter), shuffleWriterOptions_, getDefaultMemoryManager()));
 
     return shuffleWriter;
   }
@@ -94,7 +91,7 @@ TEST_F(VeloxHashShuffleWriterSpillTest, memoryLeak) {
 
   ASSERT_NOT_OK(shuffleWriter->stop());
 
-  const auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
+  const auto* arrowPool = getDefaultMemoryManager()->defaultArrowMemoryPool();
 
   ASSERT_EQ(arrowPool->bytes_allocated(), 0);
 
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 1024dafcfb..04178f5c43 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -217,7 +217,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams
         kDefaultBatchSize,
         kDefaultReadBufferSize,
         kDefaultDeserializerBufferSize,
-        getDefaultMemoryManager()->getArrowMemoryPool(),
+        getDefaultMemoryManager()->defaultArrowMemoryPool(),
         pool_,
         GetParam().shuffleWriterType);
 
@@ -293,9 +293,6 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams
 class SinglePartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
  protected:
   std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t) override {
-    auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
-    auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
-
     shuffleWriterOptions_.partitioning = Partitioning::kSingle;
     shuffleWriterOptions_.bufferSize = 10;
 
@@ -305,7 +302,11 @@ class SinglePartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
     GLUTEN_ASSIGN_OR_THROW(
         auto shuffleWriter,
         VeloxShuffleWriter::create(
-            GetParam().shuffleWriterType, 1, std::move(partitionWriter), shuffleWriterOptions_, veloxPool, arrowPool));
+            GetParam().shuffleWriterType,
+            1,
+            std::move(partitionWriter),
+            shuffleWriterOptions_,
+            getDefaultMemoryManager()));
 
     return shuffleWriter;
   }
@@ -323,9 +324,6 @@ class HashPartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
   }
 
   std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t numPartitions) override {
-    auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
-    auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
-
     shuffleWriterOptions_.partitioning = Partitioning::kHash;
     shuffleWriterOptions_.bufferSize = 4;
 
@@ -339,8 +337,7 @@ class HashPartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
             numPartitions,
             std::move(partitionWriter),
             shuffleWriterOptions_,
-            veloxPool,
-            arrowPool));
+            getDefaultMemoryManager()));
 
     return shuffleWriter;
   }
@@ -368,9 +365,6 @@ class RangePartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
   }
 
   std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t numPartitions) override {
-    auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
-    auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
-
     shuffleWriterOptions_.partitioning = Partitioning::kRange;
     shuffleWriterOptions_.bufferSize = 4;
 
@@ -384,8 +378,7 @@ class RangePartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
             numPartitions,
             std::move(partitionWriter),
             shuffleWriterOptions_,
-            veloxPool,
-            arrowPool));
+            getDefaultMemoryManager()));
 
     return shuffleWriter;
   }
@@ -397,9 +390,6 @@ class RangePartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
 class RoundRobinPartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
  protected:
   std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t numPartitions) override {
-    auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
-    auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
-
     shuffleWriterOptions_.partitioning = Partitioning::kRoundRobin;
     shuffleWriterOptions_.bufferSize = 4096;
 
@@ -413,8 +403,7 @@ class RoundRobinPartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
             numPartitions,
             std::move(partitionWriter),
             shuffleWriterOptions_,
-            veloxPool,
-            arrowPool));
+            getDefaultMemoryManager()));
 
     return shuffleWriter;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to