This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 76ba7333616 Fix concurrency problem in Memory Control (#12984)
76ba7333616 is described below
commit 76ba73336161ddadb87983d0f9b7b02611c5643c
Author: ZhangHongYin <[email protected]>
AuthorDate: Mon Jul 22 16:26:53 2024 +0800
Fix concurrency problem in Memory Control (#12984)
* fix memory concurrency problem
* spotless
Signed-off-by: OneSizeFitQuorum <[email protected]>
Co-authored-by: OneSizeFitQuorum <[email protected]>
---
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 3 +--
.../schemaregion/impl/SchemaRegionPBTreeImpl.java | 3 +--
.../db/storageengine/rescon/memory/SystemInfo.java | 20 +++++++++++---------
3 files changed, 13 insertions(+), 13 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 25eaa8fe211..c6c705c8ea6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -198,8 +198,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax();
- if (!SystemInfo.getInstance()
- .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) {
+ if (!SystemInfo.getInstance().addDirectBufferMemoryCost(memCost)) {
throw new MetadataException(
"Total allocated memory for direct buffer will be "
+ (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index e57620c37e5..39ea98515e5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -194,8 +194,7 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion {
if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax();
- if (!SystemInfo.getInstance()
- .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) {
+ if (!SystemInfo.getInstance().addDirectBufferMemoryCost(memCost)) {
throw new MetadataException(
"Total allocated memory for direct buffer will be "
+ (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
index 4b0f56f858d..0458320af05 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -196,15 +197,16 @@ public class SystemInfo {
}
public boolean addDirectBufferMemoryCost(long size) {
- while (true) {
- long memCost = directBufferMemoryCost.get();
- if (memCost + size > totalDirectBufferMemorySizeLimit) {
- return false;
- }
- if (directBufferMemoryCost.compareAndSet(memCost, memCost + size)) {
- return true;
- }
- }
+ AtomicBoolean result = new AtomicBoolean(false);
+ directBufferMemoryCost.updateAndGet(
+ memCost -> {
+ if (memCost + size > totalDirectBufferMemorySizeLimit) {
+ return memCost;
+ }
+ result.set(true);
+ return memCost + size;
+ });
+ return result.get();
}
public void decreaseDirectBufferMemoryCost(long size) {