This is an automated email from the ASF dual-hosted git repository.
felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 2866ff732 [VL] Add thread_safe to several VeloxRuntime classes (#6526)
2866ff732 is described below
commit 2866ff7329cdfa763b497e4c3c19cc845aee4c84
Author: BInwei Yang <[email protected]>
AuthorDate: Sun Jul 21 23:37:12 2024 -0700
[VL] Add thread_safe to several VeloxRuntime classes (#6526)
VeloxRuntime is shared by many threads, like task threads or parquet
writter threads. We must make sure the objects hold by VeloxRuntime are thread
safe.
---
cpp/core/jni/JniCommon.h | 17 +++++++++++------
cpp/core/memory/AllocationListener.h | 17 +++++++++++++----
cpp/core/memory/MemoryAllocator.cc | 4 ++--
cpp/core/memory/MemoryAllocator.h | 9 +++++----
cpp/velox/memory/VeloxMemoryManager.h | 1 +
5 files changed, 32 insertions(+), 16 deletions(-)
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index d5c9f2b3b..8f8002b2c 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -362,6 +362,10 @@ static inline gluten::CompressionMode
getCompressionMode(JNIEnv* env, jstring co
}
}
+/*
+NOTE: the class must be thread safe
+ */
+
class SparkAllocationListener final : public gluten::AllocationListener {
public:
SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef, jmethodID
jReserveMethod, jmethodID jUnreserveMethod)
@@ -399,8 +403,9 @@ class SparkAllocationListener final : public
gluten::AllocationListener {
env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size);
checkException(env);
}
- bytesReserved_ += size;
- maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_);
+ // atomic operation is enough here, no need to use mutex
+ bytesReserved_.fetch_add(size);
+ maxBytesReserved_.store(std::max(bytesReserved_.load(),
maxBytesReserved_.load()));
}
int64_t currentBytes() override {
@@ -414,10 +419,10 @@ class SparkAllocationListener final : public
gluten::AllocationListener {
private:
JavaVM* vm_;
jobject jListenerGlobalRef_;
- jmethodID jReserveMethod_;
- jmethodID jUnreserveMethod_;
- int64_t bytesReserved_ = 0L;
- int64_t maxBytesReserved_ = 0L;
+ const jmethodID jReserveMethod_;
+ const jmethodID jUnreserveMethod_;
+ std::atomic_int64_t bytesReserved_{0L};
+ std::atomic_int64_t maxBytesReserved_{0L};
};
class BacktraceAllocationListener final : public gluten::AllocationListener {
diff --git a/cpp/core/memory/AllocationListener.h
b/cpp/core/memory/AllocationListener.h
index d43a621de..695552cef 100644
--- a/cpp/core/memory/AllocationListener.h
+++ b/cpp/core/memory/AllocationListener.h
@@ -19,6 +19,7 @@
#include <algorithm>
#include <memory>
+#include <mutex>
namespace gluten {
@@ -46,6 +47,7 @@ class AllocationListener {
};
/// Memory changes will be round to specified block size which aim to decrease
delegated listener calls.
+// The class must be thread safe
class BlockAllocationListener final : public AllocationListener {
public:
BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize)
@@ -55,19 +57,24 @@ class BlockAllocationListener final : public
AllocationListener {
if (diff == 0) {
return;
}
+ std::unique_lock<std::mutex> guard{mutex_};
if (diff > 0) {
if (reservationBytes_ - usedBytes_ < diff) {
auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_;
- delegated_->allocationChanged(roundSize);
reservationBytes_ += roundSize;
peakBytes_ = std::max(peakBytes_, reservationBytes_);
+ guard.unlock();
+ // unnecessary to lock the delegated listener, assume it's thread safe
+ delegated_->allocationChanged(roundSize);
}
usedBytes_ += diff;
} else {
usedBytes_ += diff;
auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ *
blockSize_;
- delegated_->allocationChanged(-unreservedSize);
reservationBytes_ -= unreservedSize;
+ guard.unlock();
+ // unnecessary to lock the delegated listener
+ delegated_->allocationChanged(-unreservedSize);
}
}
@@ -80,11 +87,13 @@ class BlockAllocationListener final : public
AllocationListener {
}
private:
- AllocationListener* delegated_;
- uint64_t blockSize_{0L};
+ AllocationListener* const delegated_;
+ const uint64_t blockSize_;
uint64_t usedBytes_{0L};
uint64_t peakBytes_{0L};
uint64_t reservationBytes_{0L};
+
+ mutable std::mutex mutex_;
};
} // namespace gluten
diff --git a/cpp/core/memory/MemoryAllocator.cc
b/cpp/core/memory/MemoryAllocator.cc
index ac869219d..01818636a 100644
--- a/cpp/core/memory/MemoryAllocator.cc
+++ b/cpp/core/memory/MemoryAllocator.cc
@@ -92,8 +92,8 @@ int64_t ListenableMemoryAllocator::peakBytes() const {
void ListenableMemoryAllocator::updateUsage(int64_t size) {
listener_->allocationChanged(size);
- usedBytes_ += size;
- peakBytes_ = std::max(peakBytes_, usedBytes_);
+ usedBytes_.fetch_add(size);
+ peakBytes_.store(std::max(peakBytes_.load(), usedBytes_.load()));
}
bool StdMemoryAllocator::allocate(int64_t size, void** out) {
diff --git a/cpp/core/memory/MemoryAllocator.h
b/cpp/core/memory/MemoryAllocator.h
index bc8f9de18..01271cc94 100644
--- a/cpp/core/memory/MemoryAllocator.h
+++ b/cpp/core/memory/MemoryAllocator.h
@@ -45,6 +45,7 @@ class MemoryAllocator {
virtual int64_t peakBytes() const = 0;
};
+// The class must be thread safe
class ListenableMemoryAllocator final : public MemoryAllocator {
public:
explicit ListenableMemoryAllocator(MemoryAllocator* delegated,
AllocationListener* listener)
@@ -69,10 +70,10 @@ class ListenableMemoryAllocator final : public
MemoryAllocator {
private:
void updateUsage(int64_t size);
- MemoryAllocator* delegated_;
- AllocationListener* listener_;
- uint64_t usedBytes_{0L};
- uint64_t peakBytes_{0L};
+ MemoryAllocator* const delegated_;
+ AllocationListener* const listener_;
+ std::atomic_int64_t usedBytes_{0L};
+ std::atomic_int64_t peakBytes_{0L};
};
class StdMemoryAllocator final : public MemoryAllocator {
diff --git a/cpp/velox/memory/VeloxMemoryManager.h
b/cpp/velox/memory/VeloxMemoryManager.h
index 3607ca793..7a96b87e1 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -25,6 +25,7 @@
namespace gluten {
+// Make sure the class is thread safe
class VeloxMemoryManager final : public MemoryManager {
public:
VeloxMemoryManager(std::unique_ptr<AllocationListener> listener);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]