This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 28face49b8 [IOTDB-4522] Add memory control of queue in multiLeader
consensus. (#7432)
28face49b8 is described below
commit 28face49b8788d949a73bce858bace2229b30bb3
Author: ZhangHongYin <[email protected]>
AuthorDate: Sat Oct 8 14:20:50 2022 +0800
[IOTDB-4522] Add memory control of queue in multiLeader consensus. (#7432)
---
.../common/request/IndexedConsensusRequest.java | 8 +++
.../iotdb/consensus/config/MultiLeaderConfig.java | 32 +++++-----
.../multileader/MultiLeaderConsensus.java | 4 ++
.../multileader/logdispatcher/LogDispatcher.java | 64 ++++++++++++++-----
.../logdispatcher/MultiLeaderMemoryManager.java | 72 ++++++++++++++++++++++
.../resources/conf/iotdb-datanode.properties | 8 +--
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 16 ++++-
.../db/consensus/DataRegionConsensusImpl.java | 2 +
.../org/apache/iotdb/db/rescon/SystemInfo.java | 23 +++++--
.../java/org/apache/iotdb/db/service/DataNode.java | 8 +++
.../java/org/apache/iotdb/db/service/NewIoTDB.java | 3 +-
12 files changed, 207 insertions(+), 46 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 1a61cf3a09..6abca549b6 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -33,6 +33,7 @@ public class IndexedConsensusRequest implements
IConsensusRequest {
private final long syncIndex;
private List<IConsensusRequest> requests;
private List<ByteBuffer> serializedRequests;
+ private long serializedSize = 0;
public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest>
requests) {
this.searchIndex = searchIndex;
@@ -43,6 +44,9 @@ public class IndexedConsensusRequest implements
IConsensusRequest {
+  /**
+   * Builds a request from already-serialized payloads. The serialized size is recorded as the
+   * sum of the capacities of the given buffers; the sync index is set to -1.
+   */
   public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long searchIndex) {
     this.searchIndex = searchIndex;
     this.serializedRequests = serializedRequests;
+    // Total the buffer capacities in one pass.
+    this.serializedSize += serializedRequests.stream().mapToLong(ByteBuffer::capacity).sum();
     this.syncIndex = -1L;
   }
@@ -72,6 +76,10 @@ public class IndexedConsensusRequest implements
IConsensusRequest {
return result;
}
+ /**
+  * Returns the byte size recorded for this request's serialized form. The field starts at 0 and
+  * is increased by each buffer's capacity() in the constructor that takes serialized buffers.
+  * NOTE(review): the constructor taking IConsensusRequest objects does not appear to update it
+  * in the visible code, so such requests would report 0 - confirm.
+  */
+ public long getSerializedSize() {
+ return serializedSize;
+ }
+
public long getSearchIndex() {
return searchIndex;
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 2a334b0b84..cd0d58c4ae 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -196,7 +196,6 @@ public class MultiLeaderConfig {
}
public static class Replication {
- private final int maxPendingRequestNumPerNode;
private final int maxRequestPerBatch;
private final int maxPendingBatch;
private final int maxWaitingTimeForAccumulatingBatchInMs;
@@ -205,9 +204,9 @@ public class MultiLeaderConfig {
private final long walThrottleThreshold;
private final long throttleTimeOutMs;
private final long checkpointGap;
+ private final Long allocateMemoryForConsensus;
private Replication(
- int maxPendingRequestNumPerNode,
int maxRequestPerBatch,
int maxPendingBatch,
int maxWaitingTimeForAccumulatingBatchInMs,
@@ -215,8 +214,8 @@ public class MultiLeaderConfig {
long maxRetryWaitTimeMs,
long walThrottleThreshold,
long throttleTimeOutMs,
- long checkpointGap) {
- this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
+ long checkpointGap,
+ long allocateMemoryForConsensus) {
this.maxRequestPerBatch = maxRequestPerBatch;
this.maxPendingBatch = maxPendingBatch;
this.maxWaitingTimeForAccumulatingBatchInMs =
maxWaitingTimeForAccumulatingBatchInMs;
@@ -225,10 +224,7 @@ public class MultiLeaderConfig {
this.walThrottleThreshold = walThrottleThreshold;
this.throttleTimeOutMs = throttleTimeOutMs;
this.checkpointGap = checkpointGap;
- }
-
- public int getMaxPendingRequestNumPerNode() {
- return maxPendingRequestNumPerNode;
+ this.allocateMemoryForConsensus = allocateMemoryForConsensus;
}
public int getMaxRequestPerBatch() {
@@ -263,12 +259,15 @@ public class MultiLeaderConfig {
return checkpointGap;
}
+ public Long getAllocateMemoryForConsensus() {
+ return allocateMemoryForConsensus;
+ }
+
public static Replication.Builder newBuilder() {
return new Replication.Builder();
}
public static class Builder {
- private int maxPendingRequestNumPerNode = 600;
private int maxRequestPerBatch = 30;
// (IMPORTANT) Value of this variable should be the same with
MAX_REQUEST_CACHE_SIZE
// in DataRegionStateMachine
@@ -279,11 +278,7 @@ public class MultiLeaderConfig {
private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L;
private long throttleTimeOutMs = TimeUnit.SECONDS.toMillis(30);
private long checkpointGap = 500;
-
- public Replication.Builder setMaxPendingRequestNumPerNode(int
maxPendingRequestNumPerNode) {
- this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
- return this;
- }
+ private long allocateMemoryForConsensus;
public Replication.Builder setMaxRequestPerBatch(int maxRequestPerBatch)
{
this.maxRequestPerBatch = maxRequestPerBatch;
@@ -321,9 +316,13 @@ public class MultiLeaderConfig {
return this;
}
+ public Replication.Builder setAllocateMemoryForConsensus(long
allocateMemoryForConsensus) {
+ this.allocateMemoryForConsensus = allocateMemoryForConsensus;
+ return this;
+ }
+
public Replication build() {
return new Replication(
- maxPendingRequestNumPerNode,
maxRequestPerBatch,
maxPendingBatch,
maxWaitingTimeForAccumulatingBatchInMs,
@@ -331,7 +330,8 @@ public class MultiLeaderConfig {
maxRetryWaitTimeMs,
walThrottleThreshold,
throttleTimeOutMs,
- checkpointGap);
+ checkpointGap,
+ allocateMemoryForConsensus);
}
}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index 2a8792db7a..2f35bdfe71 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -46,6 +46,7 @@ import
org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClie
import
org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory;
import
org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.SyncMultiLeaderServiceClientPoolFactory;
import
org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
+import
org.apache.iotdb.consensus.multileader.logdispatcher.MultiLeaderMemoryManager;
import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCService;
import
org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCServiceProcessor;
import org.apache.iotdb.rpc.RpcUtils;
@@ -94,6 +95,9 @@ public class MultiLeaderConsensus implements IConsensus {
new IClientManager.Factory<TEndPoint, SyncMultiLeaderServiceClient>()
.createClientManager(
new
SyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig()));
+ // init multiLeader memory manager
+ MultiLeaderMemoryManager.getInstance()
+
.init(config.getMultiLeaderConfig().getReplication().getAllocateMemoryForConsensus());
}
@Override
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 557fbf0f62..86f7186c6c 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -47,9 +47,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -154,10 +154,9 @@ public class LogDispatcher {
"{}->{}: Push a log to the queue, where the queue length is
{}",
impl.getThisNode().getGroupId(),
thread.getPeer().getEndpoint().getIp(),
- thread.getPendingRequest().size());
- if (!thread
- .getPendingRequest()
- .offer(new IndexedConsensusRequest(serializedRequests,
request.getSearchIndex()))) {
+ thread.getPendingRequestSize());
+ if (!thread.offer(
+ new IndexedConsensusRequest(serializedRequests,
request.getSearchIndex()))) {
logger.debug(
"{}: Log queue of {} is full, ignore the log to this node,
searchIndex: {}",
impl.getThisNode().getGroupId(),
@@ -183,15 +182,16 @@ public class LogDispatcher {
// A reader management class that gets requests from the DataRegion
private final ConsensusReqReader reader =
(ConsensusReqReader) impl.getStateMachine().read(new
GetConsensusReqReaderPlan());
+ private final MultiLeaderMemoryManager multiLeaderMemoryManager =
+ MultiLeaderMemoryManager.getInstance();
private volatile boolean stopped = false;
- private ConsensusReqReader.ReqIterator walEntryiterator;
+ private ConsensusReqReader.ReqIterator walEntryIterator;
public LogDispatcherThread(Peer peer, MultiLeaderConfig config, long
initialSyncIndex) {
this.peer = peer;
this.config = config;
- this.pendingRequest =
- new
ArrayBlockingQueue<>(config.getReplication().getMaxPendingRequestNumPerNode());
+ this.pendingRequest = new LinkedBlockingQueue<>();
this.controller =
new IndexController(
impl.getStorageDir(),
@@ -199,7 +199,7 @@ public class LogDispatcher {
initialSyncIndex,
config.getReplication().getCheckpointGap());
this.syncStatus = new SyncStatus(controller, config);
- this.walEntryiterator = reader.getReqIterator(START_INDEX);
+ this.walEntryIterator = reader.getReqIterator(START_INDEX);
}
public IndexController getController() {
@@ -218,12 +218,43 @@ public class LogDispatcher {
return config;
}
- public BlockingQueue<IndexedConsensusRequest> getPendingRequest() {
- return pendingRequest;
+ public int getPendingRequestSize() {
+ return pendingRequest.size();
+ }
+
+    /**
+     * Tries to enqueue a request while accounting for its memory footprint.
+     *
+     * <p>The request's serialized size is first reserved from the MultiLeaderMemoryManager; if
+     * that reservation is refused, the request is not enqueued. If enqueueing is rejected or
+     * throws, the reservation is given back before returning or propagating the error.
+     *
+     * @param indexedConsensusRequest the request to enqueue
+     * @return true if the request was added to the pending queue, false otherwise
+     */
+    public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
+      final long requestSize = indexedConsensusRequest.getSerializedSize();
+      if (!multiLeaderMemoryManager.reserve(requestSize)) {
+        return false;
+      }
+      boolean enqueued = false;
+      try {
+        enqueued = pendingRequest.offer(indexedConsensusRequest);
+      } finally {
+        if (!enqueued) {
+          // Covers both a rejected offer and an exception thrown by the queue
+          multiLeaderMemoryManager.free(requestSize);
+        }
+      }
+      return enqueued;
+    }
+
+ /** Gives the memory reserved for the given request back to the memory manager; the caller is responsible for removing the request from its queue. */
+ private void releaseReservedMemory(IndexedConsensusRequest
indexedConsensusRequest) {
+
multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
}
+ /**
+  * Marks this dispatcher thread as stopped and gives back the memory reserved for every request
+  * currently held in pendingRequest and bufferedRequest.
+  * NOTE(review): the requests are freed but not removed from either collection. Other visible
+  * code calls releaseReservedMemory() after iterator.remove(), so if the dispatch loop runs
+  * once more after stop(), the same request could be freed twice - confirm the loop cannot run
+  * again, or clear the collections here.
+  */
public void stop() {
stopped = true;
+ for (IndexedConsensusRequest indexedConsensusRequest : pendingRequest) {
+
multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
+ }
+ for (IndexedConsensusRequest indexedConsensusRequest : bufferedRequest) {
+
multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
+ }
}
public void cleanup() throws IOException {
@@ -289,7 +320,7 @@ public class LogDispatcher {
logger.debug(
"{} : pendingRequest Size: {}, bufferedRequest size: {}",
impl.getThisNode().getGroupId(),
- pendingRequest.size(),
+ getPendingRequestSize(),
bufferedRequest.size());
synchronized (impl.getIndexObject()) {
pendingRequest.drainTo(
@@ -303,6 +334,7 @@ public class LogDispatcher {
IndexedConsensusRequest request = iterator.next();
if (request.getSearchIndex() < startIndex) {
iterator.remove();
+ releaseReservedMemory(request);
} else {
break;
}
@@ -333,6 +365,7 @@ public class LogDispatcher {
constructBatchIndexedFromConsensusRequest(prev, logBatches);
endIndex = prev.getSearchIndex();
iterator.remove();
+ releaseReservedMemory(prev);
while (iterator.hasNext()
&& logBatches.size() <=
config.getReplication().getMaxRequestPerBatch()) {
IndexedConsensusRequest current = iterator.next();
@@ -357,6 +390,7 @@ public class LogDispatcher {
// current function, but that's fine, we'll continue processing
these elements in the
// bufferedRequest the next time we go into the function, they're
never lost
iterator.remove();
+ releaseReservedMemory(current);
}
batch = new PendingBatch(startIndex, endIndex, logBatches);
logger.debug(
@@ -395,17 +429,17 @@ public class LogDispatcher {
// targetIndex is the index of request that we need to find
long targetIndex = currentIndex;
// Even if there is no WAL files, these code won't produce error.
- walEntryiterator.skipTo(targetIndex);
+ walEntryIterator.skipTo(targetIndex);
while (targetIndex < maxIndex
&& logBatches.size() <
config.getReplication().getMaxRequestPerBatch()) {
logger.debug("construct from WAL for one Entry, index : {}",
targetIndex);
try {
- walEntryiterator.waitForNextReady();
+ walEntryIterator.waitForNextReady();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("wait for next WAL entry is interrupted");
}
- IndexedConsensusRequest data = walEntryiterator.next();
+ IndexedConsensusRequest data = walEntryIterator.next();
if (targetIndex > data.getSearchIndex()) {
// if the index of request is smaller than currentIndex, then
continue
logger.warn(
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java
new file mode 100644
index 0000000000..4abfde8fd2
--- /dev/null
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.logdispatcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Singleton accountant for the memory held by requests queued in the MultiLeader log
+ * dispatcher. Callers reserve bytes before queueing a request and free them once the request
+ * leaves the queue. The limit defaults to one tenth of the JVM max heap and can be replaced
+ * through {@link #init(long)}.
+ */
+public class MultiLeaderMemoryManager {
+  private static final Logger logger = LoggerFactory.getLogger(MultiLeaderMemoryManager.class);
+  private final AtomicLong memorySizeInByte = new AtomicLong(0);
+  // Primitive and volatile: init() is not synchronized, while reserve() reads the limit under
+  // the instance lock, so the write must be visible without sharing that lock.
+  private volatile long maxMemorySizeInByte = Runtime.getRuntime().maxMemory() / 10;
+
+  private MultiLeaderMemoryManager() {}
+
+  /**
+   * Tries to reserve {@code size} bytes.
+   *
+   * @param size number of bytes to reserve
+   * @return true if the bytes were reserved, false if they would exceed the configured limit
+   */
+  public boolean reserve(long size) {
+    // The check and the add must happen atomically with respect to other reservations.
+    synchronized (this) {
+      // Read the limit once so the check and the log message agree.
+      final long limit = maxMemorySizeInByte;
+      if (size > limit - memorySizeInByte.get()) {
+        logger.debug(
+            "consensus memory limited. required: {}, used: {}, total: {}",
+            size,
+            memorySizeInByte.get(),
+            limit);
+        return false;
+      }
+      memorySizeInByte.addAndGet(size);
+    }
+    logger.debug(
+        "{} add {} bytes, total memory size: {} bytes.",
+        Thread.currentThread().getName(),
+        size,
+        memorySizeInByte.get());
+    return true;
+  }
+
+  /**
+   * Gives back {@code size} bytes that were previously reserved.
+   *
+   * @param size number of bytes to release
+   */
+  public void free(long size) {
+    long currentUsedMemory = memorySizeInByte.addAndGet(-size);
+    logger.debug(
+        "{} free {} bytes, total memory size: {} bytes.",
+        Thread.currentThread().getName(),
+        size,
+        currentUsedMemory);
+  }
+
+  /**
+   * Replaces the memory limit.
+   *
+   * @param maxMemorySize the new limit in bytes
+   */
+  public void init(long maxMemorySize) {
+    this.maxMemorySizeInByte = maxMemorySize;
+  }
+
+  private static final MultiLeaderMemoryManager INSTANCE = new MultiLeaderMemoryManager();
+
+  /** Returns the single shared instance. */
+  public static MultiLeaderMemoryManager getInstance() {
+    return INSTANCE;
+  }
+}
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties
b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 65b0865f15..368758b858 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -442,10 +442,10 @@ timestamp_precision=ms
# Datatype: boolean
# enable_mem_control=true
-# Memory Allocation Ratio: Write, Read, Schema and Free Memory.
-# The parameter form is a:b:c:d, where a, b, c and d are integers. for
example: 1:1:1:1 , 6:2:1:1
-# If you have high level of writing pressure and low level of reading
pressure, please adjust it to for example 6:1:1:2
-# write_read_schema_free_memory_proportion=4:3:1:2
+# Memory Allocation Ratio: Write, Read, Schema, Consensus and Free Memory.
+# The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. for
example: 1:1:1:1:1 , 6:2:1:1:1
+# If you have high level of writing pressure and low level of reading
pressure, please adjust it to for example 6:1:1:1:2
+# write_read_schema_free_memory_proportion=3:3:1:1:2
# Schema Memory Allocation Ratio: SchemaRegion, SchemaCache, PartitionCache
and LastCache.
# The parameter form is a:b:c:d, where a, b, c and d are integers. for
example: 1:1:1:1 , 6:2:1:1
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 76b53f6db8..2e5e90c4a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -127,7 +127,7 @@ public class IoTDBConfig {
private int rpcMaxConcurrentClientNum = 65535;
/** Memory allocated for the write process */
- private long allocateMemoryForStorageEngine =
Runtime.getRuntime().maxMemory() * 4 / 10;
+ private long allocateMemoryForStorageEngine =
Runtime.getRuntime().maxMemory() * 3 / 10;
/** Memory allocated for the read process */
private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 /
10;
@@ -135,6 +135,9 @@ public class IoTDBConfig {
/** Memory allocated for the mtree */
private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() / 10;
+ /** Memory allocated for the consensus layer */
+ private long allocateMemoryForConsensus = Runtime.getRuntime().maxMemory() /
10;
+
private volatile int maxQueryDeduplicatedPathNum = 1000;
/** Ratio of memory allocated for buffered arrays */
@@ -1813,10 +1816,18 @@ public class IoTDBConfig {
return allocateMemoryForSchema;
}
+ public long getAllocateMemoryForConsensus() {
+ return allocateMemoryForConsensus;
+ }
+
public void setAllocateMemoryForSchema(long allocateMemoryForSchema) {
this.allocateMemoryForSchema = allocateMemoryForSchema;
}
+ public void setAllocateMemoryForConsensus(long allocateMemoryForConsensus) {
+ this.allocateMemoryForConsensus = allocateMemoryForConsensus;
+ }
+
public long getAllocateMemoryForRead() {
return allocateMemoryForRead;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 21889c5ae9..0e785e1b97 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.engine.compaction.constant.InnerUnsequenceCompactionS
import org.apache.iotdb.db.exception.BadNodeUrlFormatException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm;
import org.apache.iotdb.db.wal.WALManager;
@@ -1580,12 +1581,15 @@ public class IoTDBDescriptor {
maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) /
proportionSum);
conf.setAllocateMemoryForSchema(
maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) /
proportionSum);
+ conf.setAllocateMemoryForConsensus(
+ maxMemoryAvailable * Integer.parseInt(proportions[3].trim()) /
proportionSum);
}
}
- logger.info("allocateMemoryForRead = {}", conf.getAllocateMemoryForRead());
- logger.info("allocateMemoryForWrite = {}",
conf.getAllocateMemoryForStorageEngine());
- logger.info("allocateMemoryForSchema = {}",
conf.getAllocateMemoryForSchema());
+ logger.info("initial allocateMemoryForRead = {}",
conf.getAllocateMemoryForRead());
+ logger.info("initial allocateMemoryForWrite = {}",
conf.getAllocateMemoryForStorageEngine());
+ logger.info("initial allocateMemoryForSchema = {}",
conf.getAllocateMemoryForSchema());
+ logger.info("initial allocateMemoryForConsensus = {}",
conf.getAllocateMemoryForConsensus());
initSchemaMemoryAllocate(properties);
initStorageEngineAllocate(properties);
@@ -1952,6 +1956,12 @@ public class IoTDBDescriptor {
ratisConfig.getSchemaLeaderElectionTimeoutMax());
}
+  /**
+   * Adds the memory currently allocated to the consensus layer onto the storage engine's
+   * allocation, then asks SystemInfo to recompute its write memory budgets. The consensus
+   * allocation itself is left unchanged, so every call adds it again.
+   */
+  public void reclaimConsensusMemory() {
+    long storageEngineMemory = conf.getAllocateMemoryForStorageEngine();
+    long consensusMemory = conf.getAllocateMemoryForConsensus();
+    conf.setAllocateMemoryForStorageEngine(storageEngineMemory + consensusMemory);
+    SystemInfo.getInstance().allocateWriteMemory();
+  }
+
public void initClusterSchemaMemoryAllocate() {
if (!conf.isDefaultSchemaMemoryConfig()) {
// the config has already been updated as user config in properties file
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index df0e98bdfc..b611f442f8 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -85,6 +85,8 @@ public class DataRegionConsensusImpl {
.setReplication(
MultiLeaderConfig.Replication.newBuilder()
.setWalThrottleThreshold(conf.getThrottleThreshold())
+ .setAllocateMemoryForConsensus(
+ conf.getAllocateMemoryForConsensus())
.build())
.build())
.setRatisConfig(
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 8c51e9eef5..293f9d635a 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.rescon;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -44,10 +45,8 @@ public class SystemInfo {
private long totalStorageGroupMemCost = 0L;
private volatile boolean rejected = false;
- private static long memorySizeForWrite =
- (long) (config.getAllocateMemoryForStorageEngine() *
config.getWriteProportion());
- private static long memorySizeForCompaction =
- (long) (config.getAllocateMemoryForStorageEngine() *
config.getCompactionProportion());
+ private long memorySizeForWrite;
+ private long memorySizeForCompaction;
private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new
HashMap<>();
@@ -56,11 +55,15 @@ public class SystemInfo {
private ExecutorService flushTaskSubmitThreadPool =
IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
- private static double FLUSH_THERSHOLD = memorySizeForWrite *
config.getFlushProportion();
- private static double REJECT_THERSHOLD = memorySizeForWrite *
config.getRejectProportion();
+ private double FLUSH_THERSHOLD = memorySizeForWrite *
config.getFlushProportion();
+ private double REJECT_THERSHOLD = memorySizeForWrite *
config.getRejectProportion();
private volatile boolean isEncodingFasterThanIo = true;
+ private SystemInfo() {
+ allocateWriteMemory();
+ }
+
/**
* Report current mem cost of storage group to system. Called when the
memory of storage group
* newly accumulates to IoTDBConfig.getStorageGroupSizeReportThreshold()
@@ -193,6 +196,14 @@ public class SystemInfo {
return memorySizeForCompaction;
}
+  /**
+   * Recomputes the write and compaction memory budgets from the storage engine allocation in
+   * the config, together with the flush and reject thresholds derived from the write budget.
+   *
+   * <p>The thresholds must be refreshed here: their field initializers run before the
+   * constructor body calls this method, when memorySizeForWrite is still 0, and this method is
+   * called again after the storage engine allocation has been enlarged.
+   */
+  public void allocateWriteMemory() {
+    memorySizeForWrite =
+        (long) (config.getAllocateMemoryForStorageEngine() * config.getWriteProportion());
+    memorySizeForCompaction =
+        (long) (config.getAllocateMemoryForStorageEngine() * config.getCompactionProportion());
+    // Keep the thresholds in step with the write budget they are derived from
+    FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion();
+    REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion();
+  }
+
+ @TestOnly
public void setMemorySizeForCompaction(long size) {
memorySizeForCompaction = size;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 57c58d8c03..f2493ee567 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -213,6 +213,14 @@ public class DataNode implements DataNodeMBean {
config.setSchemaRegionConsensusProtocolClass(
dataNodeRegisterResp.globalConfig.getSchemaRegionConsensusProtocolClass());
}
+
+ // In the current implementation, only MultiLeader needs separate memory for the consensus layer
+ if (!config
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.MultiLeaderConsensus)) {
+ IoTDBDescriptor.getInstance().reclaimConsensusMemory();
+ }
+
IoTDBStartCheck.getInstance().serializeGlobalConfig(dataNodeRegisterResp.globalConfig);
logger.info("Register to the cluster successfully");
diff --git a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
index 8a043bf6e3..b8fa07d27b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
@@ -83,7 +83,8 @@ public class NewIoTDB implements NewIoTDBMBean {
}
NewIoTDB daemon = NewIoTDB.getInstance();
config.setMppMode(true);
-
+ // In standalone mode, Consensus memory should be reclaimed
+ IoTDBDescriptor.getInstance().reclaimConsensusMemory();
loadExternLib(config);
daemon.active();