This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a951ba78 [VL] Move memory reservation block computation logic into 
AllocationListener
4a951ba78 is described below

commit 4a951ba78f07926cb1a85429fa43584eba38be44
Author: Yang Zhang <[email protected]>
AuthorDate: Mon May 20 19:23:38 2024 +0800

    [VL] Move memory reservation block computation logic into AllocationListener
---
 cpp/core/config/GlutenConfig.h                     |   3 +
 cpp/core/jni/JniCommon.h                           |  59 +++-------
 cpp/core/jni/JniWrapper.cc                         |   5 +-
 cpp/core/memory/AllocationListener.h               |  43 +++++++
 cpp/core/memory/MemoryAllocator.cc                 |  55 +++------
 cpp/core/memory/MemoryAllocator.h                  |   5 +-
 cpp/velox/benchmarks/common/BenchmarkUtils.cc      |   3 +-
 cpp/velox/memory/VeloxMemoryManager.cc             |  17 ++-
 cpp/velox/memory/VeloxMemoryManager.h              |   8 +-
 cpp/velox/tests/CMakeLists.txt                     |  14 +--
 cpp/velox/tests/FunctionTest.cc                    |  19 ++-
 cpp/velox/tests/MemoryManagerTest.cc               | 129 +++++++++++++++++++++
 .../tests/Substrait2VeloxPlanConversionTest.cc     |   3 +-
 .../tests/Substrait2VeloxPlanValidatorTest.cc      |   3 -
 .../tests/VeloxColumnarBatchSerializerTest.cc      |  10 +-
 cpp/velox/tests/VeloxColumnarBatchTest.cc          |   7 +-
 cpp/velox/tests/VeloxColumnarToRowTest.cc          |   6 +-
 cpp/velox/tests/VeloxRowToColumnarTest.cc          |   3 -
 .../gluten/memory/nmm/NativeMemoryManager.java     |  13 +--
 19 files changed, 261 insertions(+), 144 deletions(-)

diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 16a18f6be..a039537b7 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -42,6 +42,9 @@ const std::string kSparkOffHeapMemory = 
"spark.gluten.memory.offHeap.size.in.byt
 
 const std::string kSparkTaskOffHeapMemory = 
"spark.gluten.memory.task.offHeap.size.in.bytes";
 
+const std::string kMemoryReservationBlockSize = 
"spark.gluten.memory.reservationBlockSize";
+const uint64_t kMemoryReservationBlockSizeDefault = 8 << 20;
+
 const std::string kSparkBatchSize = "spark.gluten.sql.columnar.maxBatchSize";
 
 const std::string kParquetBlockSize = "parquet.block.size";
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index aa3b2b884..5858a70e9 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -322,13 +322,8 @@ static inline gluten::CompressionMode 
getCompressionMode(JNIEnv* env, jstring co
 
 class SparkAllocationListener final : public gluten::AllocationListener {
  public:
-  SparkAllocationListener(
-      JavaVM* vm,
-      jobject jListenerLocalRef,
-      jmethodID jReserveMethod,
-      jmethodID jUnreserveMethod,
-      int64_t blockSize)
-      : vm_(vm), jReserveMethod_(jReserveMethod), 
jUnreserveMethod_(jUnreserveMethod), blockSize_(blockSize) {
+  SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef, jmethodID 
jReserveMethod, jmethodID jUnreserveMethod)
+      : vm_(vm), jReserveMethod_(jReserveMethod), 
jUnreserveMethod_(jUnreserveMethod) {
     JNIEnv* env;
     attachCurrentThreadAsDaemonOrThrow(vm_, &env);
     jListenerGlobalRef_ = env->NewGlobalRef(jListenerLocalRef);
@@ -350,7 +345,20 @@ class SparkAllocationListener final : public 
gluten::AllocationListener {
   }
 
   void allocationChanged(int64_t size) override {
-    updateReservation(size);
+    if (size == 0) {
+      return;
+    }
+    JNIEnv* env;
+    attachCurrentThreadAsDaemonOrThrow(vm_, &env);
+    if (size < 0) {
+      env->CallLongMethod(jListenerGlobalRef_, jUnreserveMethod_, -size);
+      checkException(env);
+    } else {
+      env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size);
+      checkException(env);
+    }
+    bytesReserved_ += size;
+    maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_);
   }
 
   int64_t currentBytes() override {
@@ -362,47 +370,12 @@ class SparkAllocationListener final : public 
gluten::AllocationListener {
   }
 
  private:
-  int64_t reserve(int64_t diff) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    bytesReserved_ += diff;
-    int64_t newBlockCount;
-    if (bytesReserved_ == 0) {
-      newBlockCount = 0;
-    } else {
-      // ceil to get the required block number
-      newBlockCount = (bytesReserved_ - 1) / blockSize_ + 1;
-    }
-    int64_t bytesGranted = (newBlockCount - blocksReserved_) * blockSize_;
-    blocksReserved_ = newBlockCount;
-    maxBytesReserved_ = std::max(maxBytesReserved_, bytesReserved_);
-    return bytesGranted;
-  }
-
-  void updateReservation(int64_t diff) {
-    int64_t granted = reserve(diff);
-    if (granted == 0) {
-      return;
-    }
-    JNIEnv* env;
-    attachCurrentThreadAsDaemonOrThrow(vm_, &env);
-    if (granted < 0) {
-      env->CallLongMethod(jListenerGlobalRef_, jUnreserveMethod_, -granted);
-      checkException(env);
-    } else {
-      env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, granted);
-      checkException(env);
-    }
-  }
-
   JavaVM* vm_;
   jobject jListenerGlobalRef_;
   jmethodID jReserveMethod_;
   jmethodID jUnreserveMethod_;
-  int64_t blockSize_;
-  int64_t blocksReserved_ = 0L;
   int64_t bytesReserved_ = 0L;
   int64_t maxBytesReserved_ = 0L;
-  std::mutex mutex_;
 };
 
 class BacktraceAllocationListener final : public gluten::AllocationListener {
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index e70a017e0..6a1926317 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -1309,7 +1309,6 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_memory_nmm_NativeMemoryManager_cr
     jstring jbackendType,
     jstring jnmmName,
     jlong allocatorId,
-    jlong reservationBlockSize,
     jobject jlistener) {
   JNI_METHOD_START
   JavaVM* vm;
@@ -1321,8 +1320,8 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_memory_nmm_NativeMemoryManager_cr
     throw gluten::GlutenException("Allocator does not exist or has been 
closed");
   }
 
-  std::unique_ptr<AllocationListener> listener = 
std::make_unique<SparkAllocationListener>(
-      vm, jlistener, reserveMemoryMethod, unreserveMemoryMethod, 
reservationBlockSize);
+  std::unique_ptr<AllocationListener> listener =
+      std::make_unique<SparkAllocationListener>(vm, jlistener, 
reserveMemoryMethod, unreserveMemoryMethod);
 
   if (gluten::backtrace_allocation) {
     listener = 
std::make_unique<BacktraceAllocationListener>(std::move(listener));
diff --git a/cpp/core/memory/AllocationListener.h 
b/cpp/core/memory/AllocationListener.h
index 23015e1a0..a3c0a72cb 100644
--- a/cpp/core/memory/AllocationListener.h
+++ b/cpp/core/memory/AllocationListener.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <math.h>
 #include <memory>
 
 namespace gluten {
@@ -44,4 +45,46 @@ class AllocationListener {
   AllocationListener() = default;
 };
 
+/// Memory changes will be round to specified block size which aim to decrease 
delegated listener calls.
+class BlockAllocationListener final : public AllocationListener {
+ public:
+  BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize)
+      : delegated_(delegated), blockSize_(blockSize) {}
+
+  void allocationChanged(int64_t diff) override {
+    if (diff == 0) {
+      return;
+    }
+    if (diff > 0) {
+      if (reservationBytes_ - usedBytes_ < diff) {
+        auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_;
+        delegated_->allocationChanged(roundSize);
+        reservationBytes_ += roundSize;
+        peakBytes_ = std::max(peakBytes_, reservationBytes_);
+      }
+      usedBytes_ += diff;
+    } else {
+      usedBytes_ += diff;
+      auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ * 
blockSize_;
+      delegated_->allocationChanged(-unreservedSize);
+      reservationBytes_ -= unreservedSize;
+    }
+  }
+
+  int64_t currentBytes() override {
+    return reservationBytes_;
+  }
+
+  int64_t peakBytes() override {
+    return peakBytes_;
+  }
+
+ private:
+  AllocationListener* delegated_;
+  uint64_t blockSize_{0L};
+  uint64_t usedBytes_{0L};
+  uint64_t peakBytes_{0L};
+  uint64_t reservationBytes_{0L};
+};
+
 } // namespace gluten
diff --git a/cpp/core/memory/MemoryAllocator.cc 
b/cpp/core/memory/MemoryAllocator.cc
index 6bcb9926e..ac869219d 100644
--- a/cpp/core/memory/MemoryAllocator.cc
+++ b/cpp/core/memory/MemoryAllocator.cc
@@ -22,54 +22,38 @@
 namespace gluten {
 
 bool ListenableMemoryAllocator::allocate(int64_t size, void** out) {
-  listener_->allocationChanged(size);
+  updateUsage(size);
   bool succeed = delegated_->allocate(size, out);
   if (!succeed) {
-    listener_->allocationChanged(-size);
-  }
-  if (succeed) {
-    bytes_ += size;
-    peakBytes_ = std::max(peakBytes_, bytes_.load());
+    updateUsage(-size);
   }
   return succeed;
 }
 
 bool ListenableMemoryAllocator::allocateZeroFilled(int64_t nmemb, int64_t 
size, void** out) {
-  listener_->allocationChanged(size * nmemb);
+  updateUsage(size * nmemb);
   bool succeed = delegated_->allocateZeroFilled(nmemb, size, out);
   if (!succeed) {
-    listener_->allocationChanged(-size * nmemb);
-  }
-  if (succeed) {
-    bytes_ += size * nmemb;
-    peakBytes_ = std::max(peakBytes_, bytes_.load());
+    updateUsage(-size * nmemb);
   }
   return succeed;
 }
 
 bool ListenableMemoryAllocator::allocateAligned(uint64_t alignment, int64_t 
size, void** out) {
-  listener_->allocationChanged(size);
+  updateUsage(size);
   bool succeed = delegated_->allocateAligned(alignment, size, out);
   if (!succeed) {
-    listener_->allocationChanged(-size);
-  }
-  if (succeed) {
-    bytes_ += size;
-    peakBytes_ = std::max(peakBytes_, bytes_.load());
+    updateUsage(-size);
   }
   return succeed;
 }
 
 bool ListenableMemoryAllocator::reallocate(void* p, int64_t size, int64_t 
newSize, void** out) {
   int64_t diff = newSize - size;
-  listener_->allocationChanged(diff);
+  updateUsage(diff);
   bool succeed = delegated_->reallocate(p, size, newSize, out);
   if (!succeed) {
-    listener_->allocationChanged(-diff);
-  }
-  if (succeed) {
-    bytes_ += diff;
-    peakBytes_ = std::max(peakBytes_, bytes_.load());
+    updateUsage(-diff);
   }
   return succeed;
 }
@@ -81,38 +65,37 @@ bool ListenableMemoryAllocator::reallocateAligned(
     int64_t newSize,
     void** out) {
   int64_t diff = newSize - size;
-  listener_->allocationChanged(diff);
+  updateUsage(diff);
   bool succeed = delegated_->reallocateAligned(p, alignment, size, newSize, 
out);
   if (!succeed) {
-    listener_->allocationChanged(-diff);
-  }
-  if (succeed) {
-    bytes_ += diff;
-    peakBytes_ = std::max(peakBytes_, bytes_.load());
+    updateUsage(-diff);
   }
   return succeed;
 }
 
 bool ListenableMemoryAllocator::free(void* p, int64_t size) {
-  listener_->allocationChanged(-size);
+  updateUsage(-size);
   bool succeed = delegated_->free(p, size);
   if (!succeed) {
-    listener_->allocationChanged(size);
-  }
-  if (succeed) {
-    bytes_ -= size;
+    updateUsage(size);
   }
   return succeed;
 }
 
 int64_t ListenableMemoryAllocator::getBytes() const {
-  return bytes_;
+  return usedBytes_;
 }
 
 int64_t ListenableMemoryAllocator::peakBytes() const {
   return peakBytes_;
 }
 
+void ListenableMemoryAllocator::updateUsage(int64_t size) {
+  listener_->allocationChanged(size);
+  usedBytes_ += size;
+  peakBytes_ = std::max(peakBytes_, usedBytes_);
+}
+
 bool StdMemoryAllocator::allocate(int64_t size, void** out) {
   *out = std::malloc(size);
   bytes_ += size;
diff --git a/cpp/core/memory/MemoryAllocator.h 
b/cpp/core/memory/MemoryAllocator.h
index a322c9190..bc8f9de18 100644
--- a/cpp/core/memory/MemoryAllocator.h
+++ b/cpp/core/memory/MemoryAllocator.h
@@ -68,10 +68,11 @@ class ListenableMemoryAllocator final : public 
MemoryAllocator {
   int64_t peakBytes() const override;
 
  private:
+  void updateUsage(int64_t size);
   MemoryAllocator* delegated_;
   AllocationListener* listener_;
-  std::atomic_int64_t bytes_{0};
-  int64_t peakBytes_{0};
+  uint64_t usedBytes_{0L};
+  uint64_t peakBytes_{0L};
 };
 
 class StdMemoryAllocator final : public MemoryAllocator {
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc 
b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
index ccec6f3c4..a9f6f0838 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
@@ -18,7 +18,7 @@
 #include "BenchmarkUtils.h"
 #include "compute/VeloxBackend.h"
 #include "compute/VeloxRuntime.h"
-#include "config/GlutenConfig.h"
+#include "config/VeloxConfig.h"
 #include "shuffle/Utils.h"
 #include "utils/StringUtil.h"
 #include "velox/dwio/common/Options.h"
@@ -38,6 +38,7 @@ std::unordered_map<std::string, std::string> bmConfMap = 
{{gluten::kSparkBatchSi
 } // namespace
 
 void initVeloxBackend(std::unordered_map<std::string, std::string>& conf) {
+  conf[gluten::kGlogSeverityLevel] = "0";
   gluten::VeloxBackend::create(conf);
 }
 
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc 
b/cpp/velox/memory/VeloxMemoryManager.cc
index f49beaccd..0584780ad 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -20,6 +20,8 @@
 #include "velox/common/memory/MemoryPool.h"
 #include "velox/exec/MemoryReclaimer.h"
 
+#include "compute/VeloxBackend.h"
+#include "config/VeloxConfig.h"
 #include "memory/ArrowMemoryPool.h"
 #include "utils/exception.h"
 
@@ -103,9 +105,9 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
       }
     }
     auto reclaimedFreeBytes = pool->shrink(0);
-    auto neededBytes = bytes - reclaimedFreeBytes;
+    auto neededBytes = velox::bits::roundUp(bytes - reclaimedFreeBytes, 
memoryPoolTransferCapacity_);
     listener_->allocationChanged(neededBytes);
-    auto ret = pool->grow(bytes, bytes);
+    auto ret = pool->grow(reclaimedFreeBytes + neededBytes, bytes);
     VELOX_CHECK(
         ret,
         "{} failed to grow {} bytes, current state {}",
@@ -156,8 +158,11 @@ VeloxMemoryManager::VeloxMemoryManager(
     std::shared_ptr<MemoryAllocator> allocator,
     std::unique_ptr<AllocationListener> listener)
     : MemoryManager(), name_(name), listener_(std::move(listener)) {
-  glutenAlloc_ = std::make_unique<ListenableMemoryAllocator>(allocator.get(), 
listener_.get());
-  arrowPool_ = std::make_unique<ArrowMemoryPool>(glutenAlloc_.get());
+  auto reservationBlockSize = 
VeloxBackend::get()->getBackendConf()->get<uint64_t>(
+      kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
+  blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), 
reservationBlockSize);
+  listenableAlloc_ = 
std::make_unique<ListenableMemoryAllocator>(allocator.get(), 
blockListener_.get());
+  arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
 
   ArbitratorFactoryRegister afr(listener_.get());
   velox::memory::MemoryManagerOptions mmOptions{
@@ -169,7 +174,7 @@ VeloxMemoryManager::VeloxMemoryManager(
       .allocatorCapacity = velox::memory::kMaxMemory,
       .arbitratorKind = afr.getKind(),
       .memoryPoolInitCapacity = 0,
-      .memoryPoolTransferCapacity = 32 << 20,
+      .memoryPoolTransferCapacity = reservationBlockSize,
       .memoryReclaimWaitMs = 0};
   veloxMemoryManager_ = 
std::make_unique<velox::memory::MemoryManager>(mmOptions);
 
@@ -222,7 +227,7 @@ const MemoryUsageStats 
VeloxMemoryManager::collectMemoryUsageStats() const {
   stats.set_current(listener_->currentBytes());
   stats.set_peak(listener_->peakBytes());
   stats.mutable_children()->emplace(
-      "gluten::MemoryAllocator", 
collectGlutenAllocatorMemoryUsageStats(glutenAlloc_.get()));
+      "gluten::MemoryAllocator", 
collectGlutenAllocatorMemoryUsageStats(listenableAlloc_.get()));
   stats.mutable_children()->emplace(
       veloxAggregatePool_->name(), 
collectVeloxMemoryUsageStats(veloxAggregatePool_.get()));
   return stats;
diff --git a/cpp/velox/memory/VeloxMemoryManager.h 
b/cpp/velox/memory/VeloxMemoryManager.h
index 1e8bcd8c8..3ba5bbf7d 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -60,6 +60,11 @@ class VeloxMemoryManager final : public MemoryManager {
 
   void hold() override;
 
+  /// Test only
+  MemoryAllocator* allocator() const {
+    return listenableAlloc_.get();
+  }
+
   AllocationListener* getListener() const {
     return listener_.get();
   }
@@ -74,8 +79,9 @@ class VeloxMemoryManager final : public MemoryManager {
 #endif
 
   // This is a listenable allocator used for arrow.
-  std::unique_ptr<MemoryAllocator> glutenAlloc_;
+  std::unique_ptr<MemoryAllocator> listenableAlloc_;
   std::unique_ptr<AllocationListener> listener_;
+  std::unique_ptr<AllocationListener> blockListener_;
   std::unique_ptr<arrow::MemoryPool> arrowPool_;
 
   std::unique_ptr<facebook::velox::memory::MemoryManager> veloxMemoryManager_;
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index a5bd5b4f7..29beb69da 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -30,12 +30,14 @@ function(add_velox_test TEST_EXEC)
   else()
     message(FATAL_ERROR "No sources specified for test ${TEST_NAME}")
   endif()
-  add_executable(${TEST_EXEC} ${SOURCES})
+  add_executable(${TEST_EXEC} ${SOURCES} ${VELOX_TEST_COMMON_SRCS})
   target_include_directories(${TEST_EXEC} PRIVATE ${CMAKE_SOURCE_DIR}/velox 
${CMAKE_SOURCE_DIR}/src ${VELOX_BUILD_PATH}/_deps/duckdb-src/src/include)
-  target_link_libraries(${TEST_EXEC} velox GTest::gtest GTest::gtest_main 
google::glog benchmark::benchmark)
+  target_link_libraries(${TEST_EXEC} velox_benchmark_common GTest::gtest 
GTest::gtest_main)
   gtest_discover_tests(${TEST_EXEC} DISCOVERY_MODE PRE_TEST)
 endfunction()
 
+set(VELOX_TEST_COMMON_SRCS JsonToProtoConverter.cc FilePathGenerator.cc)
+
 add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc)
 # TODO: ORC is not well supported.
 # add_velox_test(orc_test SOURCES OrcTest.cc)
@@ -55,10 +57,8 @@ add_velox_test(
     SubstraitExtensionCollectorTest.cc
     VeloxSubstraitRoundTripTest.cc
     VeloxSubstraitSignatureTest.cc
-    VeloxToSubstraitTypeTest.cc
-    FunctionTest.cc
-    JsonToProtoConverter.cc
-    FilePathGenerator.cc)
-add_velox_test(spark_functions_test SOURCES SparkFunctionTest.cc)
+    VeloxToSubstraitTypeTest.cc)
+add_velox_test(spark_functions_test SOURCES SparkFunctionTest.cc 
FunctionTest.cc)
 add_velox_test(execution_ctx_test SOURCES RuntimeTest.cc)
+add_velox_test(velox_memory_test SOURCES MemoryManagerTest.cc)
 add_velox_test(buffer_outputstream_test SOURCES BufferOutputStreamTest.cc)
diff --git a/cpp/velox/tests/FunctionTest.cc b/cpp/velox/tests/FunctionTest.cc
index 01af30176..b55b64ba9 100644
--- a/cpp/velox/tests/FunctionTest.cc
+++ b/cpp/velox/tests/FunctionTest.cc
@@ -15,34 +15,33 @@
  * limitations under the License.
  */
 
+#include "FilePathGenerator.h"
 #include "JsonToProtoConverter.h"
 
-#include "memory/VeloxMemoryManager.h"
 #include "velox/common/base/Fs.h"
 #include "velox/common/base/tests/GTestUtils.h"
+#include "velox/core/QueryCtx.h"
 #include "velox/dwio/common/tests/utils/DataFiles.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
 
+#include "substrait/SubstraitParser.h"
 #include "substrait/SubstraitToVeloxPlan.h"
 #include "substrait/TypeUtils.h"
 #include "substrait/VariantToVectorConverter.h"
 #include "substrait/VeloxToSubstraitType.h"
 
-#include "FilePathGenerator.h"
-
-#include "velox/core/QueryCtx.h"
-
-#include "substrait/SubstraitParser.h"
-
 using namespace facebook::velox;
 using namespace facebook::velox::test;
 
 namespace gluten {
-class FunctionTest : public ::testing::Test {
+class FunctionTest : public ::testing::Test, public test::VectorTestBase {
  protected:
-  std::shared_ptr<memory::MemoryPool> pool_ = 
gluten::defaultLeafVeloxMemoryPool();
+  static void SetUpTestCase() {
+    memory::MemoryManager::testingSetInstance({});
+  }
 
   std::shared_ptr<gluten::SubstraitToVeloxPlanConverter> planConverter_ =
-      std::make_shared<gluten::SubstraitToVeloxPlanConverter>(pool_.get());
+      std::make_shared<gluten::SubstraitToVeloxPlanConverter>(pool());
 };
 
 TEST_F(FunctionTest, makeNames) {
diff --git a/cpp/velox/tests/MemoryManagerTest.cc 
b/cpp/velox/tests/MemoryManagerTest.cc
new file mode 100644
index 000000000..f256db1b2
--- /dev/null
+++ b/cpp/velox/tests/MemoryManagerTest.cc
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "benchmarks/common/BenchmarkUtils.h"
+#include "compute/VeloxBackend.h"
+#include "config/GlutenConfig.h"
+#include "memory/VeloxMemoryManager.h"
+#include "velox/common/base/tests/GTestUtils.h"
+
+namespace gluten {
+using namespace facebook::velox;
+
+class MockAllocationListener : public gluten::AllocationListener {
+ public:
+  void allocationChanged(int64_t diff) override {
+    currentBytes_ += diff;
+    peakBytes_ = std::max(peakBytes_, currentBytes_);
+  }
+  int64_t currentBytes() override {
+    return currentBytes_;
+  }
+  int64_t peakBytes() override {
+    return peakBytes_;
+  }
+  uint64_t currentBytes_{0L};
+  uint64_t peakBytes_{0L};
+};
+
+namespace {
+static const uint64_t kMB = 1 << 20;
+} // namespace
+
+class MemoryManagerTest : public ::testing::Test {
+ protected:
+  static void SetUpTestCase() {
+    std::unordered_map<std::string, std::string> conf = {
+        {kMemoryReservationBlockSize, 
std::to_string(kMemoryReservationBlockSizeDefault)}};
+    initVeloxBackend(conf);
+  }
+
+  void SetUp() override {
+    vmm_ = std::make_unique<VeloxMemoryManager>("test", stdAllocator_, 
std::make_unique<MockAllocationListener>());
+    listener_ = vmm_->getListener();
+    allocator_ = vmm_->allocator();
+  }
+
+  std::unique_ptr<VeloxMemoryManager> vmm_;
+  AllocationListener* listener_;
+  MemoryAllocator* allocator_;
+
+  std::shared_ptr<MemoryAllocator> stdAllocator_ = 
std::make_shared<StdMemoryAllocator>();
+
+  struct Allocation {
+    void* buffer;
+    size_t size;
+    memory::MemoryPool* pool;
+  };
+};
+
+TEST_F(MemoryManagerTest, memoryPoolWithBlockReseravtion) {
+  auto pool = vmm_->getLeafMemoryPool();
+  std::vector<Allocation> allocations;
+  std::vector<uint64_t> sizes{
+      kMemoryReservationBlockSizeDefault - 1 * kMB, 
kMemoryReservationBlockSizeDefault - 2 * kMB};
+  for (const auto& size : sizes) {
+    auto buf = pool->allocate(size);
+    allocations.push_back({buf, size, pool.get()});
+  }
+  EXPECT_EQ(listener_->currentBytes(), 2 * kMemoryReservationBlockSizeDefault);
+  EXPECT_EQ(listener_->peakBytes(), listener_->currentBytes());
+
+  for (auto& allocation : allocations) {
+    allocation.pool->free(allocation.buffer, allocation.size);
+  }
+
+  auto currentBytes = listener_->currentBytes();
+  ASSERT_EQ(vmm_->shrink(0), currentBytes);
+  ASSERT_EQ(listener_->currentBytes(), 0);
+}
+
+TEST_F(MemoryManagerTest, memoryAllocatorWithBlockReservation) {
+  std::vector<Allocation> allocations;
+  std::vector<uint64_t> sizes{
+      kMemoryReservationBlockSizeDefault - 1 * kMB, 
kMemoryReservationBlockSizeDefault - 2 * kMB};
+  for (auto i = 0; i < sizes.size(); i++) {
+    auto size = sizes[i];
+    auto currentBytes = allocator_->getBytes();
+    Allocation allocation{.size = size};
+    allocator_->allocate(size, &allocation.buffer);
+    allocations.push_back(allocation);
+
+    EXPECT_EQ(allocator_->getBytes(), currentBytes + size);
+    EXPECT_EQ(allocator_->peakBytes(), allocator_->getBytes());
+    EXPECT_EQ(listener_->currentBytes(), (i + 1) * 
kMemoryReservationBlockSizeDefault);
+    EXPECT_EQ(listener_->peakBytes(), listener_->currentBytes());
+  }
+
+  auto currentBytes = allocator_->getBytes();
+  auto allocation = allocations.back();
+  allocations.pop_back();
+  allocator_->free(allocation.buffer, allocation.size);
+  EXPECT_EQ(allocator_->getBytes(), currentBytes - allocation.size);
+  EXPECT_EQ(listener_->currentBytes(), kMemoryReservationBlockSizeDefault);
+
+  currentBytes = allocator_->getBytes();
+  allocation = allocations.back();
+  allocations.pop_back();
+  allocator_->free(allocation.buffer, allocation.size);
+  EXPECT_EQ(allocator_->getBytes(), currentBytes - allocation.size);
+  EXPECT_EQ(listener_->currentBytes(), 0);
+
+  ASSERT_EQ(allocator_->getBytes(), 0);
+}
+
+} // namespace gluten
diff --git a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc 
b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
index 841514261..3926b22c9 100644
--- a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
+++ b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc
@@ -19,7 +19,6 @@
 
 #include <filesystem>
 #include "compute/VeloxPlanConverter.h"
-#include "memory/VeloxMemoryManager.h"
 #include "substrait/SubstraitToVeloxPlan.h"
 #include "velox/common/base/tests/GTestUtils.h"
 #include "velox/dwio/common/tests/utils/DataFiles.h"
@@ -72,7 +71,7 @@ class Substrait2VeloxPlanConversionTest : public 
exec::test::HiveConnectorTestBa
   std::shared_ptr<exec::test::TempDirectoryPath> 
tmpDir_{exec::test::TempDirectoryPath::create()};
   std::shared_ptr<VeloxPlanConverter> planConverter_ = 
std::make_shared<VeloxPlanConverter>(
       std::vector<std::shared_ptr<ResultIterator>>(),
-      gluten::defaultLeafVeloxMemoryPool().get(),
+      pool(),
       std::unordered_map<std::string, std::string>());
 };
 
diff --git a/cpp/velox/tests/Substrait2VeloxPlanValidatorTest.cc 
b/cpp/velox/tests/Substrait2VeloxPlanValidatorTest.cc
index 8d605e308..d5eafa1e2 100644
--- a/cpp/velox/tests/Substrait2VeloxPlanValidatorTest.cc
+++ b/cpp/velox/tests/Substrait2VeloxPlanValidatorTest.cc
@@ -54,9 +54,6 @@ class Substrait2VeloxPlanValidatorTest : public 
exec::test::HiveConnectorTestBas
     auto planValidator = 
std::make_shared<SubstraitToVeloxPlanValidator>(pool_.get(), execCtx.get());
     return planValidator->validate(plan);
   }
-
- private:
-  std::shared_ptr<memory::MemoryPool> 
memoryPool_{gluten::defaultLeafVeloxMemoryPool()};
 };
 
 TEST_F(Substrait2VeloxPlanValidatorTest, group) {
diff --git a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc 
b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
index 4f1745f81..ffa6f032a 100644
--- a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
+++ b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
@@ -29,14 +29,12 @@ using namespace facebook::velox;
 
 namespace gluten {
 class VeloxColumnarBatchSerializerTest : public ::testing::Test, public 
test::VectorTestBase {
- protected:
-  std::shared_ptr<arrow::MemoryPool> arrowPool_ = defaultArrowMemoryPool();
-  std::shared_ptr<memory::MemoryPool> veloxPool_ = 
defaultLeafVeloxMemoryPool();
-  // velox requires the mem manager to be instanced
  protected:
   static void SetUpTestCase() {
     memory::MemoryManager::testingSetInstance({});
   }
+
+  std::shared_ptr<arrow::MemoryPool> arrowPool_ = defaultArrowMemoryPool();
 };
 
 TEST_F(VeloxColumnarBatchSerializerTest, serialize) {
@@ -54,12 +52,12 @@ TEST_F(VeloxColumnarBatchSerializerTest, serialize) {
   };
   auto vector = makeRowVector(children);
   auto batch = std::make_shared<VeloxColumnarBatch>(vector);
-  auto serializer = 
std::make_shared<VeloxColumnarBatchSerializer>(arrowPool_.get(), veloxPool_, 
nullptr);
+  auto serializer = 
std::make_shared<VeloxColumnarBatchSerializer>(arrowPool_.get(), pool_, 
nullptr);
   auto buffer = serializer->serializeColumnarBatches({batch});
 
   ArrowSchema cSchema;
   exportToArrow(vector, cSchema, ArrowUtils::getBridgeOptions());
-  auto deserializer = 
std::make_shared<VeloxColumnarBatchSerializer>(arrowPool_.get(), veloxPool_, 
&cSchema);
+  auto deserializer = 
std::make_shared<VeloxColumnarBatchSerializer>(arrowPool_.get(), pool_, 
&cSchema);
   auto deserialized = 
deserializer->deserialize(const_cast<uint8_t*>(buffer->data()), buffer->size());
   auto deserializedVector = 
std::dynamic_pointer_cast<VeloxColumnarBatch>(deserialized)->getRowVector();
   test::assertEqualVectors(vector, deserializedVector);
diff --git a/cpp/velox/tests/VeloxColumnarBatchTest.cc 
b/cpp/velox/tests/VeloxColumnarBatchTest.cc
index 559f9f047..ba66afb40 100644
--- a/cpp/velox/tests/VeloxColumnarBatchTest.cc
+++ b/cpp/velox/tests/VeloxColumnarBatchTest.cc
@@ -24,12 +24,9 @@ using namespace facebook::velox;
 namespace gluten {
 class VeloxColumnarBatchTest : public ::testing::Test, public 
test::VectorTestBase {
  protected:
-  // Velox requires the mem manager to be instanced.
   static void SetUpTestCase() {
     memory::MemoryManager::testingSetInstance({});
   }
-
-  std::shared_ptr<memory::MemoryPool> veloxPool_ = 
defaultLeafVeloxMemoryPool();
 };
 
 TEST_F(VeloxColumnarBatchTest, flattenTruncatedVector) {
@@ -43,7 +40,7 @@ TEST_F(VeloxColumnarBatchTest, flattenTruncatedVector) {
   // First, make a row vector with the mapKeys and mapValues as children.
   // Make the row vector size less than the children size.
   auto input = std::make_shared<RowVector>(
-      veloxPool_.get(),
+      pool(),
       ROW({INTEGER(), BIGINT(), MAP(INTEGER(), BIGINT())}),
       nullptr,
       inputSize,
@@ -54,7 +51,7 @@ TEST_F(VeloxColumnarBatchTest, flattenTruncatedVector) {
 
   // Allocate a dummy indices and wrap the original mapVector with it as a 
dictionary, to force it get decoded in
   // flattenVector.
-  auto indices = allocateIndices(childSize, veloxPool_.get());
+  auto indices = allocateIndices(childSize, pool());
   auto* rawIndices = indices->asMutable<vector_size_t>();
   for (vector_size_t i = 0; i < childSize; i++) {
     rawIndices[i] = i;
diff --git a/cpp/velox/tests/VeloxColumnarToRowTest.cc 
b/cpp/velox/tests/VeloxColumnarToRowTest.cc
index 1769fa7ab..2309e6e1c 100644
--- a/cpp/velox/tests/VeloxColumnarToRowTest.cc
+++ b/cpp/velox/tests/VeloxColumnarToRowTest.cc
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-#include "jni/JniError.h"
 #include "memory/VeloxColumnarBatch.h"
 #include "memory/VeloxMemoryManager.h"
 #include "operators/serializer/VeloxColumnarToRowConverter.h"
@@ -35,7 +34,7 @@ class VeloxColumnarToRowTest : public ::testing::Test, public 
test::VectorTestBa
   }
 
   void testRowBufferAddr(velox::RowVectorPtr vector, uint8_t* expectArr, 
int32_t expectArrSize) {
-    auto columnarToRowConverter = 
std::make_shared<VeloxColumnarToRowConverter>(veloxPool_);
+    auto columnarToRowConverter = 
std::make_shared<VeloxColumnarToRowConverter>(pool_);
 
     auto cb = std::make_shared<VeloxColumnarBatch>(vector);
     columnarToRowConverter->convert(cb);
@@ -45,9 +44,6 @@ class VeloxColumnarToRowTest : public ::testing::Test, public 
test::VectorTestBa
       ASSERT_EQ(*(address + i), *(expectArr + i));
     }
   }
-
- private:
-  std::shared_ptr<velox::memory::MemoryPool> veloxPool_ = 
defaultLeafVeloxMemoryPool();
 };
 
 TEST_F(VeloxColumnarToRowTest, Buffer_int8_int16) {
diff --git a/cpp/velox/tests/VeloxRowToColumnarTest.cc 
b/cpp/velox/tests/VeloxRowToColumnarTest.cc
index 62f809ac2..93f780ca3 100644
--- a/cpp/velox/tests/VeloxRowToColumnarTest.cc
+++ b/cpp/velox/tests/VeloxRowToColumnarTest.cc
@@ -55,9 +55,6 @@ class VeloxRowToColumnarTest : public ::testing::Test, public 
test::VectorTestBa
     auto vp = 
std::dynamic_pointer_cast<VeloxColumnarBatch>(cb)->getRowVector();
     velox::test::assertEqualVectors(vector, vp);
   }
-
- private:
-  std::shared_ptr<arrow::MemoryPool> arrowPool_ = defaultArrowMemoryPool();
 };
 
 TEST_F(VeloxRowToColumnarTest, allTypes) {
diff --git 
a/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManager.java
 
b/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManager.java
index 230a7342e..0d1a0c5ae 100644
--- 
a/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManager.java
+++ 
b/gluten-data/src/main/java/org/apache/gluten/memory/nmm/NativeMemoryManager.java
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.memory.nmm;
 
-import org.apache.gluten.GlutenConfig;
 import org.apache.gluten.backendsapi.BackendsApiManager;
 import org.apache.gluten.memory.alloc.NativeMemoryAllocators;
 import org.apache.gluten.memory.memtarget.KnownNameAndStats;
@@ -46,12 +45,8 @@ public class NativeMemoryManager implements TaskResource {
 
   public static NativeMemoryManager create(String name, ReservationListener 
listener) {
     long allocatorId = 
NativeMemoryAllocators.getDefault().get().getNativeInstanceId();
-    long reservationBlockSize = 
GlutenConfig.getConf().memoryReservationBlockSize();
     return new NativeMemoryManager(
-        name,
-        create(
-            BackendsApiManager.getBackendName(), name, allocatorId, 
reservationBlockSize, listener),
-        listener);
+        name, create(BackendsApiManager.getBackendName(), name, allocatorId, 
listener), listener);
   }
 
   public long getNativeInstanceHandle() {
@@ -81,11 +76,7 @@ public class NativeMemoryManager implements TaskResource {
   private static native long shrink(long memoryManagerId, long size);
 
   private static native long create(
-      String backendType,
-      String name,
-      long allocatorId,
-      long reservationBlockSize,
-      ReservationListener listener);
+      String backendType, String name, long allocatorId, ReservationListener 
listener);
 
   private static native void release(long memoryManagerId);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to