This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 4a951ba78 [VL] Move memory reservation block computation logic into
AllocationListener
4a951ba78 is described below
commit 4a951ba78f07926cb1a85429fa43584eba38be44
Author: Yang Zhang <[email protected]>
AuthorDate: Mon May 20 19:23:38 2024 +0800
[VL] Move memory reservation block computation logic into AllocationListener
---
cpp/core/config/GlutenConfig.h | 3 +
cpp/core/jni/JniCommon.h | 59 +++-------
cpp/core/jni/JniWrapper.cc | 5 +-
cpp/core/memory/AllocationListener.h | 43 +++++++
cpp/core/memory/MemoryAllocator.cc | 55 +++------
cpp/core/memory/MemoryAllocator.h | 5 +-
cpp/velox/benchmarks/common/BenchmarkUtils.cc | 3 +-
cpp/velox/memory/VeloxMemoryManager.cc | 17 ++-
cpp/velox/memory/VeloxMemoryManager.h | 8 +-
cpp/velox/tests/CMakeLists.txt | 14 +--
cpp/velox/tests/FunctionTest.cc | 19 ++-
cpp/velox/tests/MemoryManagerTest.cc | 129 +++++++++++++++++++++
.../tests/Substrait2VeloxPlanConversionTest.cc | 3 +-
.../tests/Substrait2VeloxPlanValidatorTest.cc | 3 -
.../tests/VeloxColumnarBatchSerializerTest.cc | 10 +-
cpp/velox/tests/VeloxColumnarBatchTest.cc | 7 +-
cpp/velox/tests/VeloxColumnarToRowTest.cc | 6 +-
cpp/velox/tests/VeloxRowToColumnarTest.cc | 3 -
.../gluten/memory/nmm/NativeMemoryManager.java | 13 +--
19 files changed, 261 insertions(+), 144 deletions(-)
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 16a18f6be..a039537b7 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -42,6 +42,9 @@ const std::string kSparkOffHeapMemory =
"spark.gluten.memory.offHeap.size.in.byt
const std::string kSparkTaskOffHeapMemory =
"spark.gluten.memory.task.offHeap.size.in.bytes";
+const std::string kMemoryReservationBlockSize =
"spark.gluten.memory.reservationBlockSize";
+const uint64_t kMemoryReservationBlockSizeDefault = 8 << 20;
+
const std::string kSparkBatchSize = "spark.gluten.sql.columnar.maxBatchSize";
const std::string kParquetBlockSize = "parquet.block.size";
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index aa3b2b884..5858a70e9 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -322,13 +322,8 @@ static inline gluten::CompressionMode
getCompressionMode(JNIEnv* env, jstring co
class SparkAllocationListener final : public gluten::AllocationListener {
public:
- SparkAllocationListener(
- JavaVM* vm,
- jobject jListenerLocalRef,
- jmethodID jReserveMethod,
- jmethodID jUnreserveMethod,
- int64_t blockSize)
- : vm_(vm), jReserveMethod_(jReserveMethod),
jUnreserveMethod_(jUnreserveMethod), blockSize_(blockSize) {
+ SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef, jmethodID
jReserveMethod, jmethodID jUnreserveMethod)
+ : vm_(vm), jReserveMethod_(jReserveMethod),
jUnreserveMethod_(jUnreserveMethod) {
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
jListenerGlobalRef_ = env->NewGlobalRef(jListenerLocalRef);
@@ -350,7 +345,20 @@ class SparkAllocationListener final : public
gluten::AllocationListener {
}
void allocationChanged(int64_t size) override {
- updateReservation(size);
+ if (size == 0) {
+ return;
+ }
+ JNIEnv* env;
+ attachCurrentThreadAsDaemonOrThrow(vm_, &env);
+ if (size < 0) {
+ env->CallLongMethod(jListenerGlobalRef_, jUnreserveMethod_, -size);
+ checkException(env);
+ } else {
+ env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size);
+ checkException(env);
+ }
+ bytesReserved_ += size;
+ maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_);
}
int64_t currentBytes() override {
@@ -362,47 +370,12 @@ class SparkAllocationListener final : public
gluten::AllocationListener {
}
private:
- int64_t reserve(int64_t diff) {
- std::lock_guard<std::mutex> lock(mutex_);
- bytesReserved_ += diff;
- int64_t newBlockCount;
- if (bytesReserved_ == 0) {
- newBlockCount = 0;
- } else {
- // ceil to get the required block number
- newBlockCount = (bytesReserved_ - 1) / blockSize_ + 1;
- }
- int64_t bytesGranted = (newBlockCount - blocksReserved_) * blockSize_;
- blocksReserved_ = newBlockCount;
- maxBytesReserved_ = std::max(maxBytesReserved_, bytesReserved_);
- return bytesGranted;
- }
-
- void updateReservation(int64_t diff) {
- int64_t granted = reserve(diff);
- if (granted == 0) {
- return;
- }
- JNIEnv* env;
- attachCurrentThreadAsDaemonOrThrow(vm_, &env);
- if (granted < 0) {
- env->CallLongMethod(jListenerGlobalRef_, jUnreserveMethod_, -granted);
- checkException(env);
- } else {
- env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, granted);
- checkException(env);
- }
- }
-
JavaVM* vm_;
jobject jListenerGlobalRef_;
jmethodID jReserveMethod_;
jmethodID jUnreserveMethod_;
- int64_t blockSize_;
- int64_t blocksReserved_ = 0L;
int64_t bytesReserved_ = 0L;
int64_t maxBytesReserved_ = 0L;
- std::mutex mutex_;
};
class BacktraceAllocationListener final : public gluten::AllocationListener {
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index e70a017e0..6a1926317 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -1309,7 +1309,6 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_memory_nmm_NativeMemoryManager_cr
jstring jbackendType,
jstring jnmmName,
jlong allocatorId,
- jlong reservationBlockSize,
jobject jlistener) {
JNI_METHOD_START
JavaVM* vm;
@@ -1321,8 +1320,8 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_memory_nmm_NativeMemoryManager_cr
throw gluten::GlutenException("Allocator does not exist or has been
closed");
}
- std::unique_ptr<AllocationListener> listener =
std::make_unique<SparkAllocationListener>(
- vm, jlistener, reserveMemoryMethod, unreserveMemoryMethod,
reservationBlockSize);
+ std::unique_ptr<AllocationListener> listener =
+ std::make_unique<SparkAllocationListener>(vm, jlistener,
reserveMemoryMethod, unreserveMemoryMethod);
if (gluten::backtrace_allocation) {
listener =
std::make_unique<BacktraceAllocationListener>(std::move(listener));
diff --git a/cpp/core/memory/AllocationListener.h
b/cpp/core/memory/AllocationListener.h
index 23015e1a0..a3c0a72cb 100644
--- a/cpp/core/memory/AllocationListener.h
+++ b/cpp/core/memory/AllocationListener.h
@@ -17,6 +17,7 @@
#pragma once
+#include <math.h>
#include <memory>
namespace gluten {
@@ -44,4 +45,46 @@ class AllocationListener {
AllocationListener() = default;
};
+/// Memory changes are rounded to the specified block size in order to reduce
the number of calls to the delegated listener.
+class BlockAllocationListener final : public AllocationListener {
+ public:
+ BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize)
+ : delegated_(delegated), blockSize_(blockSize) {}
+
+ void allocationChanged(int64_t diff) override {
+ if (diff == 0) {
+ return;
+ }
+ if (diff > 0) {
+ if (reservationBytes_ - usedBytes_ < diff) {
+ auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_;
+ delegated_->allocationChanged(roundSize);
+ reservationBytes_ += roundSize;
+ peakBytes_ = std::max(peakBytes_, reservationBytes_);
+ }
+ usedBytes_ += diff;
+ } else {
+ usedBytes_ += diff;
+ auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ *
blockSize_;
+ delegated_->allocationChanged(-unreservedSize);
+ reservationBytes_ -= unreservedSize;
+ }
+ }
+
+ int64_t currentBytes() override {
+ return reservationBytes_;
+ }
+
+ int64_t peakBytes() override {
+ return peakBytes_;
+ }
+
+ private:
+ AllocationListener* delegated_;
+ uint64_t blockSize_{0L};
+ uint64_t usedBytes_{0L};
+ uint64_t peakBytes_{0L};
+ uint64_t reservationBytes_{0L};
+};
+
} // namespace gluten
diff --git a/cpp/core/memory/MemoryAllocator.cc
b/cpp/core/memory/MemoryAllocator.cc
index 6bcb9926e..ac869219d 100644
--- a/cpp/core/memory/MemoryAllocator.cc
+++ b/cpp/core/memory/MemoryAllocator.cc
@@ -22,54 +22,38 @@
namespace gluten {
bool ListenableMemoryAllocator::allocate(int64_t size, void** out) {
- listener_->allocationChanged(size);
+ updateUsage(size);
bool succeed = delegated_->allocate(size, out);
if (!succeed) {
- listener_->allocationChanged(-size);
- }
- if (succeed) {
- bytes_ += size;
- peakBytes_ = std::max(peakBytes_, bytes_.load());
+ updateUsage(-size);
}
return succeed;
}
bool ListenableMemoryAllocator::allocateZeroFilled(int64_t nmemb, int64_t
size, void** out) {
- listener_->allocationChanged(size * nmemb);
+ updateUsage(size * nmemb);
bool succeed = delegated_->allocateZeroFilled(nmemb, size, out);
if (!succeed) {
- listener_->allocationChanged(-size * nmemb);
- }
- if (succeed) {
- bytes_ += size * nmemb;
- peakBytes_ = std::max(peakBytes_, bytes_.load());
+ updateUsage(-size * nmemb);
}
return succeed;
}
bool ListenableMemoryAllocator::allocateAligned(uint64_t alignment, int64_t
size, void** out) {
- listener_->allocationChanged(size);
+ updateUsage(size);
bool succeed = delegated_->allocateAligned(alignment, size, out);
if (!succeed) {
- listener_->allocationChanged(-size);
- }
- if (succeed) {
- bytes_ += size;
- peakBytes_ = std::max(peakBytes_, bytes_.load());
+ updateUsage(-size);
}
return succeed;
}
bool ListenableMemoryAllocator::reallocate(void* p, int64_t size, int64_t
newSize, void** out) {
int64_t diff = newSize - size;
- listener_->allocationChanged(diff);
+ updateUsage(diff);
bool succeed = delegated_->reallocate(p, size, newSize, out);
if (!succeed) {
- listener_->allocationChanged(-diff);
- }
- if (succeed) {
- bytes_ += diff;
- peakBytes_ = std::max(peakBytes_, bytes_.load());
+ updateUsage(-diff);
}
return succeed;
}
@@ -81,38 +65,37 @@ bool ListenableMemoryAllocator::reallocateAligned(
int64_t newSize,
void** out) {
int64_t diff = newSize - size;
- listener_->allocationChanged(diff);
+ updateUsage(diff);
bool succeed = delegated_->reallocateAligned(p, alignment, size, newSize,
out);
if (!succeed) {
- listener_->allocationChanged(-diff);
- }
- if (succeed) {
- bytes_ += diff;
- peakBytes_ = std::max(peakBytes_, bytes_.load());
+ updateUsage(-diff);
}
return succeed;
}
bool ListenableMemoryAllocator::free(void* p, int64_t size) {
- listener_->allocationChanged(-size);
+ updateUsage(-size);
bool succeed = delegated_->free(p, size);
if (!succeed) {
- listener_->allocationChanged(size);
- }
- if (succeed) {
- bytes_ -= size;
+ updateUsage(size);
}
return succeed;
}
int64_t ListenableMemoryAllocator::getBytes() const {
- return bytes_;
+ return usedBytes_;
}
int64_t ListenableMemoryAllocator::peakBytes() const {
return peakBytes_;
}
+void ListenableMemoryAllocator::updateUsage(int64_t size) {
+ listener_->allocationChanged(size);
+ usedBytes_ += size;
+ peakBytes_ = std::max(peakBytes_, usedBytes_);
+}
+
bool StdMemoryAllocator::allocate(int64_t size, void** out) {
*out = std::malloc(size);
bytes_ += size;
diff --git a/cpp/core/memory/MemoryAllocator.h
b/cpp/core/memory/MemoryAllocator.h
index a322c9190..bc8f9de18 100644
--- a/cpp/core/memory/MemoryAllocator.h
+++ b/cpp/core/memory/MemoryAllocator.h
@@ -68,10 +68,11 @@ class ListenableMemoryAllocator final : public
MemoryAllocator {
int64_t peakBytes() const override;
private:
+ void updateUsage(int64_t size);
MemoryAllocator* delegated_;
AllocationListener* listener_;
- std::atomic_int64_t bytes_{0};
- int64_t peakBytes_{0};
+ uint64_t usedBytes_{0L};
+ uint64_t peakBytes_{0L};
};
class StdMemoryAllocator final : public MemoryAllocator {
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
index ccec6f3c4..a9f6f0838 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
@@ -18,7 +18,7 @@
#include "BenchmarkUtils.h"
#include "compute/VeloxBackend.h"
#include "compute/VeloxRuntime.h"
-#include "config/GlutenConfig.h"
+#include "config/VeloxConfig.h"
#include "shuffle/Utils.h"
#include "utils/StringUtil.h"
#include "velox/dwio/common/Options.h"
@@ -38,6 +38,7 @@ std::unordered_map<std::string, std::string> bmConfMap =
{{gluten::kSparkBatchSi
} // namespace
void initVeloxBackend(std::unordered_map<std::string, std::string>& conf) {
+ conf[gluten::kGlogSeverityLevel] = "0";
gluten::VeloxBackend::create(conf);
}
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc
b/cpp/velox/memory/VeloxMemoryManager.cc
index f49beaccd..0584780ad 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -20,6 +20,8 @@
#include "velox/common/memory/MemoryPool.h"
#include "velox/exec/MemoryReclaimer.h"
+#include "compute/VeloxBackend.h"
+#include "config/VeloxConfig.h"
#include "memory/ArrowMemoryPool.h"
#include "utils/exception.h"
@@ -103,9 +105,9 @@ class ListenableArbitrator : public
velox::memory::MemoryArbitrator {
}
}
auto reclaimedFreeBytes = pool->shrink(0);
- auto neededBytes = bytes - reclaimedFreeBytes;
+ auto neededBytes = velox::bits::roundUp(bytes - reclaimedFreeBytes,
memoryPoolTransferCapacity_);
listener_->allocationChanged(neededBytes);
- auto ret = pool->grow(bytes, bytes);
+ auto ret = pool->grow(reclaimedFreeBytes + neededBytes, bytes);
VELOX_CHECK(
ret,
"{} failed to grow {} bytes, current state {}",
@@ -156,8 +158,11 @@ VeloxMemoryManager::VeloxMemoryManager(
std::shared_ptr<MemoryAllocator> allocator,
std::unique_ptr<AllocationListener> listener)
: MemoryManager(), name_(name), listener_(std::move(listener)) {
- glutenAlloc_ = std::make_unique<ListenableMemoryAllocator>(allocator.get(),
listener_.get());
- arrowPool_ = std::make_unique<ArrowMemoryPool>(glutenAlloc_.get());
+ auto reservationBlockSize =
VeloxBackend::get()->getBackendConf()->get<uint64_t>(
+ kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
+ blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(),
reservationBlockSize);
+ listenableAlloc_ =
std::make_unique<ListenableMemoryAllocator>(allocator.get(),
blockListener_.get());
+ arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
ArbitratorFactoryRegister afr(listener_.get());
velox::memory::MemoryManagerOptions mmOptions{
@@ -169,7 +174,7 @@ VeloxMemoryManager::VeloxMemoryManager(
.allocatorCapacity = velox::memory::kMaxMemory,
.arbitratorKind = afr.getKind(),
.memoryPoolInitCapacity = 0,
- .memoryPoolTransferCapacity = 32 << 20,
+ .memoryPoolTransferCapacity = reservationBlockSize,
.memoryReclaimWaitMs = 0};
veloxMemoryManager_ =
std::make_unique<velox::memory::MemoryManager>(mmOptions);
@@ -222,7 +227,7 @@ const MemoryUsageStats
VeloxMemoryManager::collectMemoryUsageStats() const {
stats.set_current(listener_->currentBytes());
stats.set_peak(listener_->peakBytes());
stats.mutable_children()->emplace(
- "gluten::MemoryAllocator",
collectGlutenAllocatorMemoryUsageStats(glutenAlloc_.get()));
+ "gluten::MemoryAllocator",
collectGlutenAllocatorMemoryUsageStats(listenableAlloc_.get()));
stats.mutable_children()->emplace(
veloxAggregatePool_->name(),
collectVeloxMemoryUsageStats(veloxAggregatePool_.get()));
return stats;
diff --git a/cpp/velox/memory/VeloxMemoryManager.h
b/cpp/velox/memory/VeloxMemoryManager.h
index 1e8bcd8c8..3ba5bbf7d 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -60,6 +60,11 @@ class VeloxMemoryManager final : public MemoryManager {
void hold() override;
+ /// Test only
+ MemoryAllocator* allocator() const {
+ return listenableAlloc_.get();
+ }
+
AllocationListener* getListener() const {
return listener_.get();
}
@@ -74,8 +79,9 @@ class VeloxMemoryManager final : public MemoryManager {
#endif
// This is a listenable allocator used for arrow.
- std::unique_ptr<MemoryAllocator> glutenAlloc_;
+ std::unique_ptr<MemoryAllocator> listenableAlloc_;
std::unique_ptr<AllocationListener> listener_;
+ std::unique_ptr<AllocationListener> blockListener_;
std::unique_ptr<arrow::MemoryPool> arrowPool_;
std::unique_ptr<facebook::velox::memory::MemoryManager> veloxMemoryManager_;
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index a5bd5b4f7..29beb69da 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -30,12 +30,14 @@ function(add_velox_test TEST_EXEC)
else()
message(FATAL_ERROR "No sources specified for test ${TEST_NAME}")
endif()
- add_executable(${TEST_EXEC} ${SOURCES})
+ add_executable(${TEST_EXEC} ${SOURCES} ${VELOX_TEST_COMMON_SRCS})
target_include_directories(${TEST_EXEC} PRIVATE ${CMAKE_SOURCE_DIR}/velox
${CMAKE_SOURCE_DIR}/src ${VELOX_BUILD_PATH}/_deps/duckdb-src/src/include)
- target_link_libraries(${TEST_EXEC} velox GTest::gtest GTest::gtest_main
google::glog benchmark::benchmark)
+ target_link_libraries(${TEST_EXEC} velox_benchmark_common GTest::gtest
GTest::gtest_main)
gtest_discover_tests(${TEST_EXEC} DISCOVERY_MODE PRE_TEST)
endfunction()
+set(VELOX_TEST_COMMON_SRCS JsonToProtoConverter.cc FilePathGenerator.cc)
+
add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc)
# TODO: ORC is not well supported.
# add_velox_test(orc_test SOURCES OrcTest.cc)
@@ -55,10 +57,8 @@ add_velox_test(
SubstraitExtensionCollectorTest.cc
VeloxSubstraitRoundTripTest.cc
VeloxSubstraitSignatureTest.cc
- VeloxToSubstraitTypeTest.cc
- FunctionTest.cc
- JsonToProtoConverter.cc
- FilePathGenerator.cc)
-add_velox_test(spark_functions_test SOURCES SparkFunctionTest.cc)
+ VeloxToSubstraitTypeTest.cc)
+add_velox_test(spark_functions_test SOURCES SparkFunctionTest.cc
FunctionTest.cc)
add_velox_test(execution_ctx_test SOURCES RuntimeTest.cc)
+add_velox_test(velox_memory_test SOURCES MemoryManagerTest.cc)
add_velox_test(buffer_outputstream_test SOURCES BufferOutputStreamTest.cc)
diff --git a/cpp/velox/tests/FunctionTest.cc b/cpp/velox/tests/FunctionTest.cc
index 01af30176..b55b64ba9 100644
--- a/cpp/velox/tests/FunctionTest.cc
+++ b/cpp/velox/tests/FunctionTest.cc
@@ -15,34 +15,33 @@
* limitations under the License.
*/
+#include "FilePathGenerator.h"
#include "JsonToProtoConverter.h"
-#include "memory/VeloxMemoryManager.h"
#include "velox/common/base/Fs.h"
#include "velox/common/base/tests/GTestUtils.h"
+#include "velox/core/QueryCtx.h"
#include "velox/dwio/common/tests/utils/DataFiles.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+#include "substrait/SubstraitParser.h"
#include "substrait/SubstraitToVeloxPlan.h"
#include "substrait/TypeUtils.h"
#include "substrait/VariantToVectorConverter.h"
#include "substrait/VeloxToSubstraitType.h"
-#include "FilePathGenerator.h"
-
-#include "velox/core/QueryCtx.h"
-
-#include "substrait/SubstraitParser.h"
-
using namespace facebook::velox;
using namespace facebook::velox::test;
namespace gluten {
-class FunctionTest : public ::testing::Test {
+class FunctionTest : public ::testing::Test, public test::VectorTestBase {
protected:
- std::shared_ptr<memory::MemoryPool> pool_ =
gluten::defaultLeafVeloxMemoryPool();
+ static void SetUpTestCase() {
+ memory::MemoryManager::testingSetInstance({});
+ }
std::shared_ptr<gluten::SubstraitToVeloxPlanConverter> planConverter_ =
- std::make_shared<gluten::SubstraitToVeloxPlanConverter>(pool_.get());
+ std::make_shared<gluten::SubstraitToVeloxPlanConverter>(pool());
};
TEST_F(FunctionTest, makeNames) {
diff --git a/cpp/velox/tests/MemoryManagerTest.cc
b/cpp/velox/tests/MemoryManagerTest.cc
new file mode 100644
index 000000000..f256db1b2
--- /dev/null
+++ b/cpp/velox/tests/MemoryManagerTest.cc
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "benchmarks/common/BenchmarkUtils.h"
+#include "compute/VeloxBackend.h"
+#include "config/GlutenConfig.h"
+#include "memory/VeloxMemoryManager.h"
+#include "velox/common/base/tests/GTestUtils.h"
+
+namespace gluten {
+using namespace facebook::velox;
+
+class MockAllocationListener : public gluten::AllocationListener {
+ public:
+ void allocationChanged(int64_t diff) override {
+ currentBytes_ += diff;
+ peakBytes_ = std::max(peakBytes_, currentBytes_);
+ }
+ int64_t currentBytes() override {
+ return currentBytes_;
+ }
+ int64_t peakBytes() override {
+ return peakBytes_;
+ }
+ uint64_t currentBytes_{0L};
+ uint64_t peakBytes_{0L};
+};
+
+namespace {
+static const uint64_t kMB = 1 << 20;
+} // namespace
+
+class MemoryManagerTest : public ::testing::Test {
+ protected:
+ static void SetUpTestCase() {
+ std::unordered_map<std::string, std::string> conf = {
+ {kMemoryReservationBlockSize,
std::to_string(kMemoryReservationBlockSizeDefault)}};
+ initVeloxBackend(conf);
+ }
+
+ void SetUp() override {
+ vmm_ = std::make_unique<VeloxMemoryManager>("test", stdAllocator_,
std::make_unique<MockAllocationListener>());
+ listener_ = vmm_->getListener();
+ allocator_ = vmm_->allocator();
+ }
+
+ std::unique_ptr<VeloxMemoryManager> vmm_;
+ AllocationListener* listener_;
+ MemoryAllocator* allocator_;
+
+ std::shared_ptr<MemoryAllocator> stdAllocator_ =
std::make_shared<StdMemoryAllocator>();
+
+ struct Allocation {
+ void* buffer;
+ size_t size;
+ memory::MemoryPool* pool;
+ };
+};
+
+TEST_F(MemoryManagerTest, memoryPoolWithBlockReseravtion) {
+ auto pool = vmm_->getLeafMemoryPool();
+ std::vector<Allocation> allocations;
+ std::vector<uint64_t> sizes{
+ kMemoryReservationBlockSizeDefault - 1 * kMB,
kMemoryReservationBlockSizeDefault - 2 * kMB};
+ for (const auto& size : sizes) {
+ auto buf = pool->allocate(size);
+ allocations.push_back({buf, size, pool.get()});
+ }
+ EXPECT_EQ(listener_->currentBytes(), 2 * kMemoryReservationBlockSizeDefault);
+ EXPECT_EQ(listener_->peakBytes(), listener_->currentBytes());
+
+ for (auto& allocation : allocations) {
+ allocation.pool->free(allocation.buffer, allocation.size);
+ }
+
+ auto currentBytes = listener_->currentBytes();
+ ASSERT_EQ(vmm_->shrink(0), currentBytes);
+ ASSERT_EQ(listener_->currentBytes(), 0);
+}
+
+TEST_F(MemoryManagerTest, memoryAllocatorWithBlockReservation) {
+ std::vector<Allocation> allocations;
+ std::vector<uint64_t> sizes{
+ kMemoryReservationBlockSizeDefault - 1 * kMB,
kMemoryReservationBlockSizeDefault - 2 * kMB};
+ for (auto i = 0; i < sizes.size(); i++) {
+ auto size = sizes[i];
+ auto currentBytes = allocator_->getBytes();
+ Allocation allocation{.size = size};
+ allocator_->allocate(size, &allocation.buffer);
+ allocations.push_back(allocation);
+
+ EXPECT_EQ(allocator_->getBytes(), currentBytes + size);
+ EXPECT_EQ(allocator_->peakBytes(), allocator_->getBytes());
+ EXPECT_EQ(listener_->currentBytes(), (i + 1) *
kMemoryReservationBlockSizeDefault);
+ EXPECT_EQ(listener_->peakBytes(), listener_->currentBytes());
+ }
+
+ auto currentBytes = allocator_->getBytes();
+ auto allocation = allocations.back();
+ allocations.pop_back();
+ allocator_->free(allocation.buffer, allocation.size);
+ EXPECT_EQ(allocator_->getBytes(), currentBytes - allocation.size);
+ EXPECT_EQ(listener_->currentBytes(), kMemoryReservationBlockSizeDefault);
+
+ currentBytes = allocator_->getBytes();
+ allocation = allocations.back();
+ allocations.pop_back();
+ allocator_->free(allocation.buffer, allocation.size);
+ EXPECT_EQ(allocator_->getBytes(), currentBytes - allocation.size);
+ EXPECT_EQ(listener_->currentBytes(), 0);
+
+ ASSERT_EQ(allocator_->getBytes(), 0);
+}
+
+} // namespace gluten
diff --git a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
index 841514261..3926b22c9 100644
--- a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
+++ b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
@@ -19,7 +19,6 @@
#include <filesystem>
#include "compute/VeloxPlanConverter.h"
-#include "memory/VeloxMemoryManager.h"
#include "substrait/SubstraitToVeloxPlan.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/dwio/common/tests/utils/DataFiles.h"
@@ -72,7 +71,7 @@ class Substrait2VeloxPlanConversionTest : public
exec::test::HiveConnectorTestBa
std::shared_ptr<exec::test::TempDirectoryPath>
tmpDir_{exec::test::TempDirectoryPath::create()};
std::shared_ptr<VeloxPlanConverter> planConverter_ =
std::make_shared<VeloxPlanConverter>(
std::vector<std::shared_ptr<ResultIterator>>(),
- gluten::defaultLeafVeloxMemoryPool().get(),
+ pool(),
std::unordered_map<std::string, std::string>());
};
diff --git a/cpp/velox/tests/Substrait2VeloxPlanValidatorTest.cc
b/cpp/velox/tests/Substrait2VeloxPlanValidatorTest.cc
index 8d605e308..d5eafa1e2 100644
--- a/cpp/velox/tests/Substrait2VeloxPlanValidatorTest.cc
+++ b/cpp/velox/tests/Substrait2VeloxPlanValidatorTest.cc
@@ -54,9 +54,6 @@ class Substrait2VeloxPlanValidatorTest : public
exec::test::HiveConnectorTestBas
auto planValidator =
std::make_shared<SubstraitToVeloxPlanValidator>(pool_.get(), execCtx.get());
return planValidator->validate(plan);
}
-
- private:
- std::shared_ptr<memory::MemoryPool>
memoryPool_{gluten::defaultLeafVeloxMemoryPool()};
};
TEST_F(Substrait2VeloxPlanValidatorTest, group) {
diff --git a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
index 4f1745f81..ffa6f032a 100644
--- a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
+++ b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
@@ -29,14 +29,12 @@ using namespace facebook::velox;
namespace gluten {
class VeloxColumnarBatchSerializerTest : public ::testing::Test, public
test::VectorTestBase {
- protected:
- std::shared_ptr<arrow::MemoryPool> arrowPool_ = defaultArrowMemoryPool();
- std::shared_ptr<memory::MemoryPool> veloxPool_ =
defaultLeafVeloxMemoryPool();
- // velox requires the mem manager to be instanced
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
}
+
+ std::shared_ptr<arrow::MemoryPool> arrowPool_ = defaultArrowMemoryPool();
};
TEST_F(VeloxColumnarBatchSerializerTest, serialize) {
@@ -54,12 +52,12 @@ TEST_F(VeloxColumnarBatchSerializerTest, serialize) {
};
auto vector = makeRowVector(children);
auto batch = std::make_shared<VeloxColumnarBatch>(vector);
- auto serializer =
std::make_shared<VeloxColumnarBatchSerializer>(arrowPool_.get(), veloxPool_,
nullptr);
+ auto serializer =
std::make_shared<VeloxColumnarBatchSerializer>(arrowPool_.get(), pool_,
nullptr);
auto buffer = serializer->serializeColumnarBatches({batch});
ArrowSchema cSchema;
exportToArrow(vector, cSchema, ArrowUtils::getBridgeOptions());
- auto deserializer =
std::make_shared<VeloxColumnarBatchSerializer>(arrowPool_.get(), veloxPool_,
&cSchema);
+ auto deserializer =
std::make_shared<VeloxColumnarBatchSerializer>(arrowPool_.get(), pool_,
&cSchema);
auto deserialized =
deserializer->deserialize(const_cast<uint8_t*>(buffer->data()), buffer->size());
auto deserializedVector =
std::dynamic_pointer_cast<VeloxColumnarBatch>(deserialized)->getRowVector();
test::assertEqualVectors(vector, deserializedVector);
diff --git a/cpp/velox/tests/VeloxColumnarBatchTest.cc
b/cpp/velox/tests/VeloxColumnarBatchTest.cc
index 559f9f047..ba66afb40 100644
--- a/cpp/velox/tests/VeloxColumnarBatchTest.cc
+++ b/cpp/velox/tests/VeloxColumnarBatchTest.cc
@@ -24,12 +24,9 @@ using namespace facebook::velox;
namespace gluten {
class VeloxColumnarBatchTest : public ::testing::Test, public
test::VectorTestBase {
protected:
- // Velox requires the mem manager to be instanced.
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
}
-
- std::shared_ptr<memory::MemoryPool> veloxPool_ =
defaultLeafVeloxMemoryPool();
};
TEST_F(VeloxColumnarBatchTest, flattenTruncatedVector) {
@@ -43,7 +40,7 @@ TEST_F(VeloxColumnarBatchTest, flattenTruncatedVector) {
// First, make a row vector with the mapKeys and mapValues as children.
// Make the row vector size less than the children size.
auto input = std::make_shared<RowVector>(
- veloxPool_.get(),
+ pool(),
ROW({INTEGER(), BIGINT(), MAP(INTEGER(), BIGINT())}),
nullptr,
inputSize,
@@ -54,7 +51,7 @@ TEST_F(VeloxColumnarBatchTest, flattenTruncatedVector) {
// Allocate a dummy indices and wrap the original mapVector with it as a
dictionary, to force it get decoded in
// flattenVector.
- auto indices = allocateIndices(childSize, veloxPool_.get());
+ auto indices = allocateIndices(childSize, pool());
auto* rawIndices = indices->asMutable<vector_size_t>();
for (vector_size_t i = 0; i < childSize; i++) {
rawIndices[i] = i;
diff --git a/cpp/velox/tests/VeloxColumnarToRowTest.cc
b/cpp/velox/tests/VeloxColumnarToRowTest.cc
index 1769fa7ab..2309e6e1c 100644
--- a/cpp/velox/tests/VeloxColumnarToRowTest.cc
+++ b/cpp/velox/tests/VeloxColumnarToRowTest.cc
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-#include "jni/JniError.h"
#include "memory/VeloxColumnarBatch.h"
#include "memory/VeloxMemoryManager.h"
#include "operators/serializer/VeloxColumnarToRowConverter.h"
@@ -35,7 +34,7 @@ class VeloxColumnarToRowTest : public ::testing::Test, public
test::VectorTestBa
}
void testRowBufferAddr(velox::RowVectorPtr vector, uint8_t* expectArr,
int32_t expectArrSize) {
- auto columnarToRowConverter =
std::make_shared<VeloxColumnarToRowConverter>(veloxPool_);
+ auto columnarToRowConverter =
std::make_shared<VeloxColumnarToRowConverter>(pool_);
auto cb = std::make_shared<VeloxColumnarBatch>(vector);
columnarToRowConverter->convert(cb);
@@ -45,9 +44,6 @@ class VeloxColumnarToRowTest : public ::testing::Test, public
test::VectorTestBa
ASSERT_EQ(*(address + i), *(expectArr + i));
}
}
-
- private:
- std::shared_ptr<velox::memory::MemoryPool> veloxPool_ =
defaultLeafVeloxMemoryPool();
};
TEST_F(VeloxColumnarToRowTest, Buffer_int8_int16) {
diff --git a/cpp/velox/tests/VeloxRowToColumnarTest.cc
b/cpp/velox/tests/VeloxRowToColumnarTest.cc
index 62f809ac2..93f780ca3 100644
--- a/cpp/velox/tests/VeloxRowToColumnarTest.cc
+++ b/cpp/velox/tests/VeloxRowToColumnarTest.cc
@@ -55,9 +55,6 @@ class VeloxRowToColumnarTest : public ::testing::Test, public
test::VectorTestBa
auto vp =
std::dynamic_pointer_cast<VeloxColumnarBatch>(cb)->getRowVector();
velox::test::assertEqualVectors(vector, vp);
}
-
- private:
- std::shared_ptr<arrow::MemoryPool> arrowPool_ = defaultArrowMemoryPool();
};
TEST_F(VeloxRowToColumnarTest, allTypes) {
diff --git
a/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManager.java
b/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManager.java
index 230a7342e..0d1a0c5ae 100644
---
a/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManager.java
+++
b/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManager.java
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.memory.nmm;
-import org.apache.gluten.GlutenConfig;
import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.memory.alloc.NativeMemoryAllocators;
import org.apache.gluten.memory.memtarget.KnownNameAndStats;
@@ -46,12 +45,8 @@ public class NativeMemoryManager implements TaskResource {
public static NativeMemoryManager create(String name, ReservationListener
listener) {
long allocatorId =
NativeMemoryAllocators.getDefault().get().getNativeInstanceId();
- long reservationBlockSize =
GlutenConfig.getConf().memoryReservationBlockSize();
return new NativeMemoryManager(
- name,
- create(
- BackendsApiManager.getBackendName(), name, allocatorId,
reservationBlockSize, listener),
- listener);
+ name, create(BackendsApiManager.getBackendName(), name, allocatorId,
listener), listener);
}
public long getNativeInstanceHandle() {
@@ -81,11 +76,7 @@ public class NativeMemoryManager implements TaskResource {
private static native long shrink(long memoryManagerId, long size);
private static native long create(
- String backendType,
- String name,
- long allocatorId,
- long reservationBlockSize,
- ReservationListener listener);
+ String backendType, String name, long allocatorId, ReservationListener
listener);
private static native void release(long memoryManagerId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]