This is an automated email from the ASF dual-hosted git repository. spricoder pushed a commit to branch feature/optimize_memory in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a4a0e2948a86534dbdca6a255a5063e8366ddcde Author: spricoder <[email protected]> AuthorDate: Mon Jul 22 09:41:17 2024 +0800 fix memory concurrency problem --- .../schemaregion/impl/SchemaRegionMemoryImpl.java | 3 +-- .../schemaregion/impl/SchemaRegionPBTreeImpl.java | 3 +-- .../iotdb/db/storageengine/rescon/memory/SystemInfo.java | 15 ++++++++------- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java index e9173cc5888..6a0b9d197df 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java @@ -193,8 +193,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax(); - if (!SystemInfo.getInstance() - .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) { + if (!SystemInfo.getInstance().addDirectBufferMemoryCost(memCost)) { throw new MetadataException( "Total allocated memory for direct buffer will be " + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java index 4dbfaf5c3bf..9c7a1b5ace8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java @@ -190,8 +190,7 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion { if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax(); - if (!SystemInfo.getInstance() - .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) { + if (!SystemInfo.getInstance().addDirectBufferMemoryCost(memCost)) { throw new MetadataException( "Total allocated memory for direct buffer will be " + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java index 4b0f56f858d..a64a4fe24bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -196,15 +197,15 @@ public class SystemInfo { } public boolean addDirectBufferMemoryCost(long size) { - while (true) { - long memCost = directBufferMemoryCost.get(); + AtomicBoolean result = new AtomicBoolean(false); + directBufferMemoryCost.updateAndGet(memCost -> { if (memCost + size > totalDirectBufferMemorySizeLimit) { - return false; + return memCost; } - if (directBufferMemoryCost.compareAndSet(memCost, memCost + size)) { - return true; - } - } + result.set(true); + return memCost + size; + }); + return result.get(); } public void decreaseDirectBufferMemoryCost(long size) {
