This is an automated email from the ASF dual-hosted git repository.

felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 2866ff732 [VL] Add thread_safe to several VeloxRuntime classes (#6526)
2866ff732 is described below

commit 2866ff7329cdfa763b497e4c3c19cc845aee4c84
Author: BInwei Yang <[email protected]>
AuthorDate: Sun Jul 21 23:37:12 2024 -0700

    [VL] Add thread_safe to several VeloxRuntime classes (#6526)
    
    VeloxRuntime is shared by many threads, such as task threads or Parquet
writer threads. We must make sure the objects held by VeloxRuntime are
thread-safe.
---
 cpp/core/jni/JniCommon.h              | 17 +++++++++++------
 cpp/core/memory/AllocationListener.h  | 17 +++++++++++++----
 cpp/core/memory/MemoryAllocator.cc    |  4 ++--
 cpp/core/memory/MemoryAllocator.h     |  9 +++++----
 cpp/velox/memory/VeloxMemoryManager.h |  1 +
 5 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index d5c9f2b3b..8f8002b2c 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -362,6 +362,10 @@ static inline gluten::CompressionMode 
getCompressionMode(JNIEnv* env, jstring co
   }
 }
 
+/*
+NOTE: the class must be thread safe
+ */
+
 class SparkAllocationListener final : public gluten::AllocationListener {
  public:
   SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef, jmethodID 
jReserveMethod, jmethodID jUnreserveMethod)
@@ -399,8 +403,9 @@ class SparkAllocationListener final : public 
gluten::AllocationListener {
       env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size);
       checkException(env);
     }
-    bytesReserved_ += size;
-    maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_);
+    // atomic operation is enough here, no need to use mutex
+    bytesReserved_.fetch_add(size);
+    maxBytesReserved_.store(std::max(bytesReserved_.load(), 
maxBytesReserved_.load()));
   }
 
   int64_t currentBytes() override {
@@ -414,10 +419,10 @@ class SparkAllocationListener final : public 
gluten::AllocationListener {
  private:
   JavaVM* vm_;
   jobject jListenerGlobalRef_;
-  jmethodID jReserveMethod_;
-  jmethodID jUnreserveMethod_;
-  int64_t bytesReserved_ = 0L;
-  int64_t maxBytesReserved_ = 0L;
+  const jmethodID jReserveMethod_;
+  const jmethodID jUnreserveMethod_;
+  std::atomic_int64_t bytesReserved_{0L};
+  std::atomic_int64_t maxBytesReserved_{0L};
 };
 
 class BacktraceAllocationListener final : public gluten::AllocationListener {
diff --git a/cpp/core/memory/AllocationListener.h 
b/cpp/core/memory/AllocationListener.h
index d43a621de..695552cef 100644
--- a/cpp/core/memory/AllocationListener.h
+++ b/cpp/core/memory/AllocationListener.h
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <memory>
+#include <mutex>
 
 namespace gluten {
 
@@ -46,6 +47,7 @@ class AllocationListener {
 };
 
 /// Memory changes will be round to specified block size which aim to decrease 
delegated listener calls.
+// The class must be thread safe
 class BlockAllocationListener final : public AllocationListener {
  public:
   BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize)
@@ -55,19 +57,24 @@ class BlockAllocationListener final : public 
AllocationListener {
     if (diff == 0) {
       return;
     }
+    std::unique_lock<std::mutex> guard{mutex_};
     if (diff > 0) {
       if (reservationBytes_ - usedBytes_ < diff) {
         auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_;
-        delegated_->allocationChanged(roundSize);
         reservationBytes_ += roundSize;
         peakBytes_ = std::max(peakBytes_, reservationBytes_);
+        guard.unlock();
+        // unnecessary to lock the delegated listener, assume it's thread safe
+        delegated_->allocationChanged(roundSize);
       }
       usedBytes_ += diff;
     } else {
       usedBytes_ += diff;
       auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ * 
blockSize_;
-      delegated_->allocationChanged(-unreservedSize);
       reservationBytes_ -= unreservedSize;
+      guard.unlock();
+      // unnecessary to lock the delegated listener
+      delegated_->allocationChanged(-unreservedSize);
     }
   }
 
@@ -80,11 +87,13 @@ class BlockAllocationListener final : public 
AllocationListener {
   }
 
  private:
-  AllocationListener* delegated_;
-  uint64_t blockSize_{0L};
+  AllocationListener* const delegated_;
+  const uint64_t blockSize_;
   uint64_t usedBytes_{0L};
   uint64_t peakBytes_{0L};
   uint64_t reservationBytes_{0L};
+
+  mutable std::mutex mutex_;
 };
 
 } // namespace gluten
diff --git a/cpp/core/memory/MemoryAllocator.cc 
b/cpp/core/memory/MemoryAllocator.cc
index ac869219d..01818636a 100644
--- a/cpp/core/memory/MemoryAllocator.cc
+++ b/cpp/core/memory/MemoryAllocator.cc
@@ -92,8 +92,8 @@ int64_t ListenableMemoryAllocator::peakBytes() const {
 
 void ListenableMemoryAllocator::updateUsage(int64_t size) {
   listener_->allocationChanged(size);
-  usedBytes_ += size;
-  peakBytes_ = std::max(peakBytes_, usedBytes_);
+  usedBytes_.fetch_add(size);
+  peakBytes_.store(std::max(peakBytes_.load(), usedBytes_.load()));
 }
 
 bool StdMemoryAllocator::allocate(int64_t size, void** out) {
diff --git a/cpp/core/memory/MemoryAllocator.h 
b/cpp/core/memory/MemoryAllocator.h
index bc8f9de18..01271cc94 100644
--- a/cpp/core/memory/MemoryAllocator.h
+++ b/cpp/core/memory/MemoryAllocator.h
@@ -45,6 +45,7 @@ class MemoryAllocator {
   virtual int64_t peakBytes() const = 0;
 };
 
+// The class must be thread safe
 class ListenableMemoryAllocator final : public MemoryAllocator {
  public:
   explicit ListenableMemoryAllocator(MemoryAllocator* delegated, 
AllocationListener* listener)
@@ -69,10 +70,10 @@ class ListenableMemoryAllocator final : public 
MemoryAllocator {
 
  private:
   void updateUsage(int64_t size);
-  MemoryAllocator* delegated_;
-  AllocationListener* listener_;
-  uint64_t usedBytes_{0L};
-  uint64_t peakBytes_{0L};
+  MemoryAllocator* const delegated_;
+  AllocationListener* const listener_;
+  std::atomic_int64_t usedBytes_{0L};
+  std::atomic_int64_t peakBytes_{0L};
 };
 
 class StdMemoryAllocator final : public MemoryAllocator {
diff --git a/cpp/velox/memory/VeloxMemoryManager.h 
b/cpp/velox/memory/VeloxMemoryManager.h
index 3607ca793..7a96b87e1 100644
--- a/cpp/velox/memory/VeloxMemoryManager.h
+++ b/cpp/velox/memory/VeloxMemoryManager.h
@@ -25,6 +25,7 @@
 
 namespace gluten {
 
+// Make sure the class is thread safe
 class VeloxMemoryManager final : public MemoryManager {
  public:
   VeloxMemoryManager(std::unique_ptr<AllocationListener> listener);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to