This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch NewMemControl_11 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c07215490113808eab60d3580e9c7fbdb1c6db9d Author: HTHou <[email protected]> AuthorDate: Sun Apr 25 18:12:18 2021 +0800 [To rel/0.11] New mem control strategy --- .../org/apache/iotdb/db/engine/StorageEngine.java | 31 ++- .../iotdb/db/engine/memtable/AbstractMemTable.java | 12 ++ .../apache/iotdb/db/engine/memtable/IMemTable.java | 4 + .../db/engine/storagegroup/StorageGroupInfo.java | 2 +- .../engine/storagegroup/StorageGroupProcessor.java | 32 +-- .../db/engine/storagegroup/TsFileProcessor.java | 50 +++-- .../db/engine/storagegroup/TsFileResource.java | 22 -- .../org/apache/iotdb/db/rescon/SystemInfo.java | 224 ++++++++++++--------- .../engine/storagegroup/TsFileProcessorTest.java | 16 +- 9 files changed, 215 insertions(+), 178 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 5f08ea4..0b70bce 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -81,7 +82,9 @@ import org.apache.iotdb.db.service.ServiceType; import org.apache.iotdb.db.utils.FilePathUtils; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.db.utils.UpgradeUtils; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; import org.apache.iotdb.tsfile.utils.Pair; @@ -153,8 +156,9 @@ public class StorageEngine implements IService { * whether enable data partition if disabled, all data belongs to partition 0 */ @ServerConfigConsistent - private static boolean enablePartition = - IoTDBDescriptor.getInstance().getConfig().isEnablePartition(); + private static boolean enablePartition = config.isEnablePartition(); + + private final boolean enableMemControl = config.isEnableMemControl(); private StorageEngine() { logger = LoggerFactory.getLogger(StorageEngine.class); @@ -373,6 +377,13 @@ public class StorageEngine implements IService { */ public void insert(InsertRowPlan insertRowPlan) throws StorageEngineException { + if (enableMemControl) { + try { + blockInsertionIfReject(); + } catch (WriteProcessException e) { + throw new StorageEngineException(e); + } + } StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowPlan.getDeviceId()); // TODO monitor: update statistics @@ -385,6 +396,13 @@ public class StorageEngine implements IService { public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) throws StorageEngineException { + if (enableMemControl) { + try { + blockInsertionIfReject(); + } catch (WriteProcessException e) { + throw new StorageEngineException(e); + } + } StorageGroupProcessor storageGroupProcessor = getProcessor( insertRowsOfOneDevicePlan.getDeviceId()); @@ -406,6 +424,15 @@ public class StorageEngine implements IService { */ public void insertTablet(InsertTabletPlan insertTabletPlan) throws StorageEngineException, BatchInsertionException { + if (enableMemControl) { + try { + blockInsertionIfReject(); + } catch (WriteProcessRejectException e) { + TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()]; + Arrays.fill(results, RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT)); + throw new BatchInsertionException(results); + } + } StorageGroupProcessor storageGroupProcessor; try { storageGroupProcessor = getProcessor(insertTabletPlan.getDeviceId()); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 5960d43..bd7c2b9 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -71,6 +71,8 @@ public abstract class AbstractMemTable implements IMemTable { */ protected boolean disableMemControl = true; + private boolean shouldFlush = false; + private int seriesNumber = 0; private long totalPointsNum = 0; @@ -323,6 +325,16 @@ public abstract class AbstractMemTable implements IMemTable { } @Override + public void setShouldFlush() { + shouldFlush = true; + } + + @Override + public boolean shouldFlush() { + return shouldFlush; + } + + @Override public void release() { for (Entry<String, Map<String, IWritableMemChunk>> entry : memTableMap.entrySet()) { for (Entry<String, IWritableMemChunk> subEntry : entry.getValue().entrySet()) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index 81435c9..17f2f5b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -130,6 +130,10 @@ public interface IMemTable { void setVersion(long version); + void setShouldFlush(); + + boolean shouldFlush(); + void release(); /** diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java index a31d41a..25105b9 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java @@ -101,6 +101,6 @@ public class StorageGroupInfo { */ public void closeTsFileProcessorAndReportToSystem(TsFileProcessor tsFileProcessor) { reportedTsps.remove(tsFileProcessor); - SystemInfo.getInstance().resetStorageGroupStatus(this, true); + SystemInfo.getInstance().resetStorageGroupStatus(this); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 9c9db9f..6ff6fb3 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -40,6 +40,8 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.io.FileUtils; @@ -147,6 +149,7 @@ public class StorageGroupProcessor { * partitionLatestFlushedTimeForEachDevice) */ private final ReadWriteLock insertLock = new ReentrantReadWriteLock(); + private final Condition writeLockConditionForReject = insertLock.writeLock().newCondition(); /** closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done. */ private final Object closeStorageGroupCondition = new Object(); /** @@ -636,8 +639,6 @@ public class StorageGroupProcessor { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo); tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor); - tsFileProcessorInfo.addTSPMemCost( - tsFileProcessor.getTsFileResource().calculateRamSize()); } workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor); } else { @@ -655,8 +656,6 @@ public class StorageGroupProcessor { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo); tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor); - tsFileProcessorInfo.addTSPMemCost( - tsFileProcessor.getTsFileResource().calculateRamSize()); } workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor); } @@ -700,9 +699,6 @@ public class StorageGroupProcessor { if (!isAlive(insertRowPlan.getTime())) { throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL)); } - if (enableMemControl) { - StorageEngine.blockInsertionIfReject(); - } writeLock(); try { // init map @@ -739,15 +735,6 @@ public class StorageGroupProcessor { */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertionException { - if (enableMemControl) { - try { - StorageEngine.blockInsertionIfReject(); - } catch (WriteProcessRejectException e) { - TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()]; - Arrays.fill(results, RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT)); - throw new BatchInsertionException(results); - } - } writeLock(); try { TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()]; @@ -1007,11 +994,11 @@ public class StorageGroupProcessor { } } - public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) { + public void submitAFlushTaskWhenShouldFlush(TsFileProcessor tsFileProcessor) { writeLock(); try { - if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) - && !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) { + // check memtable size and may asyncTryToFlush the work memtable + if (tsFileProcessor.shouldFlush()) { fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); } } finally { @@ -1125,7 +1112,6 @@ public class StorageGroupProcessor { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo); tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor); - tsFileProcessorInfo.addTSPMemCost(tsFileProcessor.getTsFileResource().calculateRamSize()); } } else { tsFileProcessor = @@ -1142,7 +1128,6 @@ public class StorageGroupProcessor { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo); tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor); - tsFileProcessorInfo.addTSPMemCost(tsFileProcessor.getTsFileResource().calculateRamSize()); } } tsFileProcessor.addCloseFileListeners(customCloseFileListeners); @@ -1489,6 +1474,11 @@ public class StorageGroupProcessor { insertLock.writeLock().unlock(); } + public void writeLockConditionAwait() throws InterruptedException { + writeLockConditionForReject.await( + config.getCheckPeriodWhenInsertBlocked(), TimeUnit.MILLISECONDS); + } + /** * @param tsFileResources includes sealed and unsealed tsfile resources * @return fill unsealed tsfile resources with memory data and ChunkMetadataList of data in disk diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 20b924f..1e9af6e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -117,7 +117,6 @@ public class TsFileProcessor { private WriteLogNode logNode; private final boolean sequence; private long totalMemTableSize; - private boolean shouldFlush = false; private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get flushQueryLock write lock"; private static final String FLUSH_QUERY_WRITE_RELEASE = "{}: {} get flushQueryLock write lock released"; @@ -277,8 +276,6 @@ public class TsFileProcessor { long textDataIncrement = 0L; long chunkMetadataIncrement = 0L; String deviceId = insertRowPlan.getDeviceId().getFullPath(); - long unsealedResourceIncrement = - tsFileResource.estimateRamIncrement(deviceId); for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) { // skip failed Measurements if (insertRowPlan.getDataTypes()[i] == null) { @@ -302,8 +299,7 @@ public class TsFileProcessor { textDataIncrement += MemUtils.getBinarySize((Binary) insertRowPlan.getValues()[i]); } } - updateMemoryInfo(memTableIncrement, unsealedResourceIncrement, - chunkMetadataIncrement, textDataIncrement); + updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); } private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int start, int end) @@ -315,7 +311,6 @@ public class TsFileProcessor { long textDataIncrement = 0L; long chunkMetadataIncrement = 0L; String deviceId = insertTabletPlan.getDeviceId().getFullPath(); - long unsealedResourceIncrement = tsFileResource.estimateRamIncrement(deviceId); for (int i = 0; i < insertTabletPlan.getDataTypes().length; i++) { // skip failed Measurements @@ -352,24 +347,34 @@ public class TsFileProcessor { textDataIncrement += MemUtils.getBinaryColumnSize(column, start, end); } } - updateMemoryInfo(memTableIncrement, unsealedResourceIncrement, - chunkMetadataIncrement, textDataIncrement); + updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); } - private void updateMemoryInfo(long memTableIncrement, long unsealedResourceIncrement, + private void updateMemoryInfo(long memTableIncrement, long chunkMetadataIncrement, long textDataIncrement) throws WriteProcessException { memTableIncrement += textDataIncrement; storageGroupInfo.addStorageGroupMemCost(memTableIncrement); - tsFileProcessorInfo.addTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement); + tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement); if (storageGroupInfo.needToReportToSystem()) { - SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo); try { - StorageEngine.blockInsertionIfReject(); + if (!SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo, this)) { + long startTime = System.currentTimeMillis(); + while (SystemInfo.getInstance().isRejected()) { + storageGroupInfo.getStorageGroupProcessor().writeLockConditionAwait(); + if (System.currentTimeMillis() - startTime + > config.getMaxWaitingTimeWhenInsertBlocked()) { + throw new WriteProcessRejectException( + "System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() + "ms"); + } + } + } } catch (WriteProcessRejectException e) { storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); - tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement); - SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false); + tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement); + SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } workMemTable.addTVListRamCost(memTableIncrement); @@ -420,7 +425,7 @@ public class TsFileProcessor { if (workMemTable == null) { return false; } - if (shouldFlush) { + if (workMemTable.shouldFlush()) { logger.info("The memtable size {} of tsfile {} reaches the mem control threshold", workMemTable.memSize(), tsFileResource.getTsFile().getAbsolutePath()); return true; @@ -636,6 +641,9 @@ public class TsFileProcessor { flushListener.onFlushStart(tobeFlushed); } + if (enableMemControl) { + SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost()); + } flushingMemTables.addLast(tobeFlushed); if (logger.isDebugEnabled()) { logger.debug( @@ -650,7 +658,6 @@ public class TsFileProcessor { totalMemTableSize += tobeFlushed.memSize(); } workMemTable = null; - shouldFlush = false; FlushManager.getInstance().registerTsFileProcessor(this); } @@ -687,7 +694,8 @@ public class TsFileProcessor { tsFileResource.getTsFile().getName(), flushingMemTables.size()); } // report to System - SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, true); + SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); + SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost()); } if (logger.isDebugEnabled()) { logger.debug("{}: {} flush finished, remove a memtable from flushing list, " @@ -982,12 +990,12 @@ public class TsFileProcessor { return sequence; } - public void startAsyncFlush() { - storageGroupInfo.getStorageGroupProcessor().asyncFlushMemTableInTsFileProcessor(this); + public void submitAFlushTask() { + this.storageGroupInfo.getStorageGroupProcessor().submitAFlushTaskWhenShouldFlush(this); } - public void setFlush() { - shouldFlush = true; + public void setWorkMemTableShouldFlush() { + workMemTable.setShouldFlush(); } public void addFlushListener(FlushListener listener) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 2433259..6e79298 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -798,28 +798,6 @@ public class TsFileResource { + RamUsageEstimator.sizeOf(endTimes); } - /** - * Calculate the resource ram increment when insert data in TsFileProcessor - * - * @return ramIncrement - */ - public long estimateRamIncrement(String deviceToBeChecked) { - long ramIncrement = 0L; - if (!containsDevice(deviceToBeChecked)) { - // 80 is the Map.Entry header ram size - if (deviceToIndex.isEmpty()) { - ramIncrement += 80; - } - // Map.Entry ram size - ramIncrement += RamUsageEstimator.sizeOf(deviceToBeChecked) + 16; - // if needs to extend the startTimes and endTimes arrays - if (deviceToIndex.size() >= startTimes.length) { - ramIncrement += startTimes.length * Long.BYTES; - } - } - return ramIncrement; - } - public void delete() throws IOException { if (file.exists()) { Files.delete(file.toPath()); diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java index dc314e1..66de58e 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java @@ -19,17 +19,17 @@ package org.apache.iotdb.db.rescon; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.PriorityQueue; - +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.engine.flush.FlushManager; import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo; import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; +import org.apache.iotdb.db.exception.WriteProcessRejectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,10 +38,17 @@ public class SystemInfo { private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class); - private long totalSgMemCost = 0L; + private long totalStorageGroupMemCost = 0L; private volatile boolean rejected = false; - private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>(); + private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>(); + + private long flushingMemTablesCost = 0L; + private AtomicInteger threadCnt = new AtomicInteger(); + private ExecutorService flushTaskSubmitThreadPool = + Executors.newFixedThreadPool( + config.getConcurrentFlushThread(), + r -> new Thread(r, "FlushTaskSubmitThread-" + threadCnt.getAndIncrement())); private static final double FLUSH_THERSHOLD = config.getAllocateMemoryForWrite() * config.getFlushProportion(); @@ -55,24 +62,41 @@ public class SystemInfo { * * @param storageGroupInfo storage group */ - public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) { - long delta = storageGroupInfo.getMemCost() - - reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L); - totalSgMemCost += delta; + public synchronized boolean reportStorageGroupStatus( + StorageGroupInfo storageGroupInfo, TsFileProcessor tsFileProcessor) + throws WriteProcessRejectException { + long delta = storageGroupInfo.getMemCost() + - reportedStorageGroupMemCostMap.getOrDefault(storageGroupInfo, 0L); + totalStorageGroupMemCost += delta; if (logger.isDebugEnabled()) { logger.debug("Report Storage Group Status to the system. " - + "After adding {}, current sg mem cost is {}.", delta, totalSgMemCost); + + "After adding {}, current sg mem cost is {}.", delta, totalStorageGroupMemCost); } - reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); + reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost()); - if (totalSgMemCost >= FLUSH_THERSHOLD) { + if (totalStorageGroupMemCost < FLUSH_THERSHOLD) { + return true; + } else if (totalStorageGroupMemCost >= FLUSH_THERSHOLD + && totalStorageGroupMemCost < REJECT_THERSHOLD) { logger.debug("The total storage group mem costs are too large, call for flushing. " - + "Current sg cost is {}", totalSgMemCost); - chooseTSPToMarkFlush(); - } - if (totalSgMemCost >= REJECT_THERSHOLD) { - logger.info("Change system to reject status..."); + + "Current sg cost is {}", totalStorageGroupMemCost); + chooseMemTablesToMarkFlush(tsFileProcessor); + return true; + } else { rejected = true; + if (chooseMemTablesToMarkFlush(tsFileProcessor)) { + if (totalStorageGroupMemCost < config.getAllocateMemoryForWrite()) { + return true; + } else { + throw new WriteProcessRejectException( + "Total Storage Group MemCost " + + totalStorageGroupMemCost + + " is over than memorySizeForWriting " + + config.getAllocateMemoryForWrite()); + } + } else { + return false; + } } } @@ -82,44 +106,67 @@ public class SystemInfo { * * @param storageGroupInfo storage group */ - public void resetStorageGroupStatus( - StorageGroupInfo storageGroupInfo, boolean shouldInvokeFlush) { - boolean needForceAsyncFlush = false; - synchronized (this) { - if (reportedSgMemCostMap.containsKey(storageGroupInfo)) { - this.totalSgMemCost -= - (reportedSgMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost()); - storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost()); - reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); - } - - if (totalSgMemCost >= FLUSH_THERSHOLD && totalSgMemCost < REJECT_THERSHOLD) { - logger.debug("Some sg memory released but still exceeding flush proportion, call flush."); - if (rejected) { - logger.info("Some sg memory released, set system to normal status."); - } - logCurrentTotalSGMemory(); - rejected = false; - needForceAsyncFlush = true; - } else if (totalSgMemCost >= REJECT_THERSHOLD) { - logger.warn("Some sg memory released, but system is still in reject status."); - logCurrentTotalSGMemory(); - rejected = true; - needForceAsyncFlush = true; + /** + * Report resetting the mem cost of sg to system. It will be called after flushing, closing and + * failed to insert + * + * @param storageGroupInfo storage group + */ + public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo) { + long delta = 0; + + if (reportedStorageGroupMemCostMap.containsKey(storageGroupInfo)) { + delta = reportedStorageGroupMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost(); + this.totalStorageGroupMemCost -= delta; + storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost()); + reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); + } - } else { - logger.debug("Some sg memory released, system is in normal status."); - logCurrentTotalSGMemory(); - rejected = false; + if (totalStorageGroupMemCost >= FLUSH_THERSHOLD + && totalStorageGroupMemCost < REJECT_THERSHOLD) { + logger.debug( + "SG ({}) released memory (delta: {}) but still exceeding flush proportion (totalSgMemCost: {}), call flush.", + storageGroupInfo.getStorageGroupProcessor().getStorageGroupName(), + delta, + totalStorageGroupMemCost); + if (rejected) { + logger.info( + "SG ({}) released memory (delta: {}), set system to normal status (totalSgMemCost: {}).", + storageGroupInfo.getStorageGroupProcessor().getStorageGroupName(), + delta, + totalStorageGroupMemCost); } - } - if (shouldInvokeFlush && needForceAsyncFlush) { - forceAsyncFlush(); + logCurrentTotalSGMemory(); + rejected = false; + } else if (totalStorageGroupMemCost >= REJECT_THERSHOLD) { + logger.warn( + "SG ({}) released memory (delta: {}), but system is still in reject status (totalSgMemCost: {}).", + storageGroupInfo.getStorageGroupProcessor().getStorageGroupName(), + delta, + totalStorageGroupMemCost); + logCurrentTotalSGMemory(); + rejected = true; + } else { + logger.debug( + "SG ({}) released memory (delta: {}), system is in normal status (totalSgMemCost: {}).", + storageGroupInfo.getStorageGroupProcessor().getStorageGroupName(), + delta, + totalStorageGroupMemCost); + logCurrentTotalSGMemory(); + rejected = false; } } private void logCurrentTotalSGMemory() { - logger.debug("Current Sg cost is {}", totalSgMemCost); + logger.debug("Current Sg cost is {}", totalStorageGroupMemCost); + } + + public synchronized void addFlushingMemTableCost(long flushingMemTableCost) { + this.flushingMemTablesCost += flushingMemTableCost; + } + + public synchronized void resetFlushingMemTableCost(long flushingMemTableCost) { + this.flushingMemTablesCost -= flushingMemTableCost; } /** @@ -127,58 +174,37 @@ public class SystemInfo { * Mark the top K TSPs as to be flushed, * so that after flushing the K TSPs, the memory cost should be less than FLUSH_THRESHOLD */ - private void chooseTSPToMarkFlush() { - if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) { - return; - } - // If invoke flush by replaying logs, do not flush now! - if (reportedSgMemCostMap.size() == 0) { - return; + private boolean chooseMemTablesToMarkFlush(TsFileProcessor currentTsFileProcessor) { + if (reportedStorageGroupMemCostMap.size() == 0) { + return false; } - // get the tsFile processors which has the max work MemTable size - List<TsFileProcessor> processors = getTsFileProcessorsToFlush(); - for (TsFileProcessor processor : processors) { - if (processor != null) { - processor.setFlush(); - } + PriorityQueue<TsFileProcessor> allTsFileProcessors = + new PriorityQueue<>( + (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(), o1.getWorkMemTableRamCost())); + for (StorageGroupInfo storageGroupInfo : reportedStorageGroupMemCostMap.keySet()) { + allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp()); } - } - - /** - * Be Careful!! This method can only be called by flush thread! - */ - private void forceAsyncFlush() { - if (FlushManager.getInstance().getNumberOfWorkingTasks() > 1) { - return; - } - List<TsFileProcessor> processors = getTsFileProcessorsToFlush(); - if (logger.isDebugEnabled()) { - logger.debug("[mem control] get {} tsp to flush", processors.size()); - } - for (TsFileProcessor processor : processors) { - if (processor != null) { - processor.startAsyncFlush(); - } - } - } - - private List<TsFileProcessor> getTsFileProcessorsToFlush() { - PriorityQueue<TsFileProcessor> tsps = new PriorityQueue<>( - (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(), o1.getWorkMemTableRamCost())); - for (StorageGroupInfo sgInfo : reportedSgMemCostMap.keySet()) { - tsps.addAll(sgInfo.getAllReportedTsp()); - } - List<TsFileProcessor> processors = new ArrayList<>(); + boolean isCurrentTsFileProcessorSelected = false; long memCost = 0; - while (totalSgMemCost - memCost > FLUSH_THERSHOLD / 2) { - if (tsps.isEmpty() || tsps.peek().getWorkMemTableRamCost() == 0) { - return processors; + long activeMemSize = totalStorageGroupMemCost - flushingMemTablesCost; + while (activeMemSize - memCost > FLUSH_THERSHOLD) { + if (allTsFileProcessors.isEmpty() + || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) { + return false; + } + TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek(); + memCost += selectedTsFileProcessor.getWorkMemTableRamCost(); + selectedTsFileProcessor.setWorkMemTableShouldFlush(); + if (selectedTsFileProcessor == currentTsFileProcessor) { + isCurrentTsFileProcessorSelected = true; } - processors.add(tsps.peek()); - memCost += tsps.peek().getWorkMemTableRamCost(); - tsps.poll(); + flushTaskSubmitThreadPool.submit( + () -> { + selectedTsFileProcessor.submitAFlushTask(); + }); + allTsFileProcessors.poll(); } - return processors; + return isCurrentTsFileProcessorSelected; } public boolean isRejected() { @@ -186,8 +212,8 @@ public class SystemInfo { } public void close() { - reportedSgMemCostMap.clear(); - totalSgMemCost = 0; + reportedStorageGroupMemCostMap.clear(); + totalStorageGroupMemCost = 0; rejected = false; } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java index 35d9876..4eac99e 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java @@ -92,9 +92,7 @@ public class TsFileProcessorTest { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); processor.setTsFileProcessorInfo(tsFileProcessorInfo); this.sgInfo.initTsFileProcessorInfo(processor); - tsFileProcessorInfo.addTSPMemCost(processor - .getTsFileResource().calculateRamSize()); - SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query(deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery); @@ -148,9 +146,7 @@ public class TsFileProcessorTest { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); processor.setTsFileProcessorInfo(tsFileProcessorInfo); this.sgInfo.initTsFileProcessorInfo(processor); - tsFileProcessorInfo.addTSPMemCost(processor - .getTsFileResource().calculateRamSize()); - SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query(deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery); @@ -230,9 +226,7 @@ public class TsFileProcessorTest { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); processor.setTsFileProcessorInfo(tsFileProcessorInfo); this.sgInfo.initTsFileProcessorInfo(processor); - tsFileProcessorInfo.addTSPMemCost(processor - .getTsFileResource().calculateRamSize()); - SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query(deviceId, measurementId, dataType, encoding, props, context, tsfileResourcesForQuery); @@ -271,9 +265,7 @@ public class TsFileProcessorTest { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); processor.setTsFileProcessorInfo(tsFileProcessorInfo); this.sgInfo.initTsFileProcessorInfo(processor); - tsFileProcessorInfo.addTSPMemCost(processor - .getTsFileResource().calculateRamSize()); - SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query(deviceId, measurementId, dataType, encoding, props, context,
