This is an automated email from the ASF dual-hosted git repository.
kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c1387bd4bf [VL] Account some C++ untracked memory allocations into
Spark global off-heap memory (#9115)
c1387bd4bf is described below
commit c1387bd4bfe67707657bb4f871200ad6068eda5e
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Mar 26 02:38:15 2025 +0000
[VL] Account some C++ untracked memory allocations into Spark global
off-heap memory (#9115)
---
.../backendsapi/velox/VeloxListenerApi.scala | 5 +-
.../apache/spark/memory/GlobalOffHeapMemory.scala | 73 +++++++++++++++---
.../execution/unsafe/UnsafeBytesBufferArray.scala | 13 +---
.../spark/memory/GlobalOffHeapMemorySuite.scala | 40 +++++-----
cpp/core/jni/JniCommon.h | 34 ++++++--
cpp/core/jni/JniWrapper.cc | 15 +---
cpp/core/memory/MemoryAllocator.cc | 5 +-
cpp/velox/benchmarks/ParquetWriteBenchmark.cc | 4 +-
cpp/velox/benchmarks/common/BenchmarkUtils.cc | 2 +-
cpp/velox/compute/VeloxBackend.cc | 22 ++++--
cpp/velox/compute/VeloxBackend.h | 23 ++++--
cpp/velox/jni/VeloxJniWrapper.cc | 10 ++-
cpp/velox/memory/VeloxMemoryManager.cc | 90 +++++++++++-----------
cpp/velox/memory/VeloxMemoryManager.h | 44 ++++++++---
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 5 +-
cpp/velox/tests/BufferOutputStreamTest.cc | 2 +-
cpp/velox/tests/MemoryManagerTest.cc | 10 ++-
cpp/velox/tests/RuntimeTest.cc | 2 +-
.../gluten/init/NativeBackendInitializer.java | 14 ++--
19 files changed, 258 insertions(+), 155 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index f2e6dbd9fb..2aa19afe7a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -32,6 +32,7 @@ import org.apache.gluten.utils._
import org.apache.spark.{HdfsConfGenerator, ShuffleDependency, SparkConf,
SparkContext}
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
+import org.apache.spark.memory.GlobalOffHeapMemory
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.shuffle.{ColumnarShuffleDependency, LookupKey,
ShuffleManagerRegistry}
import org.apache.spark.shuffle.sort.ColumnarShuffleManager
@@ -204,7 +205,9 @@ class VeloxListenerApi extends ListenerApi with Logging {
if (isDriver && !inLocalMode(conf)) {
parsed += (COLUMNAR_VELOX_CACHE_ENABLED.key -> "false")
}
-
NativeBackendInitializer.forBackend(VeloxBackend.BACKEND_NAME).initialize(parsed)
+ NativeBackendInitializer
+ .forBackend(VeloxBackend.BACKEND_NAME)
+ .initialize(GlobalOffHeapMemory.newReservationListener(), parsed)
// Inject backend-specific implementations to override spark classes.
GlutenFormatFactory.register(new VeloxParquetWriterInjects)
diff --git a/backends-velox/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala b/backends-velox/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
index 3e25fdfc74..d407a4f52b 100644
--- a/backends-velox/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/memory/GlobalOffHeapMemory.scala
@@ -17,8 +17,11 @@
package org.apache.spark.memory
import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.memory.{MemoryUsageRecorder,
SimpleMemoryUsageRecorder}
+import org.apache.gluten.memory.listener.ReservationListener
import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
import org.apache.spark.storage.BlockId
import java.lang.reflect.Field
@@ -33,7 +36,9 @@ import java.util.UUID
* The utility internally relies on the Spark storage memory pool. As Spark
doesn't expect trait
* BlockId to be extended by user, TestBlockId is chosen for the storage
memory reservations.
*/
-object GlobalOffHeapMemory {
+object GlobalOffHeapMemory extends Logging {
+ private val recorder: MemoryUsageRecorder = new SimpleMemoryUsageRecorder()
+
private val FIELD_MEMORY_MANAGER: Field = {
val f =
try {
@@ -48,28 +53,72 @@ object GlobalOffHeapMemory {
f
}
- def acquire(numBytes: Long): Boolean = {
- memoryManager().acquireStorageMemory(
- BlockId(s"test_${UUID.randomUUID()}"),
- numBytes,
- MemoryMode.OFF_HEAP)
+ def acquire(numBytes: Long): Unit = memoryManagerOption().foreach {
+ mm =>
+ val succeeded =
+ mm.acquireStorageMemory(
+ BlockId(s"test_${UUID.randomUUID()}"),
+ numBytes,
+ MemoryMode.OFF_HEAP)
+
+ if (succeeded) {
+ recorder.inc(numBytes)
+ return
+ }
+
+ // Throw OOM.
+ val offHeapMemoryTotal =
+ mm.maxOffHeapStorageMemory + mm.offHeapExecutionMemoryUsed
+ throw new GlutenException(
+ s"Spark off-heap memory is exhausted." +
+ s" Storage: ${mm.offHeapStorageMemoryUsed} / $offHeapMemoryTotal," +
+ s" execution: ${mm.offHeapExecutionMemoryUsed} /
$offHeapMemoryTotal")
+ }
+
+ def release(numBytes: Long): Unit = memoryManagerOption().foreach {
+ mm =>
+ mm.releaseStorageMemory(numBytes, MemoryMode.OFF_HEAP)
+ recorder.inc(-numBytes)
+ }
+
+ def currentBytes(): Long = {
+ recorder.current()
}
- def release(numBytes: Long): Unit = {
- memoryManager().releaseStorageMemory(numBytes, MemoryMode.OFF_HEAP)
+ def newReservationListener(): ReservationListener = {
+ new ReservationListener {
+ private val recorder: MemoryUsageRecorder = new
SimpleMemoryUsageRecorder()
+
+ override def reserve(size: Long): Long = {
+ acquire(size)
+ recorder.inc(size)
+ size
+ }
+
+ override def unreserve(size: Long): Long = {
+ release(size)
+ recorder.inc(-size)
+ size
+ }
+
+ override def getUsedBytes: Long = {
+ recorder.current()
+ }
+ }
}
- private def memoryManager(): MemoryManager = {
+ private def memoryManagerOption(): Option[MemoryManager] = {
val env = SparkEnv.get
if (env != null) {
- return env.memoryManager
+ return Some(env.memoryManager)
}
val tc = TaskContext.get()
if (tc != null) {
// This may happen in test code that mocks the task context without
booting up SparkEnv.
- return
FIELD_MEMORY_MANAGER.get(tc.taskMemoryManager()).asInstanceOf[MemoryManager]
+ return
Some(FIELD_MEMORY_MANAGER.get(tc.taskMemoryManager()).asInstanceOf[MemoryManager])
}
- throw new GlutenException(
+ logWarning(
"Memory manager not found because the code is unlikely be run in a Spark
application")
+ None
}
}
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeBytesBufferArray.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeBytesBufferArray.scala
index ecfb777178..5aeb5feb52 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeBytesBufferArray.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeBytesBufferArray.scala
@@ -16,9 +16,6 @@
*/
package org.apache.spark.sql.execution.unsafe
-import org.apache.gluten.exception.GlutenException
-
-import org.apache.spark.SparkEnv
import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.memory.GlobalOffHeapMemory
@@ -76,15 +73,7 @@ case class UnsafeBytesBufferArray(arraySize: Int, bytesBufferLengths: Array[Int]
assert(bytesBuffer.length == bytesBufferLengths(index))
// first to allocate underlying long array
if (null == longArray && index == 0) {
- if (!GlobalOffHeapMemory.acquire(allocatedBytes)) {
- val memoryManager = SparkEnv.get.memoryManager
- val offHeapMemoryTotal =
- memoryManager.maxOffHeapStorageMemory +
memoryManager.offHeapExecutionMemoryUsed
- throw new GlutenException(
- s"Spark off-heap memory is exhausted." +
- s" Storage: ${memoryManager.offHeapStorageMemoryUsed} /
$offHeapMemoryTotal," +
- s" execution: ${memoryManager.offHeapExecutionMemoryUsed} /
$offHeapMemoryTotal")
- }
+ GlobalOffHeapMemory.acquire(allocatedBytes)
longArray = new
LongArray(MemoryAllocator.UNSAFE.allocate(allocatedBytes))
}
diff --git a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
index 9ac890e87a..288acabe2b 100644
--- a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.memory;
import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.exception.GlutenException
import org.apache.gluten.memory.memtarget.{Spillers, TreeMemoryTarget}
import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers
@@ -24,7 +25,6 @@ import org.apache.spark.TaskContext
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.task.TaskResources
-import org.junit.Assert
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
@@ -51,14 +51,14 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with BeforeAndAfterAll {
TreeMemoryTarget.CAPACITY_UNLIMITED,
Spillers.NOOP,
Collections.emptyMap())
- Assert.assertEquals(300, consumer.borrow(300))
- Assert.assertTrue(GlobalOffHeapMemory.acquire(50))
- Assert.assertTrue(GlobalOffHeapMemory.acquire(40))
- Assert.assertFalse(GlobalOffHeapMemory.acquire(30))
- Assert.assertFalse(GlobalOffHeapMemory.acquire(11))
- Assert.assertTrue(GlobalOffHeapMemory.acquire(10))
- Assert.assertTrue(GlobalOffHeapMemory.acquire(0))
- Assert.assertFalse(GlobalOffHeapMemory.acquire(1))
+ assert(consumer.borrow(300) == 300)
+ GlobalOffHeapMemory.acquire(50)
+ GlobalOffHeapMemory.acquire(40)
+ assertThrows[GlutenException](GlobalOffHeapMemory.acquire(30))
+ assertThrows[GlutenException](GlobalOffHeapMemory.acquire(11))
+ GlobalOffHeapMemory.acquire(10)
+ GlobalOffHeapMemory.acquire(0)
+ assertThrows[GlutenException](GlobalOffHeapMemory.acquire(1))
}
}
@@ -74,10 +74,10 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with BeforeAndAfterAll {
TreeMemoryTarget.CAPACITY_UNLIMITED,
Spillers.NOOP,
Collections.emptyMap())
- Assert.assertTrue(GlobalOffHeapMemory.acquire(200))
- Assert.assertEquals(100, consumer.borrow(100))
- Assert.assertEquals(100, consumer.borrow(200))
- Assert.assertEquals(0, consumer.borrow(50))
+ GlobalOffHeapMemory.acquire(200)
+ assert(consumer.borrow(100) == 100)
+ assert(consumer.borrow(200) == 100)
+ assert(consumer.borrow(50) == 0)
}
}
@@ -93,10 +93,10 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with BeforeAndAfterAll {
TreeMemoryTarget.CAPACITY_UNLIMITED,
Spillers.NOOP,
Collections.emptyMap())
- Assert.assertTrue(GlobalOffHeapMemory.acquire(300))
- Assert.assertEquals(100, consumer.borrow(200))
+ GlobalOffHeapMemory.acquire(300)
+ assert(consumer.borrow(200) == 100)
GlobalOffHeapMemory.release(10)
- Assert.assertEquals(10, consumer.borrow(50))
+ assert(consumer.borrow(50) == 10)
}
}
@@ -112,10 +112,10 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with BeforeAndAfterAll {
TreeMemoryTarget.CAPACITY_UNLIMITED,
Spillers.NOOP,
Collections.emptyMap())
- Assert.assertEquals(300, consumer.borrow(300))
- Assert.assertFalse(GlobalOffHeapMemory.acquire(200))
- Assert.assertEquals(100, consumer.repay(100))
- Assert.assertTrue(GlobalOffHeapMemory.acquire(200))
+ assert(consumer.borrow(300) == 300)
+ assertThrows[GlutenException](GlobalOffHeapMemory.acquire(200))
+ assert(consumer.repay(100) == 100)
+ GlobalOffHeapMemory.acquire(200)
}
}
}
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index 0c3096d9d6..dfd0f8c094 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -369,8 +369,7 @@ NOTE: the class must be thread safe
class SparkAllocationListener final : public gluten::AllocationListener {
public:
- SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef, jmethodID
jReserveMethod, jmethodID jUnreserveMethod)
- : vm_(vm), jReserveMethod_(jReserveMethod),
jUnreserveMethod_(jUnreserveMethod) {
+ SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef) : vm_(vm) {
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
jListenerGlobalRef_ = env->NewGlobalRef(jListenerLocalRef);
@@ -398,20 +397,21 @@ class SparkAllocationListener final : public gluten::AllocationListener {
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
if (size < 0) {
- env->CallLongMethod(jListenerGlobalRef_, jUnreserveMethod_, -size);
+ env->CallLongMethod(jListenerGlobalRef_, unreserveMemoryMethod(env),
-size);
checkException(env);
} else {
- env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size);
+ env->CallLongMethod(jListenerGlobalRef_, reserveMemoryMethod(env), size);
checkException(env);
}
usedBytes_ += size;
while (true) {
int64_t savedPeakBytes = peakBytes_;
- if (usedBytes_ <= savedPeakBytes) {
+ int64_t savedUsedBytes = usedBytes_;
+ if (savedUsedBytes <= savedPeakBytes) {
break;
}
// usedBytes_ > savedPeakBytes, update peak
- if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) {
+ if (peakBytes_.compare_exchange_weak(savedPeakBytes, savedUsedBytes)) {
break;
}
}
@@ -426,10 +426,28 @@ class SparkAllocationListener final : public gluten::AllocationListener {
}
private:
+ jclass javaReservationListenerClass(JNIEnv* env) {
+ static jclass javaReservationListenerClass = createGlobalClassReference(
+ env,
+ "Lorg/apache/gluten/memory/listener/"
+ "ReservationListener;");
+ return javaReservationListenerClass;
+ }
+
+ jmethodID reserveMemoryMethod(JNIEnv* env) {
+ static jmethodID reserveMemoryMethod =
+ getMethodIdOrError(env, javaReservationListenerClass(env), "reserve",
"(J)J");
+ return reserveMemoryMethod;
+ }
+
+ jmethodID unreserveMemoryMethod(JNIEnv* env) {
+ static jmethodID unreserveMemoryMethod =
+ getMethodIdOrError(env, javaReservationListenerClass(env),
"unreserve", "(J)J");
+ return unreserveMemoryMethod;
+ }
+
JavaVM* vm_;
jobject jListenerGlobalRef_;
- const jmethodID jReserveMethod_;
- const jmethodID jUnreserveMethod_;
std::atomic_int64_t usedBytes_{0L};
std::atomic_int64_t peakBytes_{0L};
};
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 4e1d1f09fd..512adf9e62 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -42,10 +42,6 @@
using namespace gluten;
namespace {
-jclass javaReservationListenerClass;
-
-jmethodID reserveMemoryMethod;
-jmethodID unreserveMemoryMethod;
jclass byteArrayClass;
@@ -238,14 +234,6 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
nativeColumnarToRowInfoConstructor = getMethodIdOrError(env,
nativeColumnarToRowInfoClass, "<init>", "([I[IJ)V");
- javaReservationListenerClass = createGlobalClassReference(
- env,
- "Lorg/apache/gluten/memory/listener/"
- "ReservationListener;");
-
- reserveMemoryMethod = getMethodIdOrError(env, javaReservationListenerClass,
"reserve", "(J)J");
- unreserveMemoryMethod = getMethodIdOrError(env,
javaReservationListenerClass, "unreserve", "(J)J");
-
shuffleReaderMetricsClass =
createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/ShuffleReaderMetrics;");
shuffleReaderMetricsSetDecompressTime =
@@ -316,8 +304,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrap
auto backendType = jStringToCString(env, jBackendType);
auto safeArray = getByteArrayElementsSafe(env, sessionConf);
auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length());
- std::unique_ptr<AllocationListener> listener =
- std::make_unique<SparkAllocationListener>(vm, jListener,
reserveMemoryMethod, unreserveMemoryMethod);
+ std::unique_ptr<AllocationListener> listener =
std::make_unique<SparkAllocationListener>(vm, jListener);
bool backtrace = sparkConf.at(kBacktraceAllocation) == "true";
if (backtrace) {
listener =
std::make_unique<BacktraceAllocationListener>(std::move(listener));
diff --git a/cpp/core/memory/MemoryAllocator.cc b/cpp/core/memory/MemoryAllocator.cc
index 0fd6489d02..1bd16216dc 100644
--- a/cpp/core/memory/MemoryAllocator.cc
+++ b/cpp/core/memory/MemoryAllocator.cc
@@ -110,11 +110,12 @@ void ListenableMemoryAllocator::updateUsage(int64_t size) {
usedBytes_ += size;
while (true) {
int64_t savedPeakBytes = peakBytes_;
- if (usedBytes_ <= savedPeakBytes) {
+ int64_t savedUsedBytes = usedBytes_;
+ if (savedUsedBytes <= savedPeakBytes) {
break;
}
// usedBytes_ > savedPeakBytes, update peak
- if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) {
+ if (peakBytes_.compare_exchange_weak(savedPeakBytes, savedUsedBytes)) {
break;
}
}
diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
index 4804258caf..81be604eaf 100644
--- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
+++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
@@ -34,10 +34,10 @@
#include <chrono>
#include "benchmarks/common/BenchmarkUtils.h"
+#include "compute/VeloxBackend.h"
#include "compute/VeloxRuntime.h"
#include "memory/ArrowMemoryPool.h"
#include "memory/ColumnarBatch.h"
-#include "memory/VeloxMemoryManager.h"
#include "utils/Macros.h"
#include "utils/TestUtils.h"
#include "utils/VeloxArrowUtils.h"
@@ -251,7 +251,7 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : public GoogleBenchmar
auto fileName = "velox_parquet_write.parquet";
auto memoryManager = getDefaultMemoryManager();
- auto runtime = Runtime::create(kVeloxBackendKind, memoryManager.get());
+ auto runtime = Runtime::create(kVeloxBackendKind, memoryManager);
auto veloxPool = memoryManager->getAggregateMemoryPool();
for (auto _ : state) {
diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
index 779c9e745a..6790e9235f 100644
--- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc
+++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc
@@ -40,7 +40,7 @@ std::unordered_map<std::string, std::string> defaultConf() {
}
void initVeloxBackend(std::unordered_map<std::string, std::string>& conf) {
- gluten::VeloxBackend::create(conf);
+ gluten::VeloxBackend::create(AllocationListener::noop(), conf);
}
void initVeloxBackend() {
diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index 1991d6902c..44313fbb39 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -65,7 +65,7 @@ namespace gluten {
namespace {
MemoryManager* veloxMemoryManagerFactory(const std::string& kind,
std::unique_ptr<AllocationListener> listener) {
- return new VeloxMemoryManager(kind, std::move(listener));
+ return new VeloxMemoryManager(kind, std::move(listener),
*VeloxBackend::get()->getBackendConf());
}
void veloxMemoryManagerReleaser(MemoryManager* memoryManager) {
@@ -86,10 +86,14 @@ void veloxRuntimeReleaser(Runtime* runtime) {
}
} // namespace
-void VeloxBackend::init(const std::unordered_map<std::string, std::string>&
conf) {
+void VeloxBackend::init(
+ std::unique_ptr<AllocationListener> listener,
+ const std::unordered_map<std::string, std::string>& conf) {
backendConf_ =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>(conf));
+ globalMemoryManager_ =
std::make_unique<VeloxMemoryManager>(kVeloxBackendKind, std::move(listener),
*backendConf_);
+
// Register factories.
MemoryManager::registerFactory(kVeloxBackendKind, veloxMemoryManagerFactory,
veloxMemoryManagerReleaser);
Runtime::registerFactory(kVeloxBackendKind, veloxRuntimeFactory,
veloxRuntimeReleaser);
@@ -175,7 +179,10 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
initUdf();
- // Initialize the global memory manager for current process.
+ // Initialize Velox-side memory manager for current process. The memory
manager
+ // will be used during spill calls so we don't track it with Spark off-heap
memory instead
+ // we rely on overhead memory. If we track it with off-heap memory,
recursive reservations from
+ // Spark off-heap memory pool will be conducted to cause unexpected OOMs.
auto sparkOverhead = backendConf_->get<int64_t>(kSparkOverheadMemory);
int64_t memoryManagerCapacity;
if (sparkOverhead.hasValue()) {
@@ -186,7 +193,7 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
memoryManagerCapacity = facebook::velox::memory::kMaxMemory;
}
LOG(INFO) << "Setting global Velox memory manager with capacity: " <<
memoryManagerCapacity;
- facebook::velox::memory::MemoryManager::initialize({.allocatorCapacity =
memoryManagerCapacity});
+ facebook::velox::memory::initializeMemoryManager({.allocatorCapacity =
memoryManagerCapacity});
}
facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache()
const {
@@ -306,8 +313,10 @@ void VeloxBackend::initUdf() {
std::unique_ptr<VeloxBackend> VeloxBackend::instance_ = nullptr;
-void VeloxBackend::create(const std::unordered_map<std::string, std::string>&
conf) {
- instance_ = std::unique_ptr<VeloxBackend>(new VeloxBackend(conf));
+void VeloxBackend::create(
+ std::unique_ptr<AllocationListener> listener,
+ const std::unordered_map<std::string, std::string>& conf) {
+ instance_ = std::unique_ptr<VeloxBackend>(new
VeloxBackend(std::move(listener), conf));
}
VeloxBackend* VeloxBackend::get() {
@@ -317,5 +326,4 @@ VeloxBackend* VeloxBackend::get() {
}
return instance_.get();
}
-
} // namespace gluten
diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h
index e00a8868d1..86c77d4508 100644
--- a/cpp/velox/compute/VeloxBackend.h
+++ b/cpp/velox/compute/VeloxBackend.h
@@ -26,10 +26,12 @@
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/config/Config.h"
-#include "velox/common/memory/MemoryPool.h"
#include "velox/common/memory/MmapAllocator.h"
+#include "memory/VeloxMemoryManager.h"
+
namespace gluten {
+
// This kind string must be same with VeloxBackend#name in java side.
inline static const std::string kVeloxBackendKind{"velox"};
/// As a static instance in per executor, initialized at executor startup.
@@ -49,7 +51,9 @@ class VeloxBackend {
}
}
- static void create(const std::unordered_map<std::string, std::string>& conf);
+ static void create(
+ std::unique_ptr<AllocationListener> listener,
+ const std::unordered_map<std::string, std::string>& conf);
static VeloxBackend* get();
@@ -59,19 +63,26 @@ class VeloxBackend {
return backendConf_;
}
+ VeloxMemoryManager* getGlobalMemoryManager() const {
+ return globalMemoryManager_.get();
+ }
+
void tearDown() {
// Destruct IOThreadPoolExecutor will join all threads.
// On threads exit, thread local variables can be constructed with
referencing global variables.
// So, we need to destruct IOThreadPoolExecutor and stop the threads
before global variables get destructed.
ioExecutor_.reset();
+ globalMemoryManager_.reset();
}
private:
- explicit VeloxBackend(const std::unordered_map<std::string, std::string>&
conf) {
- init(conf);
+ explicit VeloxBackend(
+ std::unique_ptr<AllocationListener> listener,
+ const std::unordered_map<std::string, std::string>& conf) {
+ init(std::move(listener), conf);
}
- void init(const std::unordered_map<std::string, std::string>& conf);
+ void init(std::unique_ptr<AllocationListener> listener, const
std::unordered_map<std::string, std::string>& conf);
void initCache();
void initConnector();
void initUdf();
@@ -84,6 +95,8 @@ class VeloxBackend {
static std::unique_ptr<VeloxBackend> instance_;
+ // A global Velox memory manager for the current process.
+ std::unique_ptr<VeloxMemoryManager> globalMemoryManager_;
// Instance of AsyncDataCache used for all large allocations.
std::shared_ptr<facebook::velox::cache::AsyncDataCache> asyncDataCache_;
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index d29f5b0f7b..879ade9dc7 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -86,11 +86,19 @@ void JNI_OnUnload(JavaVM* vm, void*) {
JNIEXPORT void JNICALL
Java_org_apache_gluten_init_NativeBackendInitializer_initialize( // NOLINT
JNIEnv* env,
jclass,
+ jobject jListener,
jbyteArray conf) {
JNI_METHOD_START
+ JavaVM* vm;
+ if (env->GetJavaVM(&vm) != JNI_OK) {
+ throw GlutenException("Unable to get JavaVM instance");
+ }
auto safeArray = getByteArrayElementsSafe(env, conf);
+ // Create a global allocation listener that reserves global off-heap memory
from Java-side GlobalOffHeapMemory utility
+ // class.
+ std::unique_ptr<AllocationListener> listener =
std::make_unique<SparkAllocationListener>(vm, jListener);
auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length());
- VeloxBackend::create(sparkConf);
+ VeloxBackend::create(std::move(listener), sparkConf);
JNI_METHOD_END()
}
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc
index 69919a38d4..3aeb50e451 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -20,11 +20,12 @@
#include <jemalloc/jemalloc.h>
#endif
+#include "compute/VeloxBackend.h"
+
#include "velox/common/memory/MallocAllocator.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/exec/MemoryReclaimer.h"
-#include "compute/VeloxBackend.h"
#include "config/VeloxConfig.h"
#include "memory/ArrowMemoryPool.h"
#include "utils/Exception.h"
@@ -35,15 +36,22 @@ namespace gluten {
using namespace facebook;
-namespace {
+std::unordered_map<std::string, std::string> getExtraArbitratorConfigs(
+ const facebook::velox::config::ConfigBase& backendConf) {
+ auto reservationBlockSize =
+ backendConf.get<uint64_t>(kMemoryReservationBlockSize,
kMemoryReservationBlockSizeDefault);
+ auto memInitCapacity = backendConf.get<uint64_t>(kVeloxMemInitCapacity,
kVeloxMemInitCapacityDefault);
+ auto memReclaimMaxWaitMs =
backendConf.get<uint64_t>(kVeloxMemReclaimMaxWaitMs,
kVeloxMemReclaimMaxWaitMsDefault);
-static constexpr std::string_view
kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"};
-static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
-static constexpr std::string_view
kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"};
-static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};
-static constexpr std::string_view
kMemoryReclaimMaxWaitMs{"memory-reclaim-max-wait-time"};
-static constexpr std::string_view kDefaultMemoryReclaimMaxWaitMs{"3600000ms"};
+ std::unordered_map<std::string, std::string> extraArbitratorConfigs;
+ extraArbitratorConfigs[std::string(kMemoryPoolInitialCapacity)] =
folly::to<std::string>(memInitCapacity) + "B";
+ extraArbitratorConfigs[std::string(kMemoryPoolTransferCapacity)] =
folly::to<std::string>(reservationBlockSize) + "B";
+ extraArbitratorConfigs[std::string(kMemoryReclaimMaxWaitMs)] =
folly::to<std::string>(memReclaimMaxWaitMs) + "ms";
+ return extraArbitratorConfigs;
+}
+
+namespace {
template <typename T>
T getConfig(
const std::unordered_map<std::string, std::string>& configs,
@@ -58,7 +66,6 @@ T getConfig(
}
return defaultValue;
}
-} // namespace
/// We assume in a single Spark task. No thread-safety should be guaranteed.
class ListenableArbitrator : public velox::memory::MemoryArbitrator {
@@ -187,49 +194,34 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator {
std::unordered_map<velox::memory::MemoryPool*,
std::weak_ptr<velox::memory::MemoryPool>> candidates_;
};
-class ArbitratorFactoryRegister {
- public:
- explicit ArbitratorFactoryRegister(gluten::AllocationListener* listener) :
listener_(listener) {
- static std::atomic_uint32_t id{0UL};
- kind_ = "GLUTEN_ARBITRATOR_FACTORY_" + std::to_string(id++);
- velox::memory::MemoryArbitrator::registerFactory(
- kind_,
- [this](
- const velox::memory::MemoryArbitrator::Config& config) ->
std::unique_ptr<velox::memory::MemoryArbitrator> {
- return std::make_unique<ListenableArbitrator>(config, listener_);
- });
- }
-
- virtual ~ArbitratorFactoryRegister() {
- velox::memory::MemoryArbitrator::unregisterFactory(kind_);
- }
+} // namespace
- const std::string& getKind() const {
- return kind_;
- }
+ArbitratorFactoryRegister::ArbitratorFactoryRegister(gluten::AllocationListener*
listener) : listener_(listener) {
+ static std::atomic_uint32_t id{0UL};
+ kind_ = "GLUTEN_ARBITRATOR_FACTORY_" + std::to_string(id++);
+ velox::memory::MemoryArbitrator::registerFactory(
+ kind_,
+ [this](
+ const velox::memory::MemoryArbitrator::Config& config) ->
std::unique_ptr<velox::memory::MemoryArbitrator> {
+ return std::make_unique<ListenableArbitrator>(config, listener_);
+ });
+}
- private:
- std::string kind_;
- gluten::AllocationListener* listener_;
-};
+ArbitratorFactoryRegister::~ArbitratorFactoryRegister() {
+ velox::memory::MemoryArbitrator::unregisterFactory(kind_);
+}
-VeloxMemoryManager::VeloxMemoryManager(const std::string& kind,
std::unique_ptr<AllocationListener> listener)
+VeloxMemoryManager::VeloxMemoryManager(
+ const std::string& kind,
+ std::unique_ptr<AllocationListener> listener,
+ const facebook::velox::config::ConfigBase& backendConf)
: MemoryManager(kind), listener_(std::move(listener)) {
- auto reservationBlockSize =
VeloxBackend::get()->getBackendConf()->get<uint64_t>(
- kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
- auto memInitCapacity =
-
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemInitCapacity,
kVeloxMemInitCapacityDefault);
- auto memReclaimMaxWaitMs =
-
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemReclaimMaxWaitMs,
kVeloxMemReclaimMaxWaitMsDefault);
+ auto reservationBlockSize =
+ backendConf.get<uint64_t>(kMemoryReservationBlockSize,
kMemoryReservationBlockSizeDefault);
blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(),
reservationBlockSize);
listenableAlloc_ =
std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(),
blockListener_.get());
arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
- std::unordered_map<std::string, std::string> extraArbitratorConfigs;
- extraArbitratorConfigs[std::string(kMemoryPoolInitialCapacity)] =
folly::to<std::string>(memInitCapacity) + "B";
- extraArbitratorConfigs[std::string(kMemoryPoolTransferCapacity)] =
folly::to<std::string>(reservationBlockSize) + "B";
- extraArbitratorConfigs[std::string(kMemoryReclaimMaxWaitMs)] =
folly::to<std::string>(memReclaimMaxWaitMs) + "ms";
-
ArbitratorFactoryRegister afr(listener_.get());
velox::memory::MemoryManagerOptions mmOptions{
.alignment = velox::memory::MemoryAllocator::kMaxAlignment,
@@ -239,7 +231,7 @@ VeloxMemoryManager::VeloxMemoryManager(const std::string& kind, std::unique_ptr<
.coreOnAllocationFailureEnabled = false,
.allocatorCapacity = velox::memory::kMaxMemory,
.arbitratorKind = afr.getKind(),
- .extraArbitratorConfigs = extraArbitratorConfigs};
+ .extraArbitratorConfigs = getExtraArbitratorConfigs(backendConf)};
veloxMemoryManager_ =
std::make_unique<velox::memory::MemoryManager>(mmOptions);
veloxAggregatePool_ = veloxMemoryManager_->addRootPool(
@@ -405,4 +397,12 @@ VeloxMemoryManager::~VeloxMemoryManager() {
#endif
}
+VeloxMemoryManager* getDefaultMemoryManager() {
+ return VeloxBackend::get()->getGlobalMemoryManager();
+}
+
+std::shared_ptr<velox::memory::MemoryPool> defaultLeafVeloxMemoryPool() {
+ return getDefaultMemoryManager()->getLeafMemoryPool();
+}
+
} // namespace gluten
diff --git a/cpp/velox/memory/VeloxMemoryManager.h b/cpp/velox/memory/VeloxMemoryManager.h
index 638a4f3b3e..c2d79a9131 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -17,19 +17,48 @@
#pragma once
-#include "compute/VeloxBackend.h"
#include "memory/AllocationListener.h"
#include "memory/MemoryAllocator.h"
#include "memory/MemoryManager.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/memory/MemoryPool.h"
+#include <velox/common/config/Config.h>
+
namespace gluten {
+constexpr std::string_view kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"};
+constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
+constexpr std::string_view kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"};
+constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};
+constexpr std::string_view kMemoryReclaimMaxWaitMs{"memory-reclaim-max-wait-time"};
+constexpr std::string_view kDefaultMemoryReclaimMaxWaitMs{"3600000ms"};
+
+std::unordered_map<std::string, std::string> getExtraArbitratorConfigs(
+ const facebook::velox::config::ConfigBase& backendConf);
+
+class ArbitratorFactoryRegister {
+ public:
+ explicit ArbitratorFactoryRegister(gluten::AllocationListener* listener);
+
+ virtual ~ArbitratorFactoryRegister();
+
+ const std::string& getKind() const {
+ return kind_;
+ }
+
+ private:
+ std::string kind_;
+ gluten::AllocationListener* listener_;
+};
+
// Make sure the class is thread safe
class VeloxMemoryManager final : public MemoryManager {
public:
- VeloxMemoryManager(const std::string& kind, std::unique_ptr<AllocationListener> listener);
+ VeloxMemoryManager(
+ const std::string& kind,
+ std::unique_ptr<AllocationListener> listener,
+ const facebook::velox::config::ConfigBase& backendConf);
~VeloxMemoryManager() override;
VeloxMemoryManager(const VeloxMemoryManager&) = delete;
@@ -87,15 +116,8 @@ class VeloxMemoryManager final : public MemoryManager {
std::vector<std::shared_ptr<facebook::velox::memory::MemoryPool>>
heldVeloxPools_;
};
-/// Not tracked by Spark and should only used in test or validation.
-inline std::shared_ptr<gluten::VeloxMemoryManager> getDefaultMemoryManager() {
- static auto memoryManager =
- std::make_shared<gluten::VeloxMemoryManager>(gluten::kVeloxBackendKind, gluten::AllocationListener::noop());
- return memoryManager;
-}
+VeloxMemoryManager* getDefaultMemoryManager();
-inline std::shared_ptr<facebook::velox::memory::MemoryPool> defaultLeafVeloxMemoryPool() {
- return getDefaultMemoryManager()->getLeafMemoryPool();
-}
+std::shared_ptr<facebook::velox::memory::MemoryPool> defaultLeafVeloxMemoryPool();
} // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index b7fbe9a47f..909d00e5e8 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -16,6 +16,9 @@
*/
#include "SubstraitToVeloxPlan.h"
+
+#include "utils/StringUtil.h"
+
#include "TypeUtils.h"
#include "VariantToVectorConverter.h"
#include "operators/plannodes/RowVectorStream.h"
@@ -498,7 +501,7 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
}
std::string makeUuid() {
- return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
+ return generateUuid();
}
std::string compressionFileNameSuffix(common::CompressionKind kind) {
diff --git a/cpp/velox/tests/BufferOutputStreamTest.cc
b/cpp/velox/tests/BufferOutputStreamTest.cc
index b9ea62fd7b..b4eb2bceab 100644
--- a/cpp/velox/tests/BufferOutputStreamTest.cc
+++ b/cpp/velox/tests/BufferOutputStreamTest.cc
@@ -29,7 +29,7 @@ class BufferOutputStreamTest : public ::testing::Test, public
test::VectorTestBa
protected:
// Velox requires the mem manager to be instanced.
static void SetUpTestCase() {
- VeloxBackend::create({});
+ VeloxBackend::create(AllocationListener::noop(), {});
memory::MemoryManager::testingSetInstance({});
}
diff --git a/cpp/velox/tests/MemoryManagerTest.cc
b/cpp/velox/tests/MemoryManagerTest.cc
index 85df9eed49..8dd41aec7d 100644
--- a/cpp/velox/tests/MemoryManagerTest.cc
+++ b/cpp/velox/tests/MemoryManagerTest.cc
@@ -50,11 +50,12 @@ class MemoryManagerTest : public ::testing::Test {
std::unordered_map<std::string, std::string> conf = {
{kMemoryReservationBlockSize,
std::to_string(kMemoryReservationBlockSizeDefault)},
{kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}};
- gluten::VeloxBackend::create(conf);
+ gluten::VeloxBackend::create(AllocationListener::noop(), conf);
}
void SetUp() override {
- vmm_ = std::make_unique<VeloxMemoryManager>(gluten::kVeloxBackendKind, std::make_unique<MockAllocationListener>());
+ vmm_ = std::make_unique<VeloxMemoryManager>(
+ gluten::kVeloxBackendKind, std::make_unique<MockAllocationListener>(), *VeloxBackend::get()->getBackendConf());
listener_ = vmm_->getListener();
allocator_ = vmm_->allocator();
}
@@ -333,11 +334,12 @@ class MultiMemoryManagerTest : public ::testing::Test {
std::unordered_map<std::string, std::string> conf = {
{kMemoryReservationBlockSize,
std::to_string(kMemoryReservationBlockSizeDefault)},
{kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}};
- gluten::VeloxBackend::create(conf);
+ gluten::VeloxBackend::create(AllocationListener::noop(), conf);
}
std::unique_ptr<VeloxMemoryManager>
newVeloxMemoryManager(std::unique_ptr<AllocationListener> listener) {
- return std::make_unique<VeloxMemoryManager>(gluten::kVeloxBackendKind, std::move(listener));
+ return std::make_unique<VeloxMemoryManager>(
+ gluten::kVeloxBackendKind, std::move(listener), *VeloxBackend::get()->getBackendConf());
}
};
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index b4944d9205..212b0201ed 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -144,7 +144,7 @@ TEST(TestRuntime, CreateRuntime) {
}
TEST(TestRuntime, CreateVeloxRuntime) {
- VeloxBackend::create({});
+ VeloxBackend::create(AllocationListener::noop(), {});
auto mm = MemoryManager::create(kVeloxBackendKind, AllocationListener::noop());
auto runtime = Runtime::create(kVeloxBackendKind, mm);
ASSERT_EQ(typeid(*runtime), typeid(VeloxRuntime));
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
b/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
index 232ec5d7a8..c0ae2199c5 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
@@ -17,6 +17,7 @@
package org.apache.gluten.init;
import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.memory.listener.ReservationListener;
import org.apache.gluten.utils.ConfigUtil;
import org.apache.spark.util.SparkShutdownManagerUtil;
@@ -52,12 +53,11 @@ public final class NativeBackendInitializer {
// In local mode, NativeBackendInitializer#initializeBackend will be invoked
twice in same
// thread, driver first then executor, initialized flag ensure only invoke
initializeBackend once,
// so there are no race condition here.
- public void initialize(scala.collection.Map<String, String> conf) {
+ public void initialize(ReservationListener rl, scala.collection.Map<String, String> conf) {
if (!initialized.compareAndSet(false, true)) {
- // Already called.
- return;
+ throw new IllegalStateException("Already initialized");
}
- initialize0(conf);
+ initialize0(rl, conf);
SparkShutdownManagerUtil.addHook(
() -> {
shutdown();
@@ -65,17 +65,17 @@ public final class NativeBackendInitializer {
});
}
- private void initialize0(scala.collection.Map<String, String> conf) {
+ private void initialize0(ReservationListener rl, scala.collection.Map<String, String> conf) {
try {
Map<String, String> nativeConfMap =
GlutenConfig.getNativeBackendConf(backendName, conf);
- initialize(ConfigUtil.serialize(nativeConfMap));
+ initialize(rl, ConfigUtil.serialize(nativeConfMap));
} catch (Exception e) {
LOG.error("Failed to call native backend's initialize method", e);
throw e;
}
}
- private native void initialize(byte[] configPlan);
+ private native void initialize(ReservationListener rl, byte[] configPlan);
private native void shutdown();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]