This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch jd_iot_base_preview2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e22a439c669f6e5cce71d6d6997deebced41dc2a Author: ZhangHongYin <[email protected]> AuthorDate: Sat Oct 8 14:20:50 2022 +0800 [IOTDB-4522] Add memory control of queue in multiLeader consensus. (#7432) --- .../common/request/IndexedConsensusRequest.java | 8 +++ .../iotdb/consensus/config/MultiLeaderConfig.java | 32 +++++----- .../multileader/MultiLeaderConsensus.java | 4 ++ .../multileader/logdispatcher/LogDispatcher.java | 64 ++++++++++++++----- .../logdispatcher/MultiLeaderMemoryManager.java | 72 ++++++++++++++++++++++ .../resources/conf/iotdb-datanode.properties | 8 +-- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +++- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 16 ++++- .../db/consensus/DataRegionConsensusImpl.java | 2 + .../org/apache/iotdb/db/rescon/SystemInfo.java | 23 +++++-- .../java/org/apache/iotdb/db/service/DataNode.java | 8 +++ .../java/org/apache/iotdb/db/service/NewIoTDB.java | 3 +- 12 files changed, 207 insertions(+), 46 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java index 1a61cf3a09..6abca549b6 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java @@ -33,6 +33,7 @@ public class IndexedConsensusRequest implements IConsensusRequest { private final long syncIndex; private List<IConsensusRequest> requests; private List<ByteBuffer> serializedRequests; + private long serializedSize = 0; public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) { this.searchIndex = searchIndex; @@ -43,6 +44,9 @@ public class IndexedConsensusRequest implements IConsensusRequest { public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long searchIndex) { this.searchIndex = searchIndex; this.serializedRequests = serializedRequests; + for (ByteBuffer byteBuffer : serializedRequests) { + serializedSize += byteBuffer.capacity(); + } this.syncIndex = -1L; } @@ -72,6 +76,10 @@ public class IndexedConsensusRequest implements IConsensusRequest { return result; } + public long getSerializedSize() { + return serializedSize; + } + public long getSearchIndex() { return searchIndex; } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java index 2a334b0b84..cd0d58c4ae 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java @@ -196,7 +196,6 @@ public class MultiLeaderConfig { } public static class Replication { - private final int maxPendingRequestNumPerNode; private final int maxRequestPerBatch; private final int maxPendingBatch; private final int maxWaitingTimeForAccumulatingBatchInMs; @@ -205,9 +204,9 @@ public class MultiLeaderConfig { private final long walThrottleThreshold; private final long throttleTimeOutMs; private final long checkpointGap; + private final Long allocateMemoryForConsensus; private Replication( - int maxPendingRequestNumPerNode, int maxRequestPerBatch, int maxPendingBatch, int maxWaitingTimeForAccumulatingBatchInMs, @@ -215,8 +214,8 @@ public class MultiLeaderConfig { long maxRetryWaitTimeMs, long walThrottleThreshold, long throttleTimeOutMs, - long checkpointGap) { - this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode; + long checkpointGap, + long allocateMemoryForConsensus) { this.maxRequestPerBatch = maxRequestPerBatch; this.maxPendingBatch = maxPendingBatch; this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs; @@ -225,10 +224,7 @@ public class MultiLeaderConfig { this.walThrottleThreshold = walThrottleThreshold; this.throttleTimeOutMs = throttleTimeOutMs; this.checkpointGap = checkpointGap; - } - - public int getMaxPendingRequestNumPerNode() { - return maxPendingRequestNumPerNode; + this.allocateMemoryForConsensus = allocateMemoryForConsensus; } public int getMaxRequestPerBatch() { @@ -263,12 +259,15 @@ public class MultiLeaderConfig { return checkpointGap; } + public Long getAllocateMemoryForConsensus() { + return allocateMemoryForConsensus; + } + public static Replication.Builder newBuilder() { return new Replication.Builder(); } public static class Builder { - private int maxPendingRequestNumPerNode = 600; private int maxRequestPerBatch = 30; // (IMPORTANT) Value of this variable should be the same with MAX_REQUEST_CACHE_SIZE // in DataRegionStateMachine @@ -279,11 +278,7 @@ public class MultiLeaderConfig { private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L; private long throttleTimeOutMs = TimeUnit.SECONDS.toMillis(30); private long checkpointGap = 500; - - public Replication.Builder setMaxPendingRequestNumPerNode(int maxPendingRequestNumPerNode) { - this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode; - return this; - } + private long allocateMemoryForConsensus; public Replication.Builder setMaxRequestPerBatch(int maxRequestPerBatch) { this.maxRequestPerBatch = maxRequestPerBatch; @@ -321,9 +316,13 @@ public class MultiLeaderConfig { return this; } + public Replication.Builder setAllocateMemoryForConsensus(long allocateMemoryForConsensus) { + this.allocateMemoryForConsensus = allocateMemoryForConsensus; + return this; + } + public Replication build() { return new Replication( - maxPendingRequestNumPerNode, maxRequestPerBatch, maxPendingBatch, maxWaitingTimeForAccumulatingBatchInMs, @@ -331,7 +330,8 @@ public class MultiLeaderConfig { maxRetryWaitTimeMs, walThrottleThreshold, throttleTimeOutMs, - checkpointGap); + checkpointGap, + allocateMemoryForConsensus); } } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java index 2a8792db7a..2f35bdfe71 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java @@ -46,6 +46,7 @@ import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClie import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory; import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.SyncMultiLeaderServiceClientPoolFactory; import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient; +import org.apache.iotdb.consensus.multileader.logdispatcher.MultiLeaderMemoryManager; import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCService; import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCServiceProcessor; import org.apache.iotdb.rpc.RpcUtils; @@ -94,6 +95,9 @@ public class MultiLeaderConsensus implements IConsensus { new IClientManager.Factory<TEndPoint, SyncMultiLeaderServiceClient>() .createClientManager( new SyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig())); + // init multiLeader memory manager + MultiLeaderMemoryManager.getInstance() + .init(config.getMultiLeaderConfig().getReplication().getAllocateMemoryForConsensus()); } @Override diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index 557fbf0f62..86f7186c6c 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -47,9 +47,9 @@ import java.util.LinkedList; import java.util.List; import java.util.Objects; import java.util.OptionalLong; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -154,10 +154,9 @@ public class LogDispatcher { "{}->{}: Push a log to the queue, where the queue length is {}", impl.getThisNode().getGroupId(), thread.getPeer().getEndpoint().getIp(), - thread.getPendingRequest().size()); - if (!thread - .getPendingRequest() - .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) { + thread.getPendingRequestSize()); + if (!thread.offer( + new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) { logger.debug( "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}", impl.getThisNode().getGroupId(), @@ -183,15 +182,16 @@ public class LogDispatcher { // A reader management class that gets requests from the DataRegion private final ConsensusReqReader reader = (ConsensusReqReader) impl.getStateMachine().read(new GetConsensusReqReaderPlan()); + private final MultiLeaderMemoryManager multiLeaderMemoryManager = + MultiLeaderMemoryManager.getInstance(); private volatile boolean stopped = false; - private ConsensusReqReader.ReqIterator walEntryiterator; + private ConsensusReqReader.ReqIterator walEntryIterator; public LogDispatcherThread(Peer peer, MultiLeaderConfig config, long initialSyncIndex) { this.peer = peer; this.config = config; - this.pendingRequest = - new ArrayBlockingQueue<>(config.getReplication().getMaxPendingRequestNumPerNode()); + this.pendingRequest = new LinkedBlockingQueue<>(); this.controller = new IndexController( impl.getStorageDir(), @@ -199,7 +199,7 @@ public class LogDispatcher { initialSyncIndex, config.getReplication().getCheckpointGap()); this.syncStatus = new SyncStatus(controller, config); - this.walEntryiterator = reader.getReqIterator(START_INDEX); + this.walEntryIterator = reader.getReqIterator(START_INDEX); } public IndexController getController() { @@ -218,12 +218,43 @@ public class LogDispatcher { return config; } - public BlockingQueue<IndexedConsensusRequest> getPendingRequest() { - return pendingRequest; + public int getPendingRequestSize() { + return pendingRequest.size(); + } + + /** try to offer a request into queue with memory control */ + public boolean offer(IndexedConsensusRequest indexedConsensusRequest) { + if (!multiLeaderMemoryManager.reserve(indexedConsensusRequest.getSerializedSize())) { + return false; + } + boolean success; + try { + success = pendingRequest.offer(indexedConsensusRequest); + } catch (Throwable t) { + // If exception occurs during request offer, the reserved memory should be released + multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize()); + throw t; + } + if (!success) { + // If offer failed, the reserved memory should be released + multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize()); + } + return success; + } + + /** try to remove a request from queue with memory control */ + private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) { + multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize()); } public void stop() { stopped = true; + for (IndexedConsensusRequest indexedConsensusRequest : pendingRequest) { + multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize()); + } + for (IndexedConsensusRequest indexedConsensusRequest : bufferedRequest) { + multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize()); + } } public void cleanup() throws IOException { @@ -289,7 +320,7 @@ public class LogDispatcher { logger.debug( "{} : pendingRequest Size: {}, bufferedRequest size: {}", impl.getThisNode().getGroupId(), - pendingRequest.size(), + getPendingRequestSize(), bufferedRequest.size()); synchronized (impl.getIndexObject()) { pendingRequest.drainTo( @@ -303,6 +334,7 @@ public class LogDispatcher { IndexedConsensusRequest request = iterator.next(); if (request.getSearchIndex() < startIndex) { iterator.remove(); + releaseReservedMemory(request); } else { break; } @@ -333,6 +365,7 @@ public class LogDispatcher { constructBatchIndexedFromConsensusRequest(prev, logBatches); endIndex = prev.getSearchIndex(); iterator.remove(); + releaseReservedMemory(prev); while (iterator.hasNext() && logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) { IndexedConsensusRequest current = iterator.next(); @@ -357,6 +390,7 @@ public class LogDispatcher { // current function, but that's fine, we'll continue processing these elements in the // bufferedRequest the next time we go into the function, they're never lost iterator.remove(); + releaseReservedMemory(current); } batch = new PendingBatch(startIndex, endIndex, logBatches); logger.debug( @@ -395,17 +429,17 @@ public class LogDispatcher { // targetIndex is the index of request that we need to find long targetIndex = currentIndex; // Even if there is no WAL files, these code won't produce error. - walEntryiterator.skipTo(targetIndex); + walEntryIterator.skipTo(targetIndex); while (targetIndex < maxIndex && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) { logger.debug("construct from WAL for one Entry, index : {}", targetIndex); try { - walEntryiterator.waitForNextReady(); + walEntryIterator.waitForNextReady(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("wait for next WAL entry is interrupted"); } - IndexedConsensusRequest data = walEntryiterator.next(); + IndexedConsensusRequest data = walEntryIterator.next(); if (targetIndex > data.getSearchIndex()) { // if the index of request is smaller than currentIndex, then continue logger.warn( diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java new file mode 100644 index 0000000000..4abfde8fd2 --- /dev/null +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.consensus.multileader.logdispatcher; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicLong; + +public class MultiLeaderMemoryManager { + private static final Logger logger = LoggerFactory.getLogger(MultiLeaderMemoryManager.class); + private final AtomicLong memorySizeInByte = new AtomicLong(0); + private Long maxMemorySizeInByte = Runtime.getRuntime().maxMemory() / 10; + + private MultiLeaderMemoryManager() {} + + public boolean reserve(long size) { + synchronized (this) { + if (size > maxMemorySizeInByte - memorySizeInByte.get()) { + logger.debug( + "consensus memory limited. required: {}, used: {}, total: {}", + size, + memorySizeInByte.get(), + maxMemorySizeInByte); + return false; + } + memorySizeInByte.addAndGet(size); + } + logger.debug( + "{} add {} bytes, total memory size: {} bytes.", + Thread.currentThread().getName(), + size, + memorySizeInByte.get()); + return true; + } + + public void free(long size) { + long currentUsedMemory = memorySizeInByte.addAndGet(-size); + logger.debug( + "{} free {} bytes, total memory size: {} bytes.", + Thread.currentThread().getName(), + size, + currentUsedMemory); + } + + public void init(long maxMemorySize) { + this.maxMemorySizeInByte = maxMemorySize; + } + + private static final MultiLeaderMemoryManager INSTANCE = new MultiLeaderMemoryManager(); + + public static MultiLeaderMemoryManager getInstance() { + return INSTANCE; + } +} diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties index 38d0e3d1e3..d57ddd485a 100644 --- a/server/src/assembly/resources/conf/iotdb-datanode.properties +++ b/server/src/assembly/resources/conf/iotdb-datanode.properties @@ -447,10 +447,10 @@ timestamp_precision=ms # Datatype: boolean # enable_mem_control=true -# Memory Allocation Ratio: Write, Read, Schema and Free Memory. -# The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 1:1:1:1 , 6:2:1:1 -# If you have high level of writing pressure and low level of reading pressure, please adjust it to for example 6:1:1:2 -# write_read_schema_free_memory_proportion=4:3:1:2 +# Memory Allocation Ratio: Write, Read, Schema, Consensus and Free Memory. +# The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. for example: 1:1:1:1:1 , 6:2:1:1:1 +# If you have high level of writing pressure and low level of reading pressure, please adjust it to for example 6:1:1:1:2 +# write_read_schema_free_memory_proportion=3:3:1:1:2 # Schema Memory Allocation Ratio: SchemaRegion, SchemaCache, PartitionCache and LastCache. # The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 1:1:1:1 , 6:2:1:1 diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 8f7787c7c7..b92636c5f5 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -127,7 +127,7 @@ public class IoTDBConfig { private int rpcMaxConcurrentClientNum = 65535; /** Memory allocated for the write process */ - private long allocateMemoryForStorageEngine = Runtime.getRuntime().maxMemory() * 4 / 10; + private long allocateMemoryForStorageEngine = Runtime.getRuntime().maxMemory() * 3 / 10; /** Memory allocated for the read process */ private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10; @@ -135,6 +135,9 @@ public class IoTDBConfig { /** Memory allocated for the mtree */ private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() / 10; + /** Memory allocated for the consensus layer */ + private long allocateMemoryForConsensus = Runtime.getRuntime().maxMemory() / 10; + private volatile int maxQueryDeduplicatedPathNum = 1000; /** Ratio of memory allocated for buffered arrays */ @@ -1809,10 +1812,18 @@ public class IoTDBConfig { return allocateMemoryForSchema; } + public long getAllocateMemoryForConsensus() { + return allocateMemoryForConsensus; + } + public void setAllocateMemoryForSchema(long allocateMemoryForSchema) { this.allocateMemoryForSchema = allocateMemoryForSchema; } + public void setAllocateMemoryForConsensus(long allocateMemoryForConsensus) { + this.allocateMemoryForConsensus = allocateMemoryForConsensus; + } + public long getAllocateMemoryForRead() { return allocateMemoryForRead; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 57ed301e3b..ff8686ec9d 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.engine.compaction.constant.InnerUnsequenceCompactionS import org.apache.iotdb.db.exception.BadNodeUrlFormatException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.utils.DatetimeUtils; +import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.db.service.metrics.MetricService; import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm; import org.apache.iotdb.db.wal.WALManager; @@ -1583,12 +1584,15 @@ public class IoTDBDescriptor { maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum); conf.setAllocateMemoryForSchema( maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum); + conf.setAllocateMemoryForConsensus( + maxMemoryAvailable * Integer.parseInt(proportions[3].trim()) / proportionSum); } } - logger.info("allocateMemoryForRead = {}", conf.getAllocateMemoryForRead()); - logger.info("allocateMemoryForWrite = {}", conf.getAllocateMemoryForStorageEngine()); - logger.info("allocateMemoryForSchema = {}", conf.getAllocateMemoryForSchema()); + logger.info("initial allocateMemoryForRead = {}", conf.getAllocateMemoryForRead()); + logger.info("initial allocateMemoryForWrite = {}", conf.getAllocateMemoryForStorageEngine()); + logger.info("initial allocateMemoryForSchema = {}", conf.getAllocateMemoryForSchema()); + logger.info("initial allocateMemoryForConsensus = {}", conf.getAllocateMemoryForConsensus()); initSchemaMemoryAllocate(properties); initStorageEngineAllocate(properties); @@ -1935,6 +1939,12 @@ public class IoTDBDescriptor { conf.setRatisConsensusLeaderElectionTimeoutMaxMs(ratisConfig.getLeaderElectionTimeoutMax()); } + public void reclaimConsensusMemory() { + conf.setAllocateMemoryForStorageEngine( + conf.getAllocateMemoryForStorageEngine() + conf.getAllocateMemoryForConsensus()); + SystemInfo.getInstance().allocateWriteMemory(); + } + public void initClusterSchemaMemoryAllocate() { if (!conf.isDefaultSchemaMemoryConfig()) { // the config has already been updated as user config in properties file diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 0e493c2d4c..07d43e0ef2 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -85,6 +85,8 @@ public class DataRegionConsensusImpl { .setReplication( MultiLeaderConfig.Replication.newBuilder() .setWalThrottleThreshold(conf.getThrottleThreshold()) + .setAllocateMemoryForConsensus( + conf.getAllocateMemoryForConsensus()) .build()) .build()) .setRatisConfig( diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java index 8c51e9eef5..293f9d635a 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.rescon; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.flush.FlushManager; @@ -44,10 +45,8 @@ public class SystemInfo { private long totalStorageGroupMemCost = 0L; private volatile boolean rejected = false; - private static long memorySizeForWrite = - (long) (config.getAllocateMemoryForStorageEngine() * config.getWriteProportion()); - private static long memorySizeForCompaction = - (long) (config.getAllocateMemoryForStorageEngine() * config.getCompactionProportion()); + private long memorySizeForWrite; + private long memorySizeForCompaction; private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>(); @@ -56,11 +55,15 @@ public class SystemInfo { private ExecutorService flushTaskSubmitThreadPool = IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool"); - private static double FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion(); - private static double REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion(); + private double FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion(); + private double REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion(); private volatile boolean isEncodingFasterThanIo = true; + private SystemInfo() { + allocateWriteMemory(); + } + /** * Report current mem cost of storage group to system. Called when the memory of storage group * newly accumulates to IoTDBConfig.getStorageGroupSizeReportThreshold() @@ -193,6 +196,14 @@ public class SystemInfo { return memorySizeForCompaction; } + public void allocateWriteMemory() { + memorySizeForWrite = + (long) (config.getAllocateMemoryForStorageEngine() * config.getWriteProportion()); + memorySizeForCompaction = + (long) (config.getAllocateMemoryForStorageEngine() * config.getCompactionProportion()); + } + + @TestOnly public void setMemorySizeForCompaction(long size) { memorySizeForCompaction = size; } diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java index e25b510c6c..e21af2c5f4 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -212,6 +212,14 @@ public class DataNode implements DataNodeMBean { config.setSchemaRegionConsensusProtocolClass( dataNodeRegisterResp.globalConfig.getSchemaRegionConsensusProtocolClass()); } + + // In current implementation, only MultiLeader need separated memory from Consensus + if (!config + .getDataRegionConsensusProtocolClass() + .equals(ConsensusFactory.MultiLeaderConsensus)) { + IoTDBDescriptor.getInstance().reclaimConsensusMemory(); + } + IoTDBStartCheck.getInstance().serializeGlobalConfig(dataNodeRegisterResp.globalConfig); logger.info("Register to the cluster successfully"); diff --git a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java index 1aff419e56..1e8b8c21c1 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java +++ b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java @@ -83,7 +83,8 @@ public class NewIoTDB implements NewIoTDBMBean { } NewIoTDB daemon = NewIoTDB.getInstance(); config.setMppMode(true); - + // In standalone mode, Consensus memory should be reclaimed + IoTDBDescriptor.getInstance().reclaimConsensusMemory(); loadExternLib(config); daemon.active();
