This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 76ba7333616 Fix concurrency problem in Memory Control (#12984)
76ba7333616 is described below

commit 76ba73336161ddadb87983d0f9b7b02611c5643c
Author: ZhangHongYin <[email protected]>
AuthorDate: Mon Jul 22 16:26:53 2024 +0800

    Fix concurrency problem in Memory Control (#12984)
    
    * fix memory concurrency problem
    
    * spotless
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
    Co-authored-by: OneSizeFitQuorum <[email protected]>
---
 .../schemaregion/impl/SchemaRegionMemoryImpl.java    |  3 +--
 .../schemaregion/impl/SchemaRegionPBTreeImpl.java    |  3 +--
 .../db/storageengine/rescon/memory/SystemInfo.java   | 20 +++++++++++---------
 3 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 25eaa8fe211..c6c705c8ea6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -198,8 +198,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
 
     if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
       long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax();
-      if (!SystemInfo.getInstance()
-          .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) {
+      if (!SystemInfo.getInstance().addDirectBufferMemoryCost(memCost)) {
         throw new MetadataException(
             "Total allocated memory for direct buffer will be "
                 + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index e57620c37e5..39ea98515e5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -194,8 +194,7 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion {
 
     if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
       long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax();
-      if (!SystemInfo.getInstance()
-          .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) {
+      if (!SystemInfo.getInstance().addDirectBufferMemoryCost(memCost)) {
         throw new MetadataException(
             "Total allocated memory for direct buffer will be "
                 + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
index 4b0f56f858d..0458320af05 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -196,15 +197,16 @@ public class SystemInfo {
   }
 
   public boolean addDirectBufferMemoryCost(long size) {
-    while (true) {
-      long memCost = directBufferMemoryCost.get();
-      if (memCost + size > totalDirectBufferMemorySizeLimit) {
-        return false;
-      }
-      if (directBufferMemoryCost.compareAndSet(memCost, memCost + size)) {
-        return true;
-      }
-    }
+    AtomicBoolean result = new AtomicBoolean(false);
+    directBufferMemoryCost.updateAndGet(
+        memCost -> {
+          if (memCost + size > totalDirectBufferMemorySizeLimit) {
+            return memCost;
+          }
+          result.set(true);
+          return memCost + size;
+        });
+    return result.get();
   }
 
   public void decreaseDirectBufferMemoryCost(long size) {

Reply via email to