This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 27ef7a8613b8f894fe308378502208df1ed42609 Author: ZhangHongYin <[email protected]> AuthorDate: Mon Jul 22 16:26:53 2024 +0800 Fix concurrency problem in Memory Control (#12984) * fix memory concurrency problem * spotless Signed-off-by: OneSizeFitQuorum <[email protected]> Co-authored-by: OneSizeFitQuorum <[email protected]> (cherry picked from commit 76ba73336161ddadb87983d0f9b7b02611c5643c) --- .../schemaregion/impl/SchemaRegionMemoryImpl.java | 3 +-- .../schemaregion/impl/SchemaRegionPBTreeImpl.java | 3 +-- .../db/storageengine/rescon/memory/SystemInfo.java | 20 +++++++++++--------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java index 63746f0b6b8..75f6c63c656 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java @@ -202,8 +202,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax(); - if (!SystemInfo.getInstance() - .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) { + if (!SystemInfo.getInstance().addDirectBufferMemoryCost(memCost)) { throw new MetadataException( "Total allocated memory for direct buffer will be " + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java index 039be37145d..b790545b5d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java @@ -195,8 +195,7 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion { if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax(); - if (!SystemInfo.getInstance() - .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) { + if (!SystemInfo.getInstance().addDirectBufferMemoryCost(memCost)) { throw new MetadataException( "Total allocated memory for direct buffer will be " + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java index 4b0f56f858d..0458320af05 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -196,15 +197,16 @@ public class SystemInfo { } public boolean addDirectBufferMemoryCost(long size) { - while (true) { - long memCost = directBufferMemoryCost.get(); - if (memCost + size > totalDirectBufferMemorySizeLimit) { - return false; - } - if (directBufferMemoryCost.compareAndSet(memCost, memCost + size)) { - return true; - } - } + AtomicBoolean result = new AtomicBoolean(false); + directBufferMemoryCost.updateAndGet( + memCost -> { + if (memCost + size > totalDirectBufferMemorySizeLimit) { + return memCost; + } + result.set(true); + return memCost + size; + }); + return result.get(); } public void decreaseDirectBufferMemoryCost(long size) {
