This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 28face49b8 [IOTDB-4522] Add memory control of queue in multiLeader consensus. (#7432)
28face49b8 is described below

commit 28face49b8788d949a73bce858bace2229b30bb3
Author: ZhangHongYin <[email protected]>
AuthorDate: Sat Oct 8 14:20:50 2022 +0800

    [IOTDB-4522] Add memory control of queue in multiLeader consensus. (#7432)
---
 .../common/request/IndexedConsensusRequest.java    |  8 +++
 .../iotdb/consensus/config/MultiLeaderConfig.java  | 32 +++++-----
 .../multileader/MultiLeaderConsensus.java          |  4 ++
 .../multileader/logdispatcher/LogDispatcher.java   | 64 ++++++++++++++-----
 .../logdispatcher/MultiLeaderMemoryManager.java    | 72 ++++++++++++++++++++++
 .../resources/conf/iotdb-datanode.properties       |  8 +--
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 16 ++++-
 .../db/consensus/DataRegionConsensusImpl.java      |  2 +
 .../org/apache/iotdb/db/rescon/SystemInfo.java     | 23 +++++--
 .../java/org/apache/iotdb/db/service/DataNode.java |  8 +++
 .../java/org/apache/iotdb/db/service/NewIoTDB.java |  3 +-
 12 files changed, 207 insertions(+), 46 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 1a61cf3a09..6abca549b6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -33,6 +33,7 @@ public class IndexedConsensusRequest implements IConsensusRequest {
   private final long syncIndex;
   private List<IConsensusRequest> requests;
   private List<ByteBuffer> serializedRequests;
+  private long serializedSize = 0;
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> 
requests) {
     this.searchIndex = searchIndex;
@@ -43,6 +44,9 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
   public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long 
searchIndex) {
     this.searchIndex = searchIndex;
     this.serializedRequests = serializedRequests;
+    for (ByteBuffer byteBuffer : serializedRequests) {
+      serializedSize += byteBuffer.capacity();
+    }
     this.syncIndex = -1L;
   }
 
@@ -72,6 +76,10 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
     return result;
   }
 
+  public long getSerializedSize() {
+    return serializedSize;
+  }
+
   public long getSearchIndex() {
     return searchIndex;
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 2a334b0b84..cd0d58c4ae 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -196,7 +196,6 @@ public class MultiLeaderConfig {
   }
 
   public static class Replication {
-    private final int maxPendingRequestNumPerNode;
     private final int maxRequestPerBatch;
     private final int maxPendingBatch;
     private final int maxWaitingTimeForAccumulatingBatchInMs;
@@ -205,9 +204,9 @@ public class MultiLeaderConfig {
     private final long walThrottleThreshold;
     private final long throttleTimeOutMs;
     private final long checkpointGap;
+    private final Long allocateMemoryForConsensus;
 
     private Replication(
-        int maxPendingRequestNumPerNode,
         int maxRequestPerBatch,
         int maxPendingBatch,
         int maxWaitingTimeForAccumulatingBatchInMs,
@@ -215,8 +214,8 @@ public class MultiLeaderConfig {
         long maxRetryWaitTimeMs,
         long walThrottleThreshold,
         long throttleTimeOutMs,
-        long checkpointGap) {
-      this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
+        long checkpointGap,
+        long allocateMemoryForConsensus) {
       this.maxRequestPerBatch = maxRequestPerBatch;
       this.maxPendingBatch = maxPendingBatch;
       this.maxWaitingTimeForAccumulatingBatchInMs = 
maxWaitingTimeForAccumulatingBatchInMs;
@@ -225,10 +224,7 @@ public class MultiLeaderConfig {
       this.walThrottleThreshold = walThrottleThreshold;
       this.throttleTimeOutMs = throttleTimeOutMs;
       this.checkpointGap = checkpointGap;
-    }
-
-    public int getMaxPendingRequestNumPerNode() {
-      return maxPendingRequestNumPerNode;
+      this.allocateMemoryForConsensus = allocateMemoryForConsensus;
     }
 
     public int getMaxRequestPerBatch() {
@@ -263,12 +259,15 @@ public class MultiLeaderConfig {
       return checkpointGap;
     }
 
+    public Long getAllocateMemoryForConsensus() {
+      return allocateMemoryForConsensus;
+    }
+
     public static Replication.Builder newBuilder() {
       return new Replication.Builder();
     }
 
     public static class Builder {
-      private int maxPendingRequestNumPerNode = 600;
       private int maxRequestPerBatch = 30;
       // (IMPORTANT) Value of this variable should be the same with 
MAX_REQUEST_CACHE_SIZE
       // in DataRegionStateMachine
@@ -279,11 +278,7 @@ public class MultiLeaderConfig {
       private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L;
       private long throttleTimeOutMs = TimeUnit.SECONDS.toMillis(30);
       private long checkpointGap = 500;
-
-      public Replication.Builder setMaxPendingRequestNumPerNode(int 
maxPendingRequestNumPerNode) {
-        this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
-        return this;
-      }
+      private long allocateMemoryForConsensus;
 
       public Replication.Builder setMaxRequestPerBatch(int maxRequestPerBatch) 
{
         this.maxRequestPerBatch = maxRequestPerBatch;
@@ -321,9 +316,13 @@ public class MultiLeaderConfig {
         return this;
       }
 
+      public Replication.Builder setAllocateMemoryForConsensus(long 
allocateMemoryForConsensus) {
+        this.allocateMemoryForConsensus = allocateMemoryForConsensus;
+        return this;
+      }
+
       public Replication build() {
         return new Replication(
-            maxPendingRequestNumPerNode,
             maxRequestPerBatch,
             maxPendingBatch,
             maxWaitingTimeForAccumulatingBatchInMs,
@@ -331,7 +330,8 @@ public class MultiLeaderConfig {
             maxRetryWaitTimeMs,
             walThrottleThreshold,
             throttleTimeOutMs,
-            checkpointGap);
+            checkpointGap,
+            allocateMemoryForConsensus);
       }
     }
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index 2a8792db7a..2f35bdfe71 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClie
 import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory;
 import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.SyncMultiLeaderServiceClientPoolFactory;
 import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
+import org.apache.iotdb.consensus.multileader.logdispatcher.MultiLeaderMemoryManager;
 import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCService;
 import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCServiceProcessor;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -94,6 +95,9 @@ public class MultiLeaderConsensus implements IConsensus {
         new IClientManager.Factory<TEndPoint, SyncMultiLeaderServiceClient>()
             .createClientManager(
                 new SyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig()));
+    // init multiLeader memory manager
+    MultiLeaderMemoryManager.getInstance()
+        .init(config.getMultiLeaderConfig().getReplication().getAllocateMemoryForConsensus());
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 557fbf0f62..86f7186c6c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -47,9 +47,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.OptionalLong;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -154,10 +154,9 @@ public class LogDispatcher {
                 "{}->{}: Push a log to the queue, where the queue length is 
{}",
                 impl.getThisNode().getGroupId(),
                 thread.getPeer().getEndpoint().getIp(),
-                thread.getPendingRequest().size());
-            if (!thread
-                .getPendingRequest()
-                .offer(new IndexedConsensusRequest(serializedRequests, 
request.getSearchIndex()))) {
+                thread.getPendingRequestSize());
+            if (!thread.offer(
+                new IndexedConsensusRequest(serializedRequests, 
request.getSearchIndex()))) {
               logger.debug(
                   "{}: Log queue of {} is full, ignore the log to this node, 
searchIndex: {}",
                   impl.getThisNode().getGroupId(),
@@ -183,15 +182,16 @@ public class LogDispatcher {
     // A reader management class that gets requests from the DataRegion
     private final ConsensusReqReader reader =
         (ConsensusReqReader) impl.getStateMachine().read(new 
GetConsensusReqReaderPlan());
+    private final MultiLeaderMemoryManager multiLeaderMemoryManager =
+        MultiLeaderMemoryManager.getInstance();
     private volatile boolean stopped = false;
 
-    private ConsensusReqReader.ReqIterator walEntryiterator;
+    private ConsensusReqReader.ReqIterator walEntryIterator;
 
     public LogDispatcherThread(Peer peer, MultiLeaderConfig config, long 
initialSyncIndex) {
       this.peer = peer;
       this.config = config;
-      this.pendingRequest =
-          new 
ArrayBlockingQueue<>(config.getReplication().getMaxPendingRequestNumPerNode());
+      this.pendingRequest = new LinkedBlockingQueue<>();
       this.controller =
           new IndexController(
               impl.getStorageDir(),
@@ -199,7 +199,7 @@ public class LogDispatcher {
               initialSyncIndex,
               config.getReplication().getCheckpointGap());
       this.syncStatus = new SyncStatus(controller, config);
-      this.walEntryiterator = reader.getReqIterator(START_INDEX);
+      this.walEntryIterator = reader.getReqIterator(START_INDEX);
     }
 
     public IndexController getController() {
@@ -218,12 +218,43 @@ public class LogDispatcher {
       return config;
     }
 
-    public BlockingQueue<IndexedConsensusRequest> getPendingRequest() {
-      return pendingRequest;
+    public int getPendingRequestSize() {
+      return pendingRequest.size();
+    }
+
+    /** try to offer a request into queue with memory control */
+    public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
+      if (!multiLeaderMemoryManager.reserve(indexedConsensusRequest.getSerializedSize())) {
+        return false;
+      }
+      boolean success;
+      try {
+        success = pendingRequest.offer(indexedConsensusRequest);
+      } catch (Throwable t) {
+        // If exception occurs during request offer, the reserved memory should be released
+        multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
+        throw t;
+      }
+      if (!success) {
+        // If offer failed, the reserved memory should be released
+        multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
+      }
+      return success;
+    }
+
+    /** try to remove a request from queue with memory control */
+    private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) {
+      multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
     }
 
     public void stop() {
       stopped = true;
+      for (IndexedConsensusRequest indexedConsensusRequest : pendingRequest) {
+        multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
+      }
+      for (IndexedConsensusRequest indexedConsensusRequest : bufferedRequest) {
+        multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
+      }
     }
 
     public void cleanup() throws IOException {
@@ -289,7 +320,7 @@ public class LogDispatcher {
         logger.debug(
             "{} : pendingRequest Size: {}, bufferedRequest size: {}",
             impl.getThisNode().getGroupId(),
-            pendingRequest.size(),
+            getPendingRequestSize(),
             bufferedRequest.size());
         synchronized (impl.getIndexObject()) {
           pendingRequest.drainTo(
@@ -303,6 +334,7 @@ public class LogDispatcher {
           IndexedConsensusRequest request = iterator.next();
           if (request.getSearchIndex() < startIndex) {
             iterator.remove();
+            releaseReservedMemory(request);
           } else {
             break;
           }
@@ -333,6 +365,7 @@ public class LogDispatcher {
         constructBatchIndexedFromConsensusRequest(prev, logBatches);
         endIndex = prev.getSearchIndex();
         iterator.remove();
+        releaseReservedMemory(prev);
         while (iterator.hasNext()
             && logBatches.size() <= 
config.getReplication().getMaxRequestPerBatch()) {
           IndexedConsensusRequest current = iterator.next();
@@ -357,6 +390,7 @@ public class LogDispatcher {
           // current function, but that's fine, we'll continue processing 
these elements in the
           // bufferedRequest the next time we go into the function, they're 
never lost
           iterator.remove();
+          releaseReservedMemory(current);
         }
         batch = new PendingBatch(startIndex, endIndex, logBatches);
         logger.debug(
@@ -395,17 +429,17 @@ public class LogDispatcher {
       // targetIndex is the index of request that we need to find
       long targetIndex = currentIndex;
       // Even if there is no WAL files, these code won't produce error.
-      walEntryiterator.skipTo(targetIndex);
+      walEntryIterator.skipTo(targetIndex);
       while (targetIndex < maxIndex
           && logBatches.size() < 
config.getReplication().getMaxRequestPerBatch()) {
         logger.debug("construct from WAL for one Entry, index : {}", 
targetIndex);
         try {
-          walEntryiterator.waitForNextReady();
+          walEntryIterator.waitForNextReady();
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           logger.warn("wait for next WAL entry is interrupted");
         }
-        IndexedConsensusRequest data = walEntryiterator.next();
+        IndexedConsensusRequest data = walEntryIterator.next();
         if (targetIndex > data.getSearchIndex()) {
           // if the index of request is smaller than currentIndex, then 
continue
           logger.warn(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java
new file mode 100644
index 0000000000..4abfde8fd2
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.logdispatcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MultiLeaderMemoryManager {
+  private static final Logger logger = LoggerFactory.getLogger(MultiLeaderMemoryManager.class);
+  private final AtomicLong memorySizeInByte = new AtomicLong(0);
+  private Long maxMemorySizeInByte = Runtime.getRuntime().maxMemory() / 10;
+
+  private MultiLeaderMemoryManager() {}
+
+  public boolean reserve(long size) {
+    synchronized (this) {
+      if (size > maxMemorySizeInByte - memorySizeInByte.get()) {
+        logger.debug(
+            "consensus memory limited. required: {}, used: {}, total: {}",
+            size,
+            memorySizeInByte.get(),
+            maxMemorySizeInByte);
+        return false;
+      }
+      memorySizeInByte.addAndGet(size);
+    }
+    logger.debug(
+        "{} add {} bytes, total memory size: {} bytes.",
+        Thread.currentThread().getName(),
+        size,
+        memorySizeInByte.get());
+    return true;
+  }
+
+  public void free(long size) {
+    long currentUsedMemory = memorySizeInByte.addAndGet(-size);
+    logger.debug(
+        "{} free {} bytes, total memory size: {} bytes.",
+        Thread.currentThread().getName(),
+        size,
+        currentUsedMemory);
+  }
+
+  public void init(long maxMemorySize) {
+    this.maxMemorySizeInByte = maxMemorySize;
+  }
+
+  private static final MultiLeaderMemoryManager INSTANCE = new MultiLeaderMemoryManager();
+
+  public static MultiLeaderMemoryManager getInstance() {
+    return INSTANCE;
+  }
+}
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 65b0865f15..368758b858 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -442,10 +442,10 @@ timestamp_precision=ms
 # Datatype: boolean
 # enable_mem_control=true
 
-# Memory Allocation Ratio: Write, Read, Schema and Free Memory.
-# The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 1:1:1:1 , 6:2:1:1
-# If you have high level of writing pressure and low level of reading pressure, please adjust it to for example 6:1:1:2
-# write_read_schema_free_memory_proportion=4:3:1:2
+# Memory Allocation Ratio: Write, Read, Schema, Consensus and Free Memory.
+# The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. for example: 1:1:1:1:1 , 6:2:1:1:1
+# If you have high level of writing pressure and low level of reading pressure, please adjust it to for example 6:1:1:1:2
+# write_read_schema_free_memory_proportion=3:3:1:1:2
 
 # Schema Memory Allocation Ratio: SchemaRegion, SchemaCache, PartitionCache and LastCache.
 # The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 1:1:1:1 , 6:2:1:1
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 76b53f6db8..2e5e90c4a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -127,7 +127,7 @@ public class IoTDBConfig {
   private int rpcMaxConcurrentClientNum = 65535;
 
   /** Memory allocated for the write process */
-  private long allocateMemoryForStorageEngine = 
Runtime.getRuntime().maxMemory() * 4 / 10;
+  private long allocateMemoryForStorageEngine = 
Runtime.getRuntime().maxMemory() * 3 / 10;
 
   /** Memory allocated for the read process */
   private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 
10;
@@ -135,6 +135,9 @@ public class IoTDBConfig {
   /** Memory allocated for the mtree */
   private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() / 10;
 
+  /** Memory allocated for the consensus layer */
+  private long allocateMemoryForConsensus = Runtime.getRuntime().maxMemory() / 
10;
+
   private volatile int maxQueryDeduplicatedPathNum = 1000;
 
   /** Ratio of memory allocated for buffered arrays */
@@ -1813,10 +1816,18 @@ public class IoTDBConfig {
     return allocateMemoryForSchema;
   }
 
+  public long getAllocateMemoryForConsensus() {
+    return allocateMemoryForConsensus;
+  }
+
   public void setAllocateMemoryForSchema(long allocateMemoryForSchema) {
     this.allocateMemoryForSchema = allocateMemoryForSchema;
   }
 
+  public void setAllocateMemoryForConsensus(long allocateMemoryForConsensus) {
+    this.allocateMemoryForConsensus = allocateMemoryForConsensus;
+  }
+
   public long getAllocateMemoryForRead() {
     return allocateMemoryForRead;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 21889c5ae9..0e785e1b97 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.engine.compaction.constant.InnerUnsequenceCompactionS
 import org.apache.iotdb.db.exception.BadNodeUrlFormatException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.metrics.MetricService;
 import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm;
 import org.apache.iotdb.db.wal.WALManager;
@@ -1580,12 +1581,15 @@ public class IoTDBDescriptor {
             maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / 
proportionSum);
         conf.setAllocateMemoryForSchema(
             maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / 
proportionSum);
+        conf.setAllocateMemoryForConsensus(
+            maxMemoryAvailable * Integer.parseInt(proportions[3].trim()) / 
proportionSum);
       }
     }
 
-    logger.info("allocateMemoryForRead = {}", conf.getAllocateMemoryForRead());
-    logger.info("allocateMemoryForWrite = {}", 
conf.getAllocateMemoryForStorageEngine());
-    logger.info("allocateMemoryForSchema = {}", 
conf.getAllocateMemoryForSchema());
+    logger.info("initial allocateMemoryForRead = {}", 
conf.getAllocateMemoryForRead());
+    logger.info("initial allocateMemoryForWrite = {}", 
conf.getAllocateMemoryForStorageEngine());
+    logger.info("initial allocateMemoryForSchema = {}", 
conf.getAllocateMemoryForSchema());
+    logger.info("initial allocateMemoryForConsensus = {}", 
conf.getAllocateMemoryForConsensus());
 
     initSchemaMemoryAllocate(properties);
     initStorageEngineAllocate(properties);
@@ -1952,6 +1956,12 @@ public class IoTDBDescriptor {
         ratisConfig.getSchemaLeaderElectionTimeoutMax());
   }
 
+  public void reclaimConsensusMemory() {
+    conf.setAllocateMemoryForStorageEngine(
+        conf.getAllocateMemoryForStorageEngine() + 
conf.getAllocateMemoryForConsensus());
+    SystemInfo.getInstance().allocateWriteMemory();
+  }
+
   public void initClusterSchemaMemoryAllocate() {
     if (!conf.isDefaultSchemaMemoryConfig()) {
       // the config has already been updated as user config in properties file
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index df0e98bdfc..b611f442f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -85,6 +85,8 @@ public class DataRegionConsensusImpl {
                               .setReplication(
                                   MultiLeaderConfig.Replication.newBuilder()
                                       
.setWalThrottleThreshold(conf.getThrottleThreshold())
+                                      .setAllocateMemoryForConsensus(
+                                          conf.getAllocateMemoryForConsensus())
                                       .build())
                               .build())
                       .setRatisConfig(
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 8c51e9eef5..293f9d635a 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.rescon;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -44,10 +45,8 @@ public class SystemInfo {
   private long totalStorageGroupMemCost = 0L;
   private volatile boolean rejected = false;
 
-  private static long memorySizeForWrite =
-      (long) (config.getAllocateMemoryForStorageEngine() * 
config.getWriteProportion());
-  private static long memorySizeForCompaction =
-      (long) (config.getAllocateMemoryForStorageEngine() * 
config.getCompactionProportion());
+  private long memorySizeForWrite;
+  private long memorySizeForCompaction;
 
   private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new 
HashMap<>();
 
@@ -56,11 +55,15 @@ public class SystemInfo {
 
   private ExecutorService flushTaskSubmitThreadPool =
       IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
-  private static double FLUSH_THERSHOLD = memorySizeForWrite * 
config.getFlushProportion();
-  private static double REJECT_THERSHOLD = memorySizeForWrite * 
config.getRejectProportion();
+  private double FLUSH_THERSHOLD = memorySizeForWrite * 
config.getFlushProportion();
+  private double REJECT_THERSHOLD = memorySizeForWrite * 
config.getRejectProportion();
 
   private volatile boolean isEncodingFasterThanIo = true;
 
+  private SystemInfo() {
+    allocateWriteMemory();
+  }
+
   /**
    * Report current mem cost of storage group to system. Called when the 
memory of storage group
    * newly accumulates to IoTDBConfig.getStorageGroupSizeReportThreshold()
@@ -193,6 +196,14 @@ public class SystemInfo {
     return memorySizeForCompaction;
   }
 
+  public void allocateWriteMemory() {
+    memorySizeForWrite =
+        (long) (config.getAllocateMemoryForStorageEngine() * 
config.getWriteProportion());
+    memorySizeForCompaction =
+        (long) (config.getAllocateMemoryForStorageEngine() * 
config.getCompactionProportion());
+  }
+
+  @TestOnly
   public void setMemorySizeForCompaction(long size) {
     memorySizeForCompaction = size;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 57c58d8c03..f2493ee567 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -213,6 +213,14 @@ public class DataNode implements DataNodeMBean {
             config.setSchemaRegionConsensusProtocolClass(
                 
dataNodeRegisterResp.globalConfig.getSchemaRegionConsensusProtocolClass());
           }
+
+          // In current implementation, only MultiLeader need separated memory 
from Consensus
+          if (!config
+              .getDataRegionConsensusProtocolClass()
+              .equals(ConsensusFactory.MultiLeaderConsensus)) {
+            IoTDBDescriptor.getInstance().reclaimConsensusMemory();
+          }
+
           
IoTDBStartCheck.getInstance().serializeGlobalConfig(dataNodeRegisterResp.globalConfig);
 
           logger.info("Register to the cluster successfully");
diff --git a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
index 8a043bf6e3..b8fa07d27b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
@@ -83,7 +83,8 @@ public class NewIoTDB implements NewIoTDBMBean {
     }
     NewIoTDB daemon = NewIoTDB.getInstance();
     config.setMppMode(true);
-
+    // In standalone mode, Consensus memory should be reclaimed
+    IoTDBDescriptor.getInstance().reclaimConsensusMemory();
     loadExternLib(config);
 
     daemon.active();

Reply via email to