This is an automated email from the ASF dual-hosted git repository.

kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c1387bd4bf [VL] Account some C++ untracked memory allocations into 
Spark global off-heap memory (#9115)
c1387bd4bf is described below

commit c1387bd4bfe67707657bb4f871200ad6068eda5e
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Mar 26 02:38:15 2025 +0000

    [VL] Account some C++ untracked memory allocations into Spark global 
off-heap memory (#9115)
---
 .../backendsapi/velox/VeloxListenerApi.scala       |  5 +-
 .../apache/spark/memory/GlobalOffHeapMemory.scala  | 73 +++++++++++++++---
 .../execution/unsafe/UnsafeBytesBufferArray.scala  | 13 +---
 .../spark/memory/GlobalOffHeapMemorySuite.scala    | 40 +++++-----
 cpp/core/jni/JniCommon.h                           | 34 ++++++--
 cpp/core/jni/JniWrapper.cc                         | 15 +---
 cpp/core/memory/MemoryAllocator.cc                 |  5 +-
 cpp/velox/benchmarks/ParquetWriteBenchmark.cc      |  4 +-
 cpp/velox/benchmarks/common/BenchmarkUtils.cc      |  2 +-
 cpp/velox/compute/VeloxBackend.cc                  | 22 ++++--
 cpp/velox/compute/VeloxBackend.h                   | 23 ++++--
 cpp/velox/jni/VeloxJniWrapper.cc                   | 10 ++-
 cpp/velox/memory/VeloxMemoryManager.cc             | 90 +++++++++++-----------
 cpp/velox/memory/VeloxMemoryManager.h              | 44 ++++++++---
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        |  5 +-
 cpp/velox/tests/BufferOutputStreamTest.cc          |  2 +-
 cpp/velox/tests/MemoryManagerTest.cc               | 10 ++-
 cpp/velox/tests/RuntimeTest.cc                     |  2 +-
 .../gluten/init/NativeBackendInitializer.java      | 14 ++--
 19 files changed, 258 insertions(+), 155 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index f2e6dbd9fb..2aa19afe7a 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -32,6 +32,7 @@ import org.apache.gluten.utils._
 import org.apache.spark.{HdfsConfGenerator, ShuffleDependency, SparkConf, 
SparkContext}
 import org.apache.spark.api.plugin.PluginContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.memory.GlobalOffHeapMemory
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.shuffle.{ColumnarShuffleDependency, LookupKey, 
ShuffleManagerRegistry}
 import org.apache.spark.shuffle.sort.ColumnarShuffleManager
@@ -204,7 +205,9 @@ class VeloxListenerApi extends ListenerApi with Logging {
     if (isDriver && !inLocalMode(conf)) {
       parsed += (COLUMNAR_VELOX_CACHE_ENABLED.key -> "false")
     }
-    
NativeBackendInitializer.forBackend(VeloxBackend.BACKEND_NAME).initialize(parsed)
+    NativeBackendInitializer
+      .forBackend(VeloxBackend.BACKEND_NAME)
+      .initialize(GlobalOffHeapMemory.newReservationListener(), parsed)
 
     // Inject backend-specific implementations to override spark classes.
     GlutenFormatFactory.register(new VeloxParquetWriterInjects)
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
 
b/backends-velox/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
index 3e25fdfc74..d407a4f52b 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
@@ -17,8 +17,11 @@
 package org.apache.spark.memory
 
 import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.memory.{MemoryUsageRecorder, 
SimpleMemoryUsageRecorder}
+import org.apache.gluten.memory.listener.ReservationListener
 
 import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
 import org.apache.spark.storage.BlockId
 
 import java.lang.reflect.Field
@@ -33,7 +36,9 @@ import java.util.UUID
  * The utility internally relies on the Spark storage memory pool. As Spark 
doesn't expect trait
  * BlockId to be extended by user, TestBlockId is chosen for the storage 
memory reservations.
  */
-object GlobalOffHeapMemory {
+object GlobalOffHeapMemory extends Logging {
+  private val recorder: MemoryUsageRecorder = new SimpleMemoryUsageRecorder()
+
   private val FIELD_MEMORY_MANAGER: Field = {
     val f =
       try {
@@ -48,28 +53,72 @@ object GlobalOffHeapMemory {
     f
   }
 
-  def acquire(numBytes: Long): Boolean = {
-    memoryManager().acquireStorageMemory(
-      BlockId(s"test_${UUID.randomUUID()}"),
-      numBytes,
-      MemoryMode.OFF_HEAP)
+  def acquire(numBytes: Long): Unit = memoryManagerOption().foreach {
+    mm =>
+      val succeeded =
+        mm.acquireStorageMemory(
+          BlockId(s"test_${UUID.randomUUID()}"),
+          numBytes,
+          MemoryMode.OFF_HEAP)
+
+      if (succeeded) {
+        recorder.inc(numBytes)
+        return
+      }
+
+      // Throw OOM.
+      val offHeapMemoryTotal =
+        mm.maxOffHeapStorageMemory + mm.offHeapExecutionMemoryUsed
+      throw new GlutenException(
+        s"Spark off-heap memory is exhausted." +
+          s" Storage: ${mm.offHeapStorageMemoryUsed} / $offHeapMemoryTotal," +
+          s" execution: ${mm.offHeapExecutionMemoryUsed} / 
$offHeapMemoryTotal")
+  }
+
+  def release(numBytes: Long): Unit = memoryManagerOption().foreach {
+    mm =>
+      mm.releaseStorageMemory(numBytes, MemoryMode.OFF_HEAP)
+      recorder.inc(-numBytes)
+  }
+
+  def currentBytes(): Long = {
+    recorder.current()
   }
 
-  def release(numBytes: Long): Unit = {
-    memoryManager().releaseStorageMemory(numBytes, MemoryMode.OFF_HEAP)
+  def newReservationListener(): ReservationListener = {
+    new ReservationListener {
+      private val recorder: MemoryUsageRecorder = new 
SimpleMemoryUsageRecorder()
+
+      override def reserve(size: Long): Long = {
+        acquire(size)
+        recorder.inc(size)
+        size
+      }
+
+      override def unreserve(size: Long): Long = {
+        release(size)
+        recorder.inc(-size)
+        size
+      }
+
+      override def getUsedBytes: Long = {
+        recorder.current()
+      }
+    }
   }
 
-  private def memoryManager(): MemoryManager = {
+  private def memoryManagerOption(): Option[MemoryManager] = {
     val env = SparkEnv.get
     if (env != null) {
-      return env.memoryManager
+      return Some(env.memoryManager)
     }
     val tc = TaskContext.get()
     if (tc != null) {
       // This may happen in test code that mocks the task context without 
booting up SparkEnv.
-      return 
FIELD_MEMORY_MANAGER.get(tc.taskMemoryManager()).asInstanceOf[MemoryManager]
+      return 
Some(FIELD_MEMORY_MANAGER.get(tc.taskMemoryManager()).asInstanceOf[MemoryManager])
     }
-    throw new GlutenException(
+    logWarning(
       "Memory manager not found because the code is unlikely be run in a Spark 
application")
+    None
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeBytesBufferArray.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeBytesBufferArray.scala
index ecfb777178..5aeb5feb52 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeBytesBufferArray.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeBytesBufferArray.scala
@@ -16,9 +16,6 @@
  */
 package org.apache.spark.sql.execution.unsafe
 
-import org.apache.gluten.exception.GlutenException
-
-import org.apache.spark.SparkEnv
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
 import org.apache.spark.memory.GlobalOffHeapMemory
@@ -76,15 +73,7 @@ case class UnsafeBytesBufferArray(arraySize: Int, 
bytesBufferLengths: Array[Int]
     assert(bytesBuffer.length == bytesBufferLengths(index))
     // first to allocate underlying long array
     if (null == longArray && index == 0) {
-      if (!GlobalOffHeapMemory.acquire(allocatedBytes)) {
-        val memoryManager = SparkEnv.get.memoryManager
-        val offHeapMemoryTotal =
-          memoryManager.maxOffHeapStorageMemory + 
memoryManager.offHeapExecutionMemoryUsed
-        throw new GlutenException(
-          s"Spark off-heap memory is exhausted." +
-            s" Storage: ${memoryManager.offHeapStorageMemoryUsed} / 
$offHeapMemoryTotal," +
-            s" execution: ${memoryManager.offHeapExecutionMemoryUsed} / 
$offHeapMemoryTotal")
-      }
+      GlobalOffHeapMemory.acquire(allocatedBytes)
       longArray = new 
LongArray(MemoryAllocator.UNSAFE.allocate(allocatedBytes))
     }
 
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
index 9ac890e87a..288acabe2b 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.memory;
 
 import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.memory.memtarget.{Spillers, TreeMemoryTarget}
 import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers
 
@@ -24,7 +25,6 @@ import org.apache.spark.TaskContext
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.task.TaskResources
 
-import org.junit.Assert
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
 
@@ -51,14 +51,14 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
             TreeMemoryTarget.CAPACITY_UNLIMITED,
             Spillers.NOOP,
             Collections.emptyMap())
-      Assert.assertEquals(300, consumer.borrow(300))
-      Assert.assertTrue(GlobalOffHeapMemory.acquire(50))
-      Assert.assertTrue(GlobalOffHeapMemory.acquire(40))
-      Assert.assertFalse(GlobalOffHeapMemory.acquire(30))
-      Assert.assertFalse(GlobalOffHeapMemory.acquire(11))
-      Assert.assertTrue(GlobalOffHeapMemory.acquire(10))
-      Assert.assertTrue(GlobalOffHeapMemory.acquire(0))
-      Assert.assertFalse(GlobalOffHeapMemory.acquire(1))
+      assert(consumer.borrow(300) == 300)
+      GlobalOffHeapMemory.acquire(50)
+      GlobalOffHeapMemory.acquire(40)
+      assertThrows[GlutenException](GlobalOffHeapMemory.acquire(30))
+      assertThrows[GlutenException](GlobalOffHeapMemory.acquire(11))
+      GlobalOffHeapMemory.acquire(10)
+      GlobalOffHeapMemory.acquire(0)
+      assertThrows[GlutenException](GlobalOffHeapMemory.acquire(1))
     }
   }
 
@@ -74,10 +74,10 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
             TreeMemoryTarget.CAPACITY_UNLIMITED,
             Spillers.NOOP,
             Collections.emptyMap())
-      Assert.assertTrue(GlobalOffHeapMemory.acquire(200))
-      Assert.assertEquals(100, consumer.borrow(100))
-      Assert.assertEquals(100, consumer.borrow(200))
-      Assert.assertEquals(0, consumer.borrow(50))
+      GlobalOffHeapMemory.acquire(200)
+      assert(consumer.borrow(100) == 100)
+      assert(consumer.borrow(200) == 100)
+      assert(consumer.borrow(50) == 0)
     }
   }
 
@@ -93,10 +93,10 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
             TreeMemoryTarget.CAPACITY_UNLIMITED,
             Spillers.NOOP,
             Collections.emptyMap())
-      Assert.assertTrue(GlobalOffHeapMemory.acquire(300))
-      Assert.assertEquals(100, consumer.borrow(200))
+      GlobalOffHeapMemory.acquire(300)
+      assert(consumer.borrow(200) == 100)
       GlobalOffHeapMemory.release(10)
-      Assert.assertEquals(10, consumer.borrow(50))
+      assert(consumer.borrow(50) == 10)
     }
   }
 
@@ -112,10 +112,10 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
             TreeMemoryTarget.CAPACITY_UNLIMITED,
             Spillers.NOOP,
             Collections.emptyMap())
-      Assert.assertEquals(300, consumer.borrow(300))
-      Assert.assertFalse(GlobalOffHeapMemory.acquire(200))
-      Assert.assertEquals(100, consumer.repay(100))
-      Assert.assertTrue(GlobalOffHeapMemory.acquire(200))
+      assert(consumer.borrow(300) == 300)
+      assertThrows[GlutenException](GlobalOffHeapMemory.acquire(200))
+      assert(consumer.repay(100) == 100)
+      GlobalOffHeapMemory.acquire(200)
     }
   }
 }
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index 0c3096d9d6..dfd0f8c094 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -369,8 +369,7 @@ NOTE: the class must be thread safe
 
 class SparkAllocationListener final : public gluten::AllocationListener {
  public:
-  SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef, jmethodID 
jReserveMethod, jmethodID jUnreserveMethod)
-      : vm_(vm), jReserveMethod_(jReserveMethod), 
jUnreserveMethod_(jUnreserveMethod) {
+  SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef) : vm_(vm) {
     JNIEnv* env;
     attachCurrentThreadAsDaemonOrThrow(vm_, &env);
     jListenerGlobalRef_ = env->NewGlobalRef(jListenerLocalRef);
@@ -398,20 +397,21 @@ class SparkAllocationListener final : public 
gluten::AllocationListener {
     JNIEnv* env;
     attachCurrentThreadAsDaemonOrThrow(vm_, &env);
     if (size < 0) {
-      env->CallLongMethod(jListenerGlobalRef_, jUnreserveMethod_, -size);
+      env->CallLongMethod(jListenerGlobalRef_, unreserveMemoryMethod(env), 
-size);
       checkException(env);
     } else {
-      env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size);
+      env->CallLongMethod(jListenerGlobalRef_, reserveMemoryMethod(env), size);
       checkException(env);
     }
     usedBytes_ += size;
     while (true) {
       int64_t savedPeakBytes = peakBytes_;
-      if (usedBytes_ <= savedPeakBytes) {
+      int64_t savedUsedBytes = usedBytes_;
+      if (savedUsedBytes <= savedPeakBytes) {
         break;
       }
       // usedBytes_ > savedPeakBytes, update peak
-      if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) {
+      if (peakBytes_.compare_exchange_weak(savedPeakBytes, savedUsedBytes)) {
         break;
       }
     }
@@ -426,10 +426,28 @@ class SparkAllocationListener final : public 
gluten::AllocationListener {
   }
 
  private:
+  jclass javaReservationListenerClass(JNIEnv* env) {
+    static jclass javaReservationListenerClass = createGlobalClassReference(
+        env,
+        "Lorg/apache/gluten/memory/listener/"
+        "ReservationListener;");
+    return javaReservationListenerClass;
+  }
+
+  jmethodID reserveMemoryMethod(JNIEnv* env) {
+    static jmethodID reserveMemoryMethod =
+        getMethodIdOrError(env, javaReservationListenerClass(env), "reserve", 
"(J)J");
+    return reserveMemoryMethod;
+  }
+
+  jmethodID unreserveMemoryMethod(JNIEnv* env) {
+    static jmethodID unreserveMemoryMethod =
+        getMethodIdOrError(env, javaReservationListenerClass(env), 
"unreserve", "(J)J");
+    return unreserveMemoryMethod;
+  }
+
   JavaVM* vm_;
   jobject jListenerGlobalRef_;
-  const jmethodID jReserveMethod_;
-  const jmethodID jUnreserveMethod_;
   std::atomic_int64_t usedBytes_{0L};
   std::atomic_int64_t peakBytes_{0L};
 };
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 4e1d1f09fd..512adf9e62 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -42,10 +42,6 @@
 using namespace gluten;
 
 namespace {
-jclass javaReservationListenerClass;
-
-jmethodID reserveMemoryMethod;
-jmethodID unreserveMemoryMethod;
 
 jclass byteArrayClass;
 
@@ -238,14 +234,6 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
       createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
   nativeColumnarToRowInfoConstructor = getMethodIdOrError(env, 
nativeColumnarToRowInfoClass, "<init>", "([I[IJ)V");
 
-  javaReservationListenerClass = createGlobalClassReference(
-      env,
-      "Lorg/apache/gluten/memory/listener/"
-      "ReservationListener;");
-
-  reserveMemoryMethod = getMethodIdOrError(env, javaReservationListenerClass, 
"reserve", "(J)J");
-  unreserveMemoryMethod = getMethodIdOrError(env, 
javaReservationListenerClass, "unreserve", "(J)J");
-
   shuffleReaderMetricsClass =
       createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/vectorized/ShuffleReaderMetrics;");
   shuffleReaderMetricsSetDecompressTime =
@@ -316,8 +304,7 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_memory_NativeMemoryManagerJniWrap
   auto backendType = jStringToCString(env, jBackendType);
   auto safeArray = getByteArrayElementsSafe(env, sessionConf);
   auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length());
-  std::unique_ptr<AllocationListener> listener =
-      std::make_unique<SparkAllocationListener>(vm, jListener, 
reserveMemoryMethod, unreserveMemoryMethod);
+  std::unique_ptr<AllocationListener> listener = 
std::make_unique<SparkAllocationListener>(vm, jListener);
   bool backtrace = sparkConf.at(kBacktraceAllocation) == "true";
   if (backtrace) {
     listener = 
std::make_unique<BacktraceAllocationListener>(std::move(listener));
diff --git a/cpp/core/memory/MemoryAllocator.cc 
b/cpp/core/memory/MemoryAllocator.cc
index 0fd6489d02..1bd16216dc 100644
--- a/cpp/core/memory/MemoryAllocator.cc
+++ b/cpp/core/memory/MemoryAllocator.cc
@@ -110,11 +110,12 @@ void ListenableMemoryAllocator::updateUsage(int64_t size) 
{
   usedBytes_ += size;
   while (true) {
     int64_t savedPeakBytes = peakBytes_;
-    if (usedBytes_ <= savedPeakBytes) {
+    int64_t savedUsedBytes = usedBytes_;
+    if (savedUsedBytes <= savedPeakBytes) {
       break;
     }
     // usedBytes_ > savedPeakBytes, update peak
-    if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) {
+    if (peakBytes_.compare_exchange_weak(savedPeakBytes, savedUsedBytes)) {
       break;
     }
   }
diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc 
b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
index 4804258caf..81be604eaf 100644
--- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
+++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
@@ -34,10 +34,10 @@
 #include <chrono>
 
 #include "benchmarks/common/BenchmarkUtils.h"
+#include "compute/VeloxBackend.h"
 #include "compute/VeloxRuntime.h"
 #include "memory/ArrowMemoryPool.h"
 #include "memory/ColumnarBatch.h"
-#include "memory/VeloxMemoryManager.h"
 #include "utils/Macros.h"
 #include "utils/TestUtils.h"
 #include "utils/VeloxArrowUtils.h"
@@ -251,7 +251,7 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : 
public GoogleBenchmar
     auto fileName = "velox_parquet_write.parquet";
 
     auto memoryManager = getDefaultMemoryManager();
-    auto runtime = Runtime::create(kVeloxBackendKind, memoryManager.get());
+    auto runtime = Runtime::create(kVeloxBackendKind, memoryManager);
     auto veloxPool = memoryManager->getAggregateMemoryPool();
 
     for (auto _ : state) {
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc 
b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
index 779c9e745a..6790e9235f 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
@@ -40,7 +40,7 @@ std::unordered_map<std::string, std::string> defaultConf() {
 }
 
 void initVeloxBackend(std::unordered_map<std::string, std::string>& conf) {
-  gluten::VeloxBackend::create(conf);
+  gluten::VeloxBackend::create(AllocationListener::noop(), conf);
 }
 
 void initVeloxBackend() {
diff --git a/cpp/velox/compute/VeloxBackend.cc 
b/cpp/velox/compute/VeloxBackend.cc
index 1991d6902c..44313fbb39 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -65,7 +65,7 @@ namespace gluten {
 
 namespace {
 MemoryManager* veloxMemoryManagerFactory(const std::string& kind, 
std::unique_ptr<AllocationListener> listener) {
-  return new VeloxMemoryManager(kind, std::move(listener));
+  return new VeloxMemoryManager(kind, std::move(listener), 
*VeloxBackend::get()->getBackendConf());
 }
 
 void veloxMemoryManagerReleaser(MemoryManager* memoryManager) {
@@ -86,10 +86,14 @@ void veloxRuntimeReleaser(Runtime* runtime) {
 }
 } // namespace
 
-void VeloxBackend::init(const std::unordered_map<std::string, std::string>& 
conf) {
+void VeloxBackend::init(
+    std::unique_ptr<AllocationListener> listener,
+    const std::unordered_map<std::string, std::string>& conf) {
   backendConf_ =
       
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>(conf));
 
+  globalMemoryManager_ = 
std::make_unique<VeloxMemoryManager>(kVeloxBackendKind, std::move(listener), 
*backendConf_);
+
   // Register factories.
   MemoryManager::registerFactory(kVeloxBackendKind, veloxMemoryManagerFactory, 
veloxMemoryManagerReleaser);
   Runtime::registerFactory(kVeloxBackendKind, veloxRuntimeFactory, 
veloxRuntimeReleaser);
@@ -175,7 +179,10 @@ void VeloxBackend::init(const 
std::unordered_map<std::string, std::string>& conf
 
   initUdf();
 
-  // Initialize the global memory manager for current process.
+  // Initialize the Velox-side memory manager for the current process. It is
+  // used during spill calls, so it is not tracked against Spark off-heap
+  // memory; overhead memory is relied on instead. Tracking it off-heap would
+  // trigger recursive reservations on the Spark pool and unexpected OOMs.
   auto sparkOverhead = backendConf_->get<int64_t>(kSparkOverheadMemory);
   int64_t memoryManagerCapacity;
   if (sparkOverhead.hasValue()) {
@@ -186,7 +193,7 @@ void VeloxBackend::init(const 
std::unordered_map<std::string, std::string>& conf
     memoryManagerCapacity = facebook::velox::memory::kMaxMemory;
   }
   LOG(INFO) << "Setting global Velox memory manager with capacity: " << 
memoryManagerCapacity;
-  facebook::velox::memory::MemoryManager::initialize({.allocatorCapacity = 
memoryManagerCapacity});
+  facebook::velox::memory::initializeMemoryManager({.allocatorCapacity = 
memoryManagerCapacity});
 }
 
 facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() 
const {
@@ -306,8 +313,10 @@ void VeloxBackend::initUdf() {
 
 std::unique_ptr<VeloxBackend> VeloxBackend::instance_ = nullptr;
 
-void VeloxBackend::create(const std::unordered_map<std::string, std::string>& 
conf) {
-  instance_ = std::unique_ptr<VeloxBackend>(new VeloxBackend(conf));
+void VeloxBackend::create(
+    std::unique_ptr<AllocationListener> listener,
+    const std::unordered_map<std::string, std::string>& conf) {
+  instance_ = std::unique_ptr<VeloxBackend>(new 
VeloxBackend(std::move(listener), conf));
 }
 
 VeloxBackend* VeloxBackend::get() {
@@ -317,5 +326,4 @@ VeloxBackend* VeloxBackend::get() {
   }
   return instance_.get();
 }
-
 } // namespace gluten
diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h
index e00a8868d1..86c77d4508 100644
--- a/cpp/velox/compute/VeloxBackend.h
+++ b/cpp/velox/compute/VeloxBackend.h
@@ -26,10 +26,12 @@
 
 #include "velox/common/caching/AsyncDataCache.h"
 #include "velox/common/config/Config.h"
-#include "velox/common/memory/MemoryPool.h"
 #include "velox/common/memory/MmapAllocator.h"
 
+#include "memory/VeloxMemoryManager.h"
+
 namespace gluten {
+
 // This kind string must be same with VeloxBackend#name in java side.
 inline static const std::string kVeloxBackendKind{"velox"};
 /// As a static instance in per executor, initialized at executor startup.
@@ -49,7 +51,9 @@ class VeloxBackend {
     }
   }
 
-  static void create(const std::unordered_map<std::string, std::string>& conf);
+  static void create(
+      std::unique_ptr<AllocationListener> listener,
+      const std::unordered_map<std::string, std::string>& conf);
 
   static VeloxBackend* get();
 
@@ -59,19 +63,26 @@ class VeloxBackend {
     return backendConf_;
   }
 
+  VeloxMemoryManager* getGlobalMemoryManager() const {
+    return globalMemoryManager_.get();
+  }
+
   void tearDown() {
     // Destruct IOThreadPoolExecutor will join all threads.
     // On threads exit, thread local variables can be constructed with 
referencing global variables.
     // So, we need to destruct IOThreadPoolExecutor and stop the threads 
before global variables get destructed.
     ioExecutor_.reset();
+    globalMemoryManager_.reset();
   }
 
  private:
-  explicit VeloxBackend(const std::unordered_map<std::string, std::string>& 
conf) {
-    init(conf);
+  explicit VeloxBackend(
+      std::unique_ptr<AllocationListener> listener,
+      const std::unordered_map<std::string, std::string>& conf) {
+    init(std::move(listener), conf);
   }
 
-  void init(const std::unordered_map<std::string, std::string>& conf);
+  void init(std::unique_ptr<AllocationListener> listener, const 
std::unordered_map<std::string, std::string>& conf);
   void initCache();
   void initConnector();
   void initUdf();
@@ -84,6 +95,8 @@ class VeloxBackend {
 
   static std::unique_ptr<VeloxBackend> instance_;
 
+  // A global Velox memory manager for the current process.
+  std::unique_ptr<VeloxMemoryManager> globalMemoryManager_;
   // Instance of AsyncDataCache used for all large allocations.
   std::shared_ptr<facebook::velox::cache::AsyncDataCache> asyncDataCache_;
 
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index d29f5b0f7b..879ade9dc7 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -86,11 +86,19 @@ void JNI_OnUnload(JavaVM* vm, void*) {
 JNIEXPORT void JNICALL 
Java_org_apache_gluten_init_NativeBackendInitializer_initialize( // NOLINT
     JNIEnv* env,
     jclass,
+    jobject jListener,
     jbyteArray conf) {
   JNI_METHOD_START
+  JavaVM* vm;
+  if (env->GetJavaVM(&vm) != JNI_OK) {
+    throw GlutenException("Unable to get JavaVM instance");
+  }
   auto safeArray = getByteArrayElementsSafe(env, conf);
+  // Create a global allocation listener that reserves global off-heap memory 
from Java-side GlobalOffHeapMemory utility
+  // class.
+  std::unique_ptr<AllocationListener> listener = 
std::make_unique<SparkAllocationListener>(vm, jListener);
   auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length());
-  VeloxBackend::create(sparkConf);
+  VeloxBackend::create(std::move(listener), sparkConf);
   JNI_METHOD_END()
 }
 
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc 
b/cpp/velox/memory/VeloxMemoryManager.cc
index 69919a38d4..3aeb50e451 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -20,11 +20,12 @@
 #include <jemalloc/jemalloc.h>
 #endif
 
+#include "compute/VeloxBackend.h"
+
 #include "velox/common/memory/MallocAllocator.h"
 #include "velox/common/memory/MemoryPool.h"
 #include "velox/exec/MemoryReclaimer.h"
 
-#include "compute/VeloxBackend.h"
 #include "config/VeloxConfig.h"
 #include "memory/ArrowMemoryPool.h"
 #include "utils/Exception.h"
@@ -35,15 +36,22 @@ namespace gluten {
 
 using namespace facebook;
 
-namespace {
+std::unordered_map<std::string, std::string> getExtraArbitratorConfigs(
+    const facebook::velox::config::ConfigBase& backendConf) {
+  auto reservationBlockSize =
+      backendConf.get<uint64_t>(kMemoryReservationBlockSize, 
kMemoryReservationBlockSizeDefault);
+  auto memInitCapacity = backendConf.get<uint64_t>(kVeloxMemInitCapacity, 
kVeloxMemInitCapacityDefault);
+  auto memReclaimMaxWaitMs = 
backendConf.get<uint64_t>(kVeloxMemReclaimMaxWaitMs, 
kVeloxMemReclaimMaxWaitMsDefault);
 
-static constexpr std::string_view 
kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"};
-static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
-static constexpr std::string_view 
kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"};
-static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};
-static constexpr std::string_view 
kMemoryReclaimMaxWaitMs{"memory-reclaim-max-wait-time"};
-static constexpr std::string_view kDefaultMemoryReclaimMaxWaitMs{"3600000ms"};
+  std::unordered_map<std::string, std::string> extraArbitratorConfigs;
+  extraArbitratorConfigs[std::string(kMemoryPoolInitialCapacity)] = 
folly::to<std::string>(memInitCapacity) + "B";
+  extraArbitratorConfigs[std::string(kMemoryPoolTransferCapacity)] = 
folly::to<std::string>(reservationBlockSize) + "B";
+  extraArbitratorConfigs[std::string(kMemoryReclaimMaxWaitMs)] = 
folly::to<std::string>(memReclaimMaxWaitMs) + "ms";
 
+  return extraArbitratorConfigs;
+}
+
+namespace {
 template <typename T>
 T getConfig(
     const std::unordered_map<std::string, std::string>& configs,
@@ -58,7 +66,6 @@ T getConfig(
   }
   return defaultValue;
 }
-} // namespace
 
 /// We assume in a single Spark task. No thread-safety should be guaranteed.
 class ListenableArbitrator : public velox::memory::MemoryArbitrator {
@@ -187,49 +194,34 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
   std::unordered_map<velox::memory::MemoryPool*, 
std::weak_ptr<velox::memory::MemoryPool>> candidates_;
 };
 
-class ArbitratorFactoryRegister {
- public:
-  explicit ArbitratorFactoryRegister(gluten::AllocationListener* listener) : 
listener_(listener) {
-    static std::atomic_uint32_t id{0UL};
-    kind_ = "GLUTEN_ARBITRATOR_FACTORY_" + std::to_string(id++);
-    velox::memory::MemoryArbitrator::registerFactory(
-        kind_,
-        [this](
-            const velox::memory::MemoryArbitrator::Config& config) -> 
std::unique_ptr<velox::memory::MemoryArbitrator> {
-          return std::make_unique<ListenableArbitrator>(config, listener_);
-        });
-  }
-
-  virtual ~ArbitratorFactoryRegister() {
-    velox::memory::MemoryArbitrator::unregisterFactory(kind_);
-  }
+} // namespace
 
-  const std::string& getKind() const {
-    return kind_;
-  }
+ArbitratorFactoryRegister::ArbitratorFactoryRegister(gluten::AllocationListener*
 listener) : listener_(listener) {
+  static std::atomic_uint32_t id{0UL};
+  kind_ = "GLUTEN_ARBITRATOR_FACTORY_" + std::to_string(id++);
+  velox::memory::MemoryArbitrator::registerFactory(
+      kind_,
+      [this](
+          const velox::memory::MemoryArbitrator::Config& config) -> 
std::unique_ptr<velox::memory::MemoryArbitrator> {
+        return std::make_unique<ListenableArbitrator>(config, listener_);
+      });
+}
 
- private:
-  std::string kind_;
-  gluten::AllocationListener* listener_;
-};
+ArbitratorFactoryRegister::~ArbitratorFactoryRegister() {
+  velox::memory::MemoryArbitrator::unregisterFactory(kind_);
+}
 
-VeloxMemoryManager::VeloxMemoryManager(const std::string& kind, 
std::unique_ptr<AllocationListener> listener)
+VeloxMemoryManager::VeloxMemoryManager(
+    const std::string& kind,
+    std::unique_ptr<AllocationListener> listener,
+    const facebook::velox::config::ConfigBase& backendConf)
     : MemoryManager(kind), listener_(std::move(listener)) {
-  auto reservationBlockSize = 
VeloxBackend::get()->getBackendConf()->get<uint64_t>(
-      kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
-  auto memInitCapacity =
-      
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemInitCapacity, 
kVeloxMemInitCapacityDefault);
-  auto memReclaimMaxWaitMs =
-      
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemReclaimMaxWaitMs, 
kVeloxMemReclaimMaxWaitMsDefault);
+  auto reservationBlockSize =
+      backendConf.get<uint64_t>(kMemoryReservationBlockSize, 
kMemoryReservationBlockSizeDefault);
   blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), 
reservationBlockSize);
   listenableAlloc_ = 
std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(), 
blockListener_.get());
   arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
 
-  std::unordered_map<std::string, std::string> extraArbitratorConfigs;
-  extraArbitratorConfigs[std::string(kMemoryPoolInitialCapacity)] = 
folly::to<std::string>(memInitCapacity) + "B";
-  extraArbitratorConfigs[std::string(kMemoryPoolTransferCapacity)] = 
folly::to<std::string>(reservationBlockSize) + "B";
-  extraArbitratorConfigs[std::string(kMemoryReclaimMaxWaitMs)] = 
folly::to<std::string>(memReclaimMaxWaitMs) + "ms";
-
   ArbitratorFactoryRegister afr(listener_.get());
   velox::memory::MemoryManagerOptions mmOptions{
       .alignment = velox::memory::MemoryAllocator::kMaxAlignment,
@@ -239,7 +231,7 @@ VeloxMemoryManager::VeloxMemoryManager(const std::string& 
kind, std::unique_ptr<
       .coreOnAllocationFailureEnabled = false,
       .allocatorCapacity = velox::memory::kMaxMemory,
       .arbitratorKind = afr.getKind(),
-      .extraArbitratorConfigs = extraArbitratorConfigs};
+      .extraArbitratorConfigs = getExtraArbitratorConfigs(backendConf)};
   veloxMemoryManager_ = 
std::make_unique<velox::memory::MemoryManager>(mmOptions);
 
   veloxAggregatePool_ = veloxMemoryManager_->addRootPool(
@@ -405,4 +397,12 @@ VeloxMemoryManager::~VeloxMemoryManager() {
 #endif
 }
 
+VeloxMemoryManager* getDefaultMemoryManager() {
+  return VeloxBackend::get()->getGlobalMemoryManager();
+}
+
+std::shared_ptr<velox::memory::MemoryPool> defaultLeafVeloxMemoryPool() {
+  return getDefaultMemoryManager()->getLeafMemoryPool();
+}
+
 } // namespace gluten
diff --git a/cpp/velox/memory/VeloxMemoryManager.h 
b/cpp/velox/memory/VeloxMemoryManager.h
index 638a4f3b3e..c2d79a9131 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -17,19 +17,48 @@
 
 #pragma once
 
-#include "compute/VeloxBackend.h"
 #include "memory/AllocationListener.h"
 #include "memory/MemoryAllocator.h"
 #include "memory/MemoryManager.h"
 #include "velox/common/memory/Memory.h"
 #include "velox/common/memory/MemoryPool.h"
 
+#include <velox/common/config/Config.h>
+
 namespace gluten {
 
+constexpr std::string_view 
kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"};
+constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
+constexpr std::string_view 
kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"};
+constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};
+constexpr std::string_view 
kMemoryReclaimMaxWaitMs{"memory-reclaim-max-wait-time"};
+constexpr std::string_view kDefaultMemoryReclaimMaxWaitMs{"3600000ms"};
+
+std::unordered_map<std::string, std::string> getExtraArbitratorConfigs(
+    const facebook::velox::config::ConfigBase& backendConf);
+
+class ArbitratorFactoryRegister {
+ public:
+  explicit ArbitratorFactoryRegister(gluten::AllocationListener* listener);
+
+  virtual ~ArbitratorFactoryRegister();
+
+  const std::string& getKind() const {
+    return kind_;
+  }
+
+ private:
+  std::string kind_;
+  gluten::AllocationListener* listener_;
+};
+
 // Make sure the class is thread safe
 class VeloxMemoryManager final : public MemoryManager {
  public:
-  VeloxMemoryManager(const std::string& kind, 
std::unique_ptr<AllocationListener> listener);
+  VeloxMemoryManager(
+      const std::string& kind,
+      std::unique_ptr<AllocationListener> listener,
+      const facebook::velox::config::ConfigBase& backendConf);
 
   ~VeloxMemoryManager() override;
   VeloxMemoryManager(const VeloxMemoryManager&) = delete;
@@ -87,15 +116,8 @@ class VeloxMemoryManager final : public MemoryManager {
   std::vector<std::shared_ptr<facebook::velox::memory::MemoryPool>> 
heldVeloxPools_;
 };
 
-/// Not tracked by Spark and should only used in test or validation.
-inline std::shared_ptr<gluten::VeloxMemoryManager> getDefaultMemoryManager() {
-  static auto memoryManager =
-      std::make_shared<gluten::VeloxMemoryManager>(gluten::kVeloxBackendKind, 
gluten::AllocationListener::noop());
-  return memoryManager;
-}
+VeloxMemoryManager* getDefaultMemoryManager();
 
-inline std::shared_ptr<facebook::velox::memory::MemoryPool> 
defaultLeafVeloxMemoryPool() {
-  return getDefaultMemoryManager()->getLeafMemoryPool();
-}
+std::shared_ptr<facebook::velox::memory::MemoryPool> 
defaultLeafVeloxMemoryPool();
 
 } // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index b7fbe9a47f..909d00e5e8 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -16,6 +16,9 @@
  */
 
 #include "SubstraitToVeloxPlan.h"
+
+#include "utils/StringUtil.h"
+
 #include "TypeUtils.h"
 #include "VariantToVectorConverter.h"
 #include "operators/plannodes/RowVectorStream.h"
@@ -498,7 +501,7 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
 }
 
 std::string makeUuid() {
-  return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
+  return generateUuid();
 }
 
 std::string compressionFileNameSuffix(common::CompressionKind kind) {
diff --git a/cpp/velox/tests/BufferOutputStreamTest.cc 
b/cpp/velox/tests/BufferOutputStreamTest.cc
index b9ea62fd7b..b4eb2bceab 100644
--- a/cpp/velox/tests/BufferOutputStreamTest.cc
+++ b/cpp/velox/tests/BufferOutputStreamTest.cc
@@ -29,7 +29,7 @@ class BufferOutputStreamTest : public ::testing::Test, public 
test::VectorTestBa
  protected:
   // Velox requires the mem manager to be instanced.
   static void SetUpTestCase() {
-    VeloxBackend::create({});
+    VeloxBackend::create(AllocationListener::noop(), {});
     memory::MemoryManager::testingSetInstance({});
   }
 
diff --git a/cpp/velox/tests/MemoryManagerTest.cc 
b/cpp/velox/tests/MemoryManagerTest.cc
index 85df9eed49..8dd41aec7d 100644
--- a/cpp/velox/tests/MemoryManagerTest.cc
+++ b/cpp/velox/tests/MemoryManagerTest.cc
@@ -50,11 +50,12 @@ class MemoryManagerTest : public ::testing::Test {
     std::unordered_map<std::string, std::string> conf = {
         {kMemoryReservationBlockSize, 
std::to_string(kMemoryReservationBlockSizeDefault)},
         {kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}};
-    gluten::VeloxBackend::create(conf);
+    gluten::VeloxBackend::create(AllocationListener::noop(), conf);
   }
 
   void SetUp() override {
-    vmm_ = std::make_unique<VeloxMemoryManager>(gluten::kVeloxBackendKind, 
std::make_unique<MockAllocationListener>());
+    vmm_ = std::make_unique<VeloxMemoryManager>(
+        gluten::kVeloxBackendKind, std::make_unique<MockAllocationListener>(), 
*VeloxBackend::get()->getBackendConf());
     listener_ = vmm_->getListener();
     allocator_ = vmm_->allocator();
   }
@@ -333,11 +334,12 @@ class MultiMemoryManagerTest : public ::testing::Test {
     std::unordered_map<std::string, std::string> conf = {
         {kMemoryReservationBlockSize, 
std::to_string(kMemoryReservationBlockSizeDefault)},
         {kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}};
-    gluten::VeloxBackend::create(conf);
+    gluten::VeloxBackend::create(AllocationListener::noop(), conf);
   }
 
   std::unique_ptr<VeloxMemoryManager> 
newVeloxMemoryManager(std::unique_ptr<AllocationListener> listener) {
-    return std::make_unique<VeloxMemoryManager>(gluten::kVeloxBackendKind, 
std::move(listener));
+    return std::make_unique<VeloxMemoryManager>(
+        gluten::kVeloxBackendKind, std::move(listener), 
*VeloxBackend::get()->getBackendConf());
   }
 };
 
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index b4944d9205..212b0201ed 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -144,7 +144,7 @@ TEST(TestRuntime, CreateRuntime) {
 }
 
 TEST(TestRuntime, CreateVeloxRuntime) {
-  VeloxBackend::create({});
+  VeloxBackend::create(AllocationListener::noop(), {});
   auto mm = MemoryManager::create(kVeloxBackendKind, 
AllocationListener::noop());
   auto runtime = Runtime::create(kVeloxBackendKind, mm);
   ASSERT_EQ(typeid(*runtime), typeid(VeloxRuntime));
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
index 232ec5d7a8..c0ae2199c5 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
@@ -17,6 +17,7 @@
 package org.apache.gluten.init;
 
 import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.memory.listener.ReservationListener;
 import org.apache.gluten.utils.ConfigUtil;
 
 import org.apache.spark.util.SparkShutdownManagerUtil;
@@ -52,12 +53,11 @@ public final class NativeBackendInitializer {
   // In local mode, NativeBackendInitializer#initializeBackend will be invoked 
twice in same
   // thread, driver first then executor, initialized flag ensure only invoke 
initializeBackend once,
   // so there are no race condition here.
-  public void initialize(scala.collection.Map<String, String> conf) {
+  public void initialize(ReservationListener rl, scala.collection.Map<String, 
String> conf) {
     if (!initialized.compareAndSet(false, true)) {
-      // Already called.
-      return;
+      throw new IllegalStateException("Already initialized");
     }
-    initialize0(conf);
+    initialize0(rl, conf);
     SparkShutdownManagerUtil.addHook(
         () -> {
           shutdown();
@@ -65,17 +65,17 @@ public final class NativeBackendInitializer {
         });
   }
 
-  private void initialize0(scala.collection.Map<String, String> conf) {
+  private void initialize0(ReservationListener rl, 
scala.collection.Map<String, String> conf) {
     try {
       Map<String, String> nativeConfMap = 
GlutenConfig.getNativeBackendConf(backendName, conf);
-      initialize(ConfigUtil.serialize(nativeConfMap));
+      initialize(rl, ConfigUtil.serialize(nativeConfMap));
     } catch (Exception e) {
       LOG.error("Failed to call native backend's initialize method", e);
       throw e;
     }
   }
 
-  private native void initialize(byte[] configPlan);
+  private native void initialize(ReservationListener rl, byte[] configPlan);
 
   private native void shutdown();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to