This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b8087c85da [GLUTEN-9836][VL] Remove ShuffleMemoryPool and support
creating arrow pool instances (#9869)
b8087c85da is described below
commit b8087c85da7cabe57a25a09aea66c83dc83e89ed
Author: Rong Ma <[email protected]>
AuthorDate: Sun Jun 8 15:25:19 2025 +0100
[GLUTEN-9836][VL] Remove ShuffleMemoryPool and support creating arrow pool
instances (#9869)
---
cpp/core/CMakeLists.txt | 1 -
cpp/core/jni/JniWrapper.cc | 6 +-
cpp/core/memory/ArrowMemoryPool.cc | 17 +++--
cpp/core/memory/ArrowMemoryPool.h | 17 +++--
cpp/core/memory/MemoryManager.h | 8 ++-
cpp/core/shuffle/PartitionWriter.h | 5 +-
cpp/core/shuffle/ShuffleMemoryPool.cc | 72 ----------------------
cpp/core/shuffle/ShuffleMemoryPool.h | 48 ---------------
cpp/core/shuffle/ShuffleWriter.cc | 8 +--
cpp/core/shuffle/ShuffleWriter.h | 6 +-
cpp/velox/compute/VeloxRuntime.cc | 9 +--
cpp/velox/memory/VeloxMemoryManager.cc | 62 ++++++++++++++++---
cpp/velox/memory/VeloxMemoryManager.h | 19 ++++--
cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 5 +-
cpp/velox/shuffle/VeloxHashShuffleWriter.h | 8 +--
cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc | 7 +--
cpp/velox/shuffle/VeloxRssSortShuffleWriter.h | 8 +--
cpp/velox/shuffle/VeloxShuffleWriter.cc | 9 ++-
cpp/velox/shuffle/VeloxShuffleWriter.h | 14 ++---
cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 12 ++--
cpp/velox/shuffle/VeloxSortShuffleWriter.h | 6 +-
cpp/velox/tests/RuntimeTest.cc | 5 +-
.../tests/VeloxColumnarBatchSerializerTest.cc | 15 +++--
cpp/velox/tests/VeloxRowToColumnarTest.cc | 4 +-
cpp/velox/tests/VeloxShuffleWriterSpillTest.cc | 7 +--
cpp/velox/tests/VeloxShuffleWriterTest.cc | 29 +++------
26 files changed, 168 insertions(+), 239 deletions(-)
diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt
index 9caacf277c..b164f2a953 100644
--- a/cpp/core/CMakeLists.txt
+++ b/cpp/core/CMakeLists.txt
@@ -137,7 +137,6 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
shuffle/rss/RssPartitionWriter.cc
shuffle/RandomPartitioner.cc
shuffle/RoundRobinPartitioner.cc
- shuffle/ShuffleMemoryPool.cc
shuffle/ShuffleWriter.cc
shuffle/SinglePartitioner.cc
shuffle/Spill.cc
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 1321b6041f..2fde8ca4cd 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -144,10 +144,14 @@ class InternalMemoryManager : public MemoryManager {
public:
InternalMemoryManager(const std::string& kind) : MemoryManager(kind) {}
- arrow::MemoryPool* getArrowMemoryPool() override {
+ arrow::MemoryPool* defaultArrowMemoryPool() override {
throw GlutenException("Not implemented");
}
+ std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const std::string&
name) override {
+ throw GlutenException("Not yet implemented");
+ }
+
const MemoryUsageStats collectMemoryUsageStats() const override {
return MemoryUsageStats();
}
diff --git a/cpp/core/memory/ArrowMemoryPool.cc
b/cpp/core/memory/ArrowMemoryPool.cc
index e1ecae142b..83a511cf8f 100644
--- a/cpp/core/memory/ArrowMemoryPool.cc
+++ b/cpp/core/memory/ArrowMemoryPool.cc
@@ -16,14 +16,14 @@
*/
#include "ArrowMemoryPool.h"
-#include "arrow/type_fwd.h"
#include "utils/Exception.h"
namespace gluten {
-std::shared_ptr<arrow::MemoryPool> defaultArrowMemoryPool() {
- static auto staticPool =
std::make_shared<ArrowMemoryPool>(defaultMemoryAllocator().get());
- return staticPool;
+ArrowMemoryPool::~ArrowMemoryPool() {
+ if (releaser_ != nullptr) {
+ releaser_(this);
+ }
}
arrow::Status ArrowMemoryPool::Allocate(int64_t size, int64_t alignment,
uint8_t** out) {
@@ -45,10 +45,13 @@ void ArrowMemoryPool::Free(uint8_t* buffer, int64_t size,
int64_t alignment) {
}
int64_t ArrowMemoryPool::bytes_allocated() const {
- // fixme use self accountant
return allocator_->getBytes();
}
+int64_t ArrowMemoryPool::max_memory() const {
+ return allocator_->peakBytes();
+}
+
int64_t ArrowMemoryPool::total_bytes_allocated() const {
throw GlutenException("Not implement");
}
@@ -61,4 +64,8 @@ std::string ArrowMemoryPool::backend_name() const {
return "gluten arrow allocator";
}
+MemoryAllocator* ArrowMemoryPool::allocator() const {
+ return allocator_.get();
+}
+
} // namespace gluten
diff --git a/cpp/core/memory/ArrowMemoryPool.h
b/cpp/core/memory/ArrowMemoryPool.h
index 85152d119c..9f3592ceec 100644
--- a/cpp/core/memory/ArrowMemoryPool.h
+++ b/cpp/core/memory/ArrowMemoryPool.h
@@ -23,12 +23,16 @@
namespace gluten {
-/// This pool was not tracked by Spark, should only used in test.
-std::shared_ptr<arrow::MemoryPool> defaultArrowMemoryPool();
+using ArrowMemoryPoolReleaser = std::function<void(arrow::MemoryPool*)>;
+/// This pool was not tracked by Spark, should only used in test.
class ArrowMemoryPool final : public arrow::MemoryPool {
public:
- explicit ArrowMemoryPool(MemoryAllocator* allocator) : allocator_(allocator)
{}
+ explicit ArrowMemoryPool(AllocationListener* listener,
ArrowMemoryPoolReleaser releaser = nullptr)
+ :
allocator_(std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(),
listener)),
+ releaser_(std::move(releaser)) {}
+
+ ~ArrowMemoryPool() override;
arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out)
override;
@@ -38,14 +42,19 @@ class ArrowMemoryPool final : public arrow::MemoryPool {
int64_t bytes_allocated() const override;
+ int64_t max_memory() const override;
+
int64_t total_bytes_allocated() const override;
int64_t num_allocations() const override;
std::string backend_name() const override;
+ MemoryAllocator* allocator() const;
+
private:
- MemoryAllocator* allocator_;
+ std::unique_ptr<MemoryAllocator> allocator_;
+ ArrowMemoryPoolReleaser releaser_;
};
} // namespace gluten
diff --git a/cpp/core/memory/MemoryManager.h b/cpp/core/memory/MemoryManager.h
index 5473eb74e9..cb7512d23c 100644
--- a/cpp/core/memory/MemoryManager.h
+++ b/cpp/core/memory/MemoryManager.h
@@ -39,7 +39,13 @@ class MemoryManager {
return kind_;
}
- virtual arrow::MemoryPool* getArrowMemoryPool() = 0;
+ // Get the default Arrow memory pool for this memory manager. This memory
pool is held by the memory manager.
+ virtual arrow::MemoryPool* defaultArrowMemoryPool() = 0;
+
+ // Create a new Arrow memory pool with the given name. The caller is
responsible for managing the lifetime of the
+ // returned memory pool. Memory manager only holds the weak reference to the
memory pool for collecting memory usage.
+ // If the name is already used by an existing memory pool, the creation will
fail.
+ virtual std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const
std::string& name) = 0;
virtual const MemoryUsageStats collectMemoryUsageStats() const = 0;
diff --git a/cpp/core/shuffle/PartitionWriter.h
b/cpp/core/shuffle/PartitionWriter.h
index ece16c95a9..73e05ed287 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -17,7 +17,6 @@
#pragma once
-#include "ShuffleMemoryPool.h"
#include "memory/MemoryManager.h"
#include "memory/Reclaimable.h"
#include "shuffle/Options.h"
@@ -34,7 +33,7 @@ class PartitionWriter : public Reclaimable {
public:
PartitionWriter(uint32_t numPartitions, PartitionWriterOptions options,
MemoryManager* memoryManager)
: numPartitions_(numPartitions), options_(std::move(options)),
memoryManager_(memoryManager) {
- payloadPool_ =
std::make_unique<ShuffleMemoryPool>(memoryManager->getArrowMemoryPool());
+ payloadPool_ =
memoryManager->createArrowMemoryPool("PartitionWriter.cached_payload");
codec_ = createArrowIpcCodec(options_.compressionType,
options_.codecBackend, options_.compressionLevel);
}
@@ -79,7 +78,7 @@ class PartitionWriter : public Reclaimable {
// Memory Pool used to track memory allocation of partition payloads.
// The actual allocation is delegated to options_.memoryPool.
- std::unique_ptr<ShuffleMemoryPool> payloadPool_;
+ std::shared_ptr<arrow::MemoryPool> payloadPool_;
std::unique_ptr<arrow::util::Codec> codec_;
diff --git a/cpp/core/shuffle/ShuffleMemoryPool.cc
b/cpp/core/shuffle/ShuffleMemoryPool.cc
deleted file mode 100644
index a1fa26bce9..0000000000
--- a/cpp/core/shuffle/ShuffleMemoryPool.cc
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "shuffle/ShuffleMemoryPool.h"
-
-namespace gluten {
-gluten::ShuffleMemoryPool::ShuffleMemoryPool(arrow::MemoryPool* pool) :
pool_(pool) {}
-
-arrow::Status ShuffleMemoryPool::Allocate(int64_t size, int64_t alignment,
uint8_t** out) {
- auto before = pool_->bytes_allocated();
- auto status = pool_->Allocate(size, alignment, out);
- if (status.ok()) {
- bytesAllocated_ += (pool_->bytes_allocated() - before);
- if (bytesAllocated_ > peakBytesAllocated_) {
- peakBytesAllocated_ = bytesAllocated_;
- }
- }
- return status;
-}
-
-arrow::Status ShuffleMemoryPool::Reallocate(int64_t old_size, int64_t
new_size, int64_t alignment, uint8_t** ptr) {
- auto before = pool_->bytes_allocated();
- auto status = pool_->Reallocate(old_size, new_size, alignment, ptr);
- if (status.ok()) {
- bytesAllocated_ += (pool_->bytes_allocated() - before);
- if (bytesAllocated_ > peakBytesAllocated_) {
- peakBytesAllocated_ = bytesAllocated_;
- }
- }
- return status;
-}
-
-void ShuffleMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment)
{
- auto before = pool_->bytes_allocated();
- pool_->Free(buffer, size, alignment);
- bytesAllocated_ += (pool_->bytes_allocated() - before);
-}
-
-int64_t ShuffleMemoryPool::bytes_allocated() const {
- return bytesAllocated_;
-}
-
-int64_t ShuffleMemoryPool::max_memory() const {
- return peakBytesAllocated_;
-}
-
-std::string ShuffleMemoryPool::backend_name() const {
- return pool_->backend_name();
-}
-
-int64_t ShuffleMemoryPool::total_bytes_allocated() const {
- return pool_->total_bytes_allocated();
-}
-
-int64_t ShuffleMemoryPool::num_allocations() const {
- throw pool_->num_allocations();
-}
-} // namespace gluten
diff --git a/cpp/core/shuffle/ShuffleMemoryPool.h
b/cpp/core/shuffle/ShuffleMemoryPool.h
deleted file mode 100644
index e7101295ce..0000000000
--- a/cpp/core/shuffle/ShuffleMemoryPool.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <arrow/memory_pool.h>
-
-#pragma once
-
-namespace gluten {
-class ShuffleMemoryPool : public arrow::MemoryPool {
- public:
- ShuffleMemoryPool(arrow::MemoryPool* pool);
-
- arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out)
override;
-
- arrow::Status Reallocate(int64_t old_size, int64_t new_size, int64_t
alignment, uint8_t** ptr) override;
-
- void Free(uint8_t* buffer, int64_t size, int64_t alignment) override;
-
- int64_t bytes_allocated() const override;
-
- int64_t max_memory() const override;
-
- std::string backend_name() const override;
-
- int64_t total_bytes_allocated() const override;
-
- int64_t num_allocations() const override;
-
- private:
- arrow::MemoryPool* pool_;
- uint64_t bytesAllocated_ = 0;
- uint64_t peakBytesAllocated_ = 0;
-};
-} // namespace gluten
diff --git a/cpp/core/shuffle/ShuffleWriter.cc
b/cpp/core/shuffle/ShuffleWriter.cc
index 13d7c30fec..192de798d8 100644
--- a/cpp/core/shuffle/ShuffleWriter.cc
+++ b/cpp/core/shuffle/ShuffleWriter.cc
@@ -83,10 +83,6 @@ int64_t ShuffleWriter::totalCompressTime() const {
return metrics_.totalCompressTime;
}
-int64_t ShuffleWriter::peakBytesAllocated() const {
- return pool_->max_memory();
-}
-
int64_t ShuffleWriter::totalSortTime() const {
return 0;
}
@@ -103,6 +99,6 @@ const std::vector<int64_t>&
ShuffleWriter::rawPartitionLengths() const {
return metrics_.rawPartitionLengths;
}
-ShuffleWriter::ShuffleWriter(int32_t numPartitions, ShuffleWriterOptions
options, arrow::MemoryPool* pool)
- : numPartitions_(numPartitions), options_(std::move(options)), pool_(pool)
{}
+ShuffleWriter::ShuffleWriter(int32_t numPartitions, ShuffleWriterOptions
options)
+ : numPartitions_(numPartitions), options_(std::move(options)) {}
} // namespace gluten
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index 8852f0527b..fea4708f97 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -55,7 +55,7 @@ class ShuffleWriter : public Reclaimable {
int64_t totalCompressTime() const;
- virtual int64_t peakBytesAllocated() const;
+ virtual int64_t peakBytesAllocated() const = 0;
virtual int64_t totalSortTime() const;
@@ -66,7 +66,7 @@ class ShuffleWriter : public Reclaimable {
const std::vector<int64_t>& rawPartitionLengths() const;
protected:
- ShuffleWriter(int32_t numPartitions, ShuffleWriterOptions options,
arrow::MemoryPool* pool);
+ ShuffleWriter(int32_t numPartitions, ShuffleWriterOptions options);
~ShuffleWriter() override = default;
@@ -74,8 +74,6 @@ class ShuffleWriter : public Reclaimable {
ShuffleWriterOptions options_;
- arrow::MemoryPool* pool_;
-
ShuffleWriterMetrics metrics_{};
};
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index 5ab32d5aa9..f9f3e127e5 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -217,8 +217,6 @@ std::shared_ptr<ShuffleWriter>
VeloxRuntime::createShuffleWriter(
int numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options) {
- auto veloxPool = memoryManager()->getLeafMemoryPool();
- auto arrowPool = memoryManager()->getArrowMemoryPool();
GLUTEN_ASSIGN_OR_THROW(
std::shared_ptr<ShuffleWriter> shuffleWriter,
VeloxShuffleWriter::create(
@@ -226,8 +224,7 @@ std::shared_ptr<ShuffleWriter>
VeloxRuntime::createShuffleWriter(
numPartitions,
std::move(partitionWriter),
std::move(options),
- veloxPool,
- arrowPool));
+ memoryManager()));
return shuffleWriter;
}
@@ -287,7 +284,7 @@ std::shared_ptr<ShuffleReader>
VeloxRuntime::createShuffleReader(
options.batchSize,
options.readerBufferSize,
options.deserializerBufferSize,
- memoryManager()->getArrowMemoryPool(),
+ memoryManager()->defaultArrowMemoryPool(),
memoryManager()->getLeafMemoryPool(),
options.shuffleWriterType);
@@ -295,7 +292,7 @@ std::shared_ptr<ShuffleReader>
VeloxRuntime::createShuffleReader(
}
std::unique_ptr<ColumnarBatchSerializer>
VeloxRuntime::createColumnarBatchSerializer(struct ArrowSchema* cSchema) {
- auto arrowPool = memoryManager()->getArrowMemoryPool();
+ auto arrowPool = memoryManager()->defaultArrowMemoryPool();
auto veloxPool = memoryManager()->getLeafMemoryPool();
return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool,
cSchema);
}
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc
b/cpp/velox/memory/VeloxMemoryManager.cc
index 563bb881d3..4dc09d415e 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -225,8 +225,9 @@ VeloxMemoryManager::VeloxMemoryManager(
auto reservationBlockSize =
backendConf.get<uint64_t>(kMemoryReservationBlockSize,
kMemoryReservationBlockSizeDefault);
blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(),
reservationBlockSize);
- listenableAlloc_ =
std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(),
blockListener_.get());
- arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
+ defaultArrowPool_ = std::make_shared<ArrowMemoryPool>(blockListener_.get());
+ arrowPools_.emplace("default", defaultArrowPool_);
+
auto checkUsageLeak = backendConf.get<bool>(kCheckUsageLeak,
kCheckUsageLeakDefault);
ArbitratorFactoryRegister afr(listener_.get());
@@ -261,10 +262,32 @@ MemoryUsageStats collectVeloxMemoryUsageStats(const
velox::memory::MemoryPool* p
return stats;
}
-MemoryUsageStats collectGlutenAllocatorMemoryUsageStats(const MemoryAllocator*
allocator) {
+MemoryUsageStats collectGlutenAllocatorMemoryUsageStats(
+ const std::unordered_map<std::string, std::weak_ptr<ArrowMemoryPool>>&
arrowPools) {
MemoryUsageStats stats;
- stats.set_current(allocator->getBytes());
- stats.set_peak(allocator->peakBytes());
+ int64_t totalBytes = 0;
+ int64_t peakBytes = 0;
+
+ for (const auto& [name, ptr] : arrowPools) {
+ auto pool = ptr.lock();
+ if (pool == nullptr) {
+ continue;
+ }
+
+ MemoryUsageStats poolStats;
+ const auto allocated = pool->bytes_allocated();
+ const auto peak = pool->max_memory();
+ poolStats.set_current(allocated);
+ poolStats.set_peak(peak);
+
+ stats.mutable_children()->emplace(name, poolStats);
+
+ totalBytes += allocated;
+ peakBytes = std::max(peakBytes, peak);
+ }
+
+ stats.set_current(totalBytes);
+ stats.set_peak(peakBytes);
return stats;
}
@@ -281,12 +304,26 @@ int64_t
shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm, velox::memory::M
}
} // namespace
+std::shared_ptr<arrow::MemoryPool>
VeloxMemoryManager::createArrowMemoryPool(const std::string& name) {
+ std::lock_guard<std::mutex> l(mutex_);
+ VELOX_CHECK_EQ(arrowPools_.count(name), 0, "Arrow memory pool {} already
exists", name);
+ auto pool = std::make_shared<ArrowMemoryPool>(
+ blockListener_.get(), [this, name](arrow::MemoryPool* pool) {
this->dropMemoryPool(name); });
+ arrowPools_.emplace(name, pool);
+ return pool;
+}
+
+void VeloxMemoryManager::dropMemoryPool(const std::string& name) {
+ std::lock_guard<std::mutex> l(mutex_);
+ const auto ret = arrowPools_.erase(name);
+ VELOX_CHECK_EQ(ret, 1, "Child memory pool {} doesn't exist", name);
+}
+
const MemoryUsageStats VeloxMemoryManager::collectMemoryUsageStats() const {
MemoryUsageStats stats;
stats.set_current(listener_->currentBytes());
stats.set_peak(listener_->peakBytes());
- stats.mutable_children()->emplace(
- "gluten::MemoryAllocator",
collectGlutenAllocatorMemoryUsageStats(listenableAlloc_.get()));
+ stats.mutable_children()->emplace("gluten::MemoryAllocator",
collectGlutenAllocatorMemoryUsageStats(arrowPools_));
stats.mutable_children()->emplace(
veloxAggregatePool_->name(),
collectVeloxMemoryUsageStats(veloxAggregatePool_.get()));
return stats;
@@ -367,10 +404,17 @@ bool VeloxMemoryManager::tryDestructSafe() {
veloxMemoryManager_.reset();
// Applies similar rule for Arrow memory pool.
- if (arrowPool_ && arrowPool_->bytes_allocated() != 0) {
+ if (!arrowPools_.empty() && std::any_of(arrowPools_.begin(),
arrowPools_.end(), [&](const auto& entry) {
+ auto pool = entry.second.lock();
+ if (pool == nullptr) {
+ return false;
+ }
+ return pool->bytes_allocated() != 0;
+ })) {
+ VLOG(2) << "Attempt to destruct VeloxMemoryManager failed because there
are still outstanding Arrow memory pools.";
return false;
}
- arrowPool_.reset();
+ arrowPools_.clear();
// Successfully destructed.
return true;
diff --git a/cpp/velox/memory/VeloxMemoryManager.h
b/cpp/velox/memory/VeloxMemoryManager.h
index c2d79a9131..28c0f79614 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -18,6 +18,7 @@
#pragma once
#include "memory/AllocationListener.h"
+#include "memory/ArrowMemoryPool.h"
#include "memory/MemoryAllocator.h"
#include "memory/MemoryManager.h"
#include "velox/common/memory/Memory.h"
@@ -78,10 +79,12 @@ class VeloxMemoryManager final : public MemoryManager {
return veloxMemoryManager_.get();
}
- arrow::MemoryPool* getArrowMemoryPool() override {
- return arrowPool_.get();
+ arrow::MemoryPool* defaultArrowMemoryPool() override {
+ return defaultArrowPool_.get();
}
+ std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const std::string&
name) override;
+
const MemoryUsageStats collectMemoryUsageStats() const override;
const int64_t shrink(int64_t size) override;
@@ -90,7 +93,7 @@ class VeloxMemoryManager final : public MemoryManager {
/// Test only
MemoryAllocator* allocator() const {
- return listenableAlloc_.get();
+ return defaultArrowPool_->allocator();
}
AllocationListener* getListener() const {
@@ -100,20 +103,24 @@ class VeloxMemoryManager final : public MemoryManager {
private:
bool tryDestructSafe();
+ void dropMemoryPool(const std::string& name);
+
#ifdef GLUTEN_ENABLE_HBM
std::unique_ptr<VeloxMemoryAllocator> wrappedAlloc_;
#endif
- // This is a listenable allocator used for arrow.
- std::unique_ptr<MemoryAllocator> listenableAlloc_;
std::unique_ptr<AllocationListener> listener_;
std::unique_ptr<AllocationListener> blockListener_;
- std::unique_ptr<arrow::MemoryPool> arrowPool_;
+
+ std::shared_ptr<ArrowMemoryPool> defaultArrowPool_;
+ std::unordered_map<std::string, std::weak_ptr<ArrowMemoryPool>> arrowPools_;
std::unique_ptr<facebook::velox::memory::MemoryManager> veloxMemoryManager_;
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxAggregatePool_;
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxLeafPool_;
std::vector<std::shared_ptr<facebook::velox::memory::MemoryPool>>
heldVeloxPools_;
+
+ std::mutex mutex_;
};
VeloxMemoryManager* getDefaultMemoryManager();
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index cdc0a7653c..2098332484 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -157,10 +157,9 @@ arrow::Result<std::shared_ptr<VeloxShuffleWriter>>
VeloxHashShuffleWriter::creat
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* arrowPool) {
+ MemoryManager* memoryManager) {
std::shared_ptr<VeloxHashShuffleWriter> res(
- new VeloxHashShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), veloxPool, arrowPool));
+ new VeloxHashShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), memoryManager));
RETURN_NOT_OK(res->init());
return res;
} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
index 5f45c9065c..e8f37d89f6 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
@@ -126,8 +126,7 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* arrowPool);
+ MemoryManager* memoryManager);
arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit)
override;
@@ -194,9 +193,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* pool)
- : VeloxShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), std::move(veloxPool), pool) {}
+ MemoryManager* memoryManager)
+ : VeloxShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), memoryManager) {}
arrow::Status init();
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index 66d02aa68e..bb495ddf35 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -33,10 +33,9 @@ arrow::Result<std::shared_ptr<VeloxShuffleWriter>>
VeloxRssSortShuffleWriter::cr
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* arrowPool) {
- std::shared_ptr<VeloxRssSortShuffleWriter> res(new VeloxRssSortShuffleWriter(
- numPartitions, std::move(partitionWriter), std::move(options),
veloxPool, arrowPool));
+ MemoryManager* memoryManager) {
+ std::shared_ptr<VeloxRssSortShuffleWriter> res(
+ new VeloxRssSortShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), memoryManager));
RETURN_NOT_OK(res->init());
return res;
} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
index 822951ff96..a9b1ae1fd1 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.h
@@ -55,8 +55,7 @@ class VeloxRssSortShuffleWriter final : public
VeloxShuffleWriter {
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* arrowPool);
+ MemoryManager* memoryManager);
arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit)
override;
@@ -71,9 +70,8 @@ class VeloxRssSortShuffleWriter final : public
VeloxShuffleWriter {
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* pool)
- : VeloxShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), std::move(veloxPool), pool) {}
+ MemoryManager* memoryManager)
+ : VeloxShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), memoryManager) {}
arrow::Status init();
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc
b/cpp/velox/shuffle/VeloxShuffleWriter.cc
index a01615dc35..66fa2be2db 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc
@@ -27,19 +27,18 @@ arrow::Result<std::shared_ptr<VeloxShuffleWriter>>
VeloxShuffleWriter::create(
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* arrowPool) {
+ MemoryManager* memoryManager) {
std::shared_ptr<VeloxShuffleWriter> shuffleWriter;
switch (type) {
case ShuffleWriterType::kHashShuffle:
return VeloxHashShuffleWriter::create(
- numPartitions, std::move(partitionWriter), std::move(options),
veloxPool, arrowPool);
+ numPartitions, std::move(partitionWriter), std::move(options),
memoryManager);
case ShuffleWriterType::kSortShuffle:
return VeloxSortShuffleWriter::create(
- numPartitions, std::move(partitionWriter), std::move(options),
veloxPool, arrowPool);
+ numPartitions, std::move(partitionWriter), std::move(options),
memoryManager);
case ShuffleWriterType::kRssSortShuffle:
return VeloxRssSortShuffleWriter::create(
- numPartitions, std::move(partitionWriter), std::move(options),
veloxPool, arrowPool);
+ numPartitions, std::move(partitionWriter), std::move(options),
memoryManager);
default:
return arrow::Status::Invalid("Unsupported shuffle writer type: ",
typeToString(type));
}
diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h
b/cpp/velox/shuffle/VeloxShuffleWriter.h
index 334dc8333d..db1ab54b91 100644
--- a/cpp/velox/shuffle/VeloxShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxShuffleWriter.h
@@ -54,8 +54,7 @@ class VeloxShuffleWriter : public ShuffleWriter {
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* arrowPool);
+ MemoryManager* memoryManager);
facebook::velox::RowVectorPtr getStrippedRowVector(const
facebook::velox::RowVector& rv) {
// get new row type
@@ -121,11 +120,10 @@ class VeloxShuffleWriter : public ShuffleWriter {
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* pool)
- : ShuffleWriter(numPartitions, std::move(options), pool),
- partitionBufferPool_(std::make_unique<ShuffleMemoryPool>(pool)),
- veloxPool_(std::move(veloxPool)),
+ MemoryManager* memoryManager)
+ : ShuffleWriter(numPartitions, std::move(options)),
+
partitionBufferPool_(memoryManager->createArrowMemoryPool("VeloxShuffleWriter.partitionBufferPool")),
+
veloxPool_(dynamic_cast<VeloxMemoryManager*>(memoryManager)->getLeafMemoryPool()),
partitionWriter_(std::move(partitionWriter)) {
partitioner_ = Partitioner::make(options_.partitioning, numPartitions_,
options_.startPartitionId);
arenas_.resize(numPartitions);
@@ -136,7 +134,7 @@ class VeloxShuffleWriter : public ShuffleWriter {
// Memory Pool used to track memory usage of partition buffers.
// The actual allocation is delegated to options_.memoryPool.
- std::unique_ptr<ShuffleMemoryPool> partitionBufferPool_;
+ std::shared_ptr<arrow::MemoryPool> partitionBufferPool_;
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 83dc41fdb9..7936c9c4e3 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -51,10 +51,9 @@ arrow::Result<std::shared_ptr<VeloxShuffleWriter>>
VeloxSortShuffleWriter::creat
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* arrowPool) {
- std::shared_ptr<VeloxSortShuffleWriter> writer(new VeloxSortShuffleWriter(
- numPartitions, std::move(partitionWriter), std::move(options),
std::move(veloxPool), arrowPool));
+ MemoryManager* memoryManager) {
+ std::shared_ptr<VeloxSortShuffleWriter> writer(
+ new VeloxSortShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), memoryManager));
RETURN_NOT_OK(writer->init());
return writer;
}
@@ -63,9 +62,8 @@ VeloxSortShuffleWriter::VeloxSortShuffleWriter(
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* pool)
- : VeloxShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), std::move(veloxPool), pool) {}
+ MemoryManager* memoryManager)
+ : VeloxShuffleWriter(numPartitions, std::move(partitionWriter),
std::move(options), memoryManager) {}
arrow::Status VeloxSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb,
int64_t memLimit) {
ARROW_ASSIGN_OR_RAISE(auto rv, getPeeledRowVector(cb));
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index cb8b3ba557..0580d3d810 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -35,8 +35,7 @@ class VeloxSortShuffleWriter final : public
VeloxShuffleWriter {
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* arrowPool);
+ MemoryManager* memoryManager);
arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit)
override;
@@ -55,8 +54,7 @@ class VeloxSortShuffleWriter final : public
VeloxShuffleWriter {
uint32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- arrow::MemoryPool* pool);
+ MemoryManager* memoryManager);
arrow::Status init();
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 44d70667d6..f611a060f0 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -27,7 +27,10 @@ class DummyMemoryManager final : public MemoryManager {
public:
DummyMemoryManager(const std::string& kind) : MemoryManager(kind){};
- arrow::MemoryPool* getArrowMemoryPool() override {
+ arrow::MemoryPool* defaultArrowMemoryPool() override {
+ throw GlutenException("Not yet implemented");
+ }
+ std::shared_ptr<arrow::MemoryPool> createArrowMemoryPool(const std::string&
name) override {
throw GlutenException("Not yet implemented");
}
const MemoryUsageStats collectMemoryUsageStats() const override {
diff --git a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
index cfc0017ae7..19c340effa 100644
--- a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
+++ b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
@@ -17,10 +17,12 @@
#include <gtest/gtest.h>
+#include "compute/VeloxBackend.h"
#include "memory/ArrowMemoryPool.h"
#include "memory/VeloxColumnarBatch.h"
#include "operators/serializer/VeloxColumnarBatchSerializer.h"
#include "utils/VeloxArrowUtils.h"
+
#include "velox/vector/arrow/Bridge.h"
#include "velox/vector/tests/utils/VectorTestBase.h"
@@ -32,14 +34,19 @@ namespace gluten {
class VeloxColumnarBatchSerializerTest : public ::testing::Test, public
test::VectorTestBase {
protected:
- static void SetUpTestCase() {
+ static void SetUpTestSuite() {
+ VeloxBackend::create(AllocationListener::noop(), {});
memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{});
}
- std::shared_ptr<arrow::MemoryPool> arrowPool_ = defaultArrowMemoryPool();
+ static void TearDownTestSuite() {
+ VeloxBackend::get()->tearDown();
+ }
};
TEST_F(VeloxColumnarBatchSerializerTest, serialize) {
+ auto* arrowPool = getDefaultMemoryManager()->defaultArrowMemoryPool();
+
std::vector<VectorPtr> children = {
makeNullableFlatVector<int8_t>({1, 2, 3, std::nullopt, 4}),
makeNullableFlatVector<int8_t>({1, -1, std::nullopt, std::nullopt, -2}),
@@ -54,12 +61,12 @@ TEST_F(VeloxColumnarBatchSerializerTest, serialize) {
};
auto vector = makeRowVector(children);
auto batch = std::make_shared<VeloxColumnarBatch>(vector);
- auto serializer =
std::make_shared<VeloxColumnarBatchSerializer>(arrowPool_.get(), pool_,
nullptr);
+ auto serializer = std::make_shared<VeloxColumnarBatchSerializer>(arrowPool,
pool_, nullptr);
auto buffer = serializer->serializeColumnarBatches({batch});
ArrowSchema cSchema;
exportToArrow(vector, cSchema, ArrowUtils::getBridgeOptions());
- auto deserializer =
std::make_shared<VeloxColumnarBatchSerializer>(arrowPool_.get(), pool_,
&cSchema);
+ auto deserializer =
std::make_shared<VeloxColumnarBatchSerializer>(arrowPool, pool_, &cSchema);
auto deserialized =
deserializer->deserialize(const_cast<uint8_t*>(buffer->data()), buffer->size());
auto deserializedVector =
std::dynamic_pointer_cast<VeloxColumnarBatch>(deserialized)->getRowVector();
test::assertEqualVectors(vector, deserializedVector);
diff --git a/cpp/velox/tests/VeloxRowToColumnarTest.cc
b/cpp/velox/tests/VeloxRowToColumnarTest.cc
index 32660c8945..2bda948f59 100644
--- a/cpp/velox/tests/VeloxRowToColumnarTest.cc
+++ b/cpp/velox/tests/VeloxRowToColumnarTest.cc
@@ -43,7 +43,7 @@ class VeloxRowToColumnarTest : public ::testing::Test, public
test::VectorTestBa
uint8_t* address = columnarToRowConverter->getBufferAddress();
auto lengthVec = columnarToRowConverter->getLengths();
- int64_t lengthArr[lengthVec.size()];
+ std::vector<int64_t> lengthArr(lengthVec.size());
for (int i = 0; i < lengthVec.size(); i++) {
lengthArr[i] = lengthVec[i];
}
@@ -52,7 +52,7 @@ class VeloxRowToColumnarTest : public ::testing::Test, public
test::VectorTestBa
toArrowSchema(vector->type(), pool(), &cSchema);
auto rowToColumnarConverter =
std::make_shared<VeloxRowToColumnarConverter>(&cSchema, pool_);
- auto cb = rowToColumnarConverter->convert(numRows, lengthArr, address);
+ auto cb = rowToColumnarConverter->convert(numRows, lengthArr.data(),
address);
auto vp =
std::dynamic_pointer_cast<VeloxColumnarBatch>(cb)->getRowVector();
velox::test::assertEqualVectors(vector, vp);
}
diff --git a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
index 156196fc2f..d4f4704525 100644
--- a/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterSpillTest.cc
@@ -50,16 +50,13 @@ class VeloxHashShuffleWriterSpillTest : public
VeloxShuffleWriterTestBase, publi
}
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t
numPartitions) override {
- auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
- auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
-
auto partitionWriter = createPartitionWriter(
PartitionWriterType::kLocal, numPartitions, dataFile_, localDirs_,
partitionWriterOptions_);
GLUTEN_ASSIGN_OR_THROW(
auto shuffleWriter,
VeloxHashShuffleWriter::create(
- numPartitions, std::move(partitionWriter), shuffleWriterOptions_,
veloxPool, arrowPool));
+ numPartitions, std::move(partitionWriter), shuffleWriterOptions_,
getDefaultMemoryManager()));
return shuffleWriter;
}
@@ -94,7 +91,7 @@ TEST_F(VeloxHashShuffleWriterSpillTest, memoryLeak) {
ASSERT_NOT_OK(shuffleWriter->stop());
- const auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
+ const auto* arrowPool = getDefaultMemoryManager()->defaultArrowMemoryPool();
ASSERT_EQ(arrowPool->bytes_allocated(), 0);
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 1024dafcfb..04178f5c43 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -217,7 +217,7 @@ class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams
kDefaultBatchSize,
kDefaultReadBufferSize,
kDefaultDeserializerBufferSize,
- getDefaultMemoryManager()->getArrowMemoryPool(),
+ getDefaultMemoryManager()->defaultArrowMemoryPool(),
pool_,
GetParam().shuffleWriterType);
@@ -293,9 +293,6 @@ class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams
class SinglePartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
protected:
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t) override {
- auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
- auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
-
shuffleWriterOptions_.partitioning = Partitioning::kSingle;
shuffleWriterOptions_.bufferSize = 10;
@@ -305,7 +302,11 @@ class SinglePartitioningShuffleWriterTest : public
VeloxShuffleWriterTest {
GLUTEN_ASSIGN_OR_THROW(
auto shuffleWriter,
VeloxShuffleWriter::create(
- GetParam().shuffleWriterType, 1, std::move(partitionWriter),
shuffleWriterOptions_, veloxPool, arrowPool));
+ GetParam().shuffleWriterType,
+ 1,
+ std::move(partitionWriter),
+ shuffleWriterOptions_,
+ getDefaultMemoryManager()));
return shuffleWriter;
}
@@ -323,9 +324,6 @@ class HashPartitioningShuffleWriterTest : public
VeloxShuffleWriterTest {
}
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t
numPartitions) override {
- auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
- auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
-
shuffleWriterOptions_.partitioning = Partitioning::kHash;
shuffleWriterOptions_.bufferSize = 4;
@@ -339,8 +337,7 @@ class HashPartitioningShuffleWriterTest : public
VeloxShuffleWriterTest {
numPartitions,
std::move(partitionWriter),
shuffleWriterOptions_,
- veloxPool,
- arrowPool));
+ getDefaultMemoryManager()));
return shuffleWriter;
}
@@ -368,9 +365,6 @@ class RangePartitioningShuffleWriterTest : public
VeloxShuffleWriterTest {
}
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t
numPartitions) override {
- auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
- auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
-
shuffleWriterOptions_.partitioning = Partitioning::kRange;
shuffleWriterOptions_.bufferSize = 4;
@@ -384,8 +378,7 @@ class RangePartitioningShuffleWriterTest : public
VeloxShuffleWriterTest {
numPartitions,
std::move(partitionWriter),
shuffleWriterOptions_,
- veloxPool,
- arrowPool));
+ getDefaultMemoryManager()));
return shuffleWriter;
}
@@ -397,9 +390,6 @@ class RangePartitioningShuffleWriterTest : public
VeloxShuffleWriterTest {
class RoundRobinPartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
protected:
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t
numPartitions) override {
- auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
- auto veloxPool = getDefaultMemoryManager()->getLeafMemoryPool();
-
shuffleWriterOptions_.partitioning = Partitioning::kRoundRobin;
shuffleWriterOptions_.bufferSize = 4096;
@@ -413,8 +403,7 @@ class RoundRobinPartitioningShuffleWriterTest : public
VeloxShuffleWriterTest {
numPartitions,
std::move(partitionWriter),
shuffleWriterOptions_,
- veloxPool,
- arrowPool));
+ getDefaultMemoryManager()));
return shuffleWriter;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]