This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch NewMemControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f3c298a873516e8b1f6711e38bcd6d6dd59f933f Author: HTHou <[email protected]> AuthorDate: Sun Apr 25 16:04:13 2021 +0800 new mem control strategy --- .../org/apache/iotdb/db/engine/StorageEngine.java | 30 ++- .../iotdb/db/engine/memtable/AbstractMemTable.java | 11 ++ .../apache/iotdb/db/engine/memtable/IMemTable.java | 4 + .../db/engine/storagegroup/StorageGroupInfo.java | 6 +- .../engine/storagegroup/StorageGroupProcessor.java | 29 --- .../db/engine/storagegroup/TsFileProcessor.java | 36 ++-- .../engine/storagegroup/TsFileProcessorInfo.java | 5 +- .../db/engine/storagegroup/TsFileResource.java | 11 +- .../storagegroup/timeindex/DeviceTimeIndex.java | 18 -- .../storagegroup/timeindex/FileTimeIndex.java | 5 - .../engine/storagegroup/timeindex/ITimeIndex.java | 8 - .../org/apache/iotdb/db/rescon/SystemInfo.java | 208 ++++++++++----------- .../engine/storagegroup/TsFileProcessorTest.java | 4 - 13 files changed, 161 insertions(+), 214 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 6ae0edf..277fd1f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -61,7 +61,9 @@ import org.apache.iotdb.db.service.ServiceType; import org.apache.iotdb.db.utils.FilePathUtils; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.db.utils.UpgradeUtils; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; import org.apache.iotdb.tsfile.utils.Pair; @@ -72,6 +74,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -103,8 +106,8 @@ public class StorageEngine implements IService { @ServerConfigConsistent private static long timePartitionInterval = -1; /** whether enable data partition if disabled, all data belongs to partition 0 */ @ServerConfigConsistent - private static boolean enablePartition = - IoTDBDescriptor.getInstance().getConfig().isEnablePartition(); + private static boolean enablePartition = config.isEnablePartition(); + private final boolean enableMemControl = config.isEnableMemControl(); /** * a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor @@ -504,6 +507,13 @@ public class StorageEngine implements IService { * @param insertRowPlan physical plan of insertion */ public void insert(InsertRowPlan insertRowPlan) throws StorageEngineException { + if (enableMemControl) { + try { + blockInsertionIfReject(); + } catch (WriteProcessException e) { + throw new StorageEngineException(e); + } + } StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowPlan.getDeviceId()); try { @@ -525,6 +535,13 @@ public class StorageEngine implements IService { public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) throws StorageEngineException { + if (enableMemControl) { + try { + blockInsertionIfReject(); + } catch (WriteProcessException e) { + throw new StorageEngineException(e); + } + } StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowsOfOneDevicePlan.getDeviceId()); @@ -539,6 +556,15 @@ public class StorageEngine implements IService { /** insert a InsertTabletPlan to a storage group */ public void insertTablet(InsertTabletPlan insertTabletPlan) throws StorageEngineException, BatchProcessException { + if (enableMemControl) { + try { + blockInsertionIfReject(); + } catch (WriteProcessRejectException e) { + TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()]; + Arrays.fill(results, RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT)); + throw new BatchProcessException(results); + } + } StorageGroupProcessor storageGroupProcessor; try { storageGroupProcessor = getProcessor(insertTabletPlan.getDeviceId()); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 0eb04c8..ee72aff 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -52,6 +52,7 @@ public abstract class AbstractMemTable implements IMemTable { */ protected boolean disableMemControl = true; + private boolean shouldFlush = false; private int avgSeriesPointNumThreshold = IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold(); /** memory size of data points, including TEXT values */ @@ -388,6 +389,16 @@ public abstract class AbstractMemTable implements IMemTable { } @Override + public void setShouldFlush() { + shouldFlush = true; + } + + @Override + public boolean shouldFlush() { + return shouldFlush; + } + + @Override public void release() { for (Entry<String, Map<String, IWritableMemChunk>> entry : memTableMap.entrySet()) { for (Entry<String, IWritableMemChunk> subEntry : entry.getValue().entrySet()) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index 8724e83..5f74b01 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -110,6 +110,10 @@ public interface IMemTable { boolean isSignalMemTable(); + void setShouldFlush(); + + boolean shouldFlush(); + void release(); /** must guarantee the device exists in the work memtable only used when mem control enabled */ diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java index c1f6713..c5ee561 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java @@ -62,9 +62,7 @@ public class StorageGroupInfo { /** When create a new TsFileProcessor, call this method */ public void initTsFileProcessorInfo(TsFileProcessor tsFileProcessor) { - if (reportedTsps.add(tsFileProcessor)) { - memoryCost.getAndAdd(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()); - } + reportedTsps.add(tsFileProcessor); } public void addStorageGroupMemCost(long cost) { @@ -99,7 +97,7 @@ public class StorageGroupInfo { */ public void closeTsFileProcessorAndReportToSystem(TsFileProcessor tsFileProcessor) { reportedTsps.remove(tsFileProcessor); - SystemInfo.getInstance().resetStorageGroupStatus(this, true); + SystemInfo.getInstance().resetStorageGroupStatus(this); } public Supplier<ByteBuffer[]> getWalSupplier() { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 88979e2..69277b1 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -727,8 +727,6 @@ public class StorageGroupProcessor { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo); tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor); - tsFileProcessorInfo.addTSPMemCost( - tsFileProcessor.getTsFileResource().calculateRamSize()); } workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor); } else { @@ -745,8 +743,6 @@ public class StorageGroupProcessor { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo); tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor); - tsFileProcessorInfo.addTSPMemCost( - tsFileProcessor.getTsFileResource().calculateRamSize()); } workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor); } @@ -791,9 +787,6 @@ public class StorageGroupProcessor { if (!isAlive(insertRowPlan.getTime())) { throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL)); } - if (enableMemControl) { - StorageEngine.blockInsertionIfReject(); - } writeLock(); try { // init map @@ -835,15 +828,6 @@ public class StorageGroupProcessor { @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchProcessException, TriggerExecutionException { - if (enableMemControl) { - try { - StorageEngine.blockInsertionIfReject(); - } catch (WriteProcessRejectException e) { - TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()]; - Arrays.fill(results, RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT)); - throw new BatchProcessException(results); - } - } writeLock(); try { @@ -1126,18 +1110,6 @@ public class StorageGroupProcessor { } } - public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) { - writeLock(); - try { - if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) - && !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) { - fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); - } - } finally { - writeUnlock(); - } - } - private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean sequence) { TsFileProcessor tsFileProcessor = null; try { @@ -1252,7 +1224,6 @@ public class StorageGroupProcessor { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo); tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor); - tsFileProcessorInfo.addTSPMemCost(tsFileProcessor.getTsFileResource().calculateRamSize()); } tsFileProcessor.addCloseFileListeners(customCloseFileListeners); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 0207971..8cc6b50 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -121,7 +121,6 @@ public class TsFileProcessor { private WriteLogNode logNode; private final boolean sequence; private long totalMemTableSize; - private boolean shouldFlush = false; private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get flushQueryLock write lock"; private static final String FLUSH_QUERY_WRITE_RELEASE = @@ -285,7 +284,6 @@ public class TsFileProcessor { long textDataIncrement = 0L; long chunkMetadataIncrement = 0L; String deviceId = insertRowPlan.getDeviceId().getFullPath(); - long unsealedResourceIncrement = tsFileResource.estimateRamIncrement(deviceId); int columnIndex = 0; for (int i = 0; i < insertRowPlan.getMeasurementMNodes().length; i++) { // skip failed Measurements @@ -326,7 +324,7 @@ public class TsFileProcessor { } } updateMemoryInfo( - memTableIncrement, unsealedResourceIncrement, chunkMetadataIncrement, textDataIncrement); + memTableIncrement, chunkMetadataIncrement, textDataIncrement); } private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int start, int end) @@ -337,7 +335,6 @@ public class TsFileProcessor { long[] memIncrements = new long[3]; // memTable, text, chunk metadata String deviceId = insertTabletPlan.getDeviceId().getFullPath(); - long unsealedResourceIncrement = tsFileResource.estimateRamIncrement(deviceId); int columnIndex = 0; for (int i = 0; i < insertTabletPlan.getMeasurementMNodes().length; i++) { @@ -368,7 +365,7 @@ public class TsFileProcessor { long textDataIncrement = memIncrements[1]; long chunkMetadataIncrement = memIncrements[2]; updateMemoryInfo( - memTableIncrement, unsealedResourceIncrement, chunkMetadataIncrement, textDataIncrement); + memTableIncrement, chunkMetadataIncrement, textDataIncrement); } private void updateMemCost( @@ -453,21 +450,21 @@ public class TsFileProcessor { private void updateMemoryInfo( long memTableIncrement, - long unsealedResourceIncrement, long chunkMetadataIncrement, long textDataIncrement) throws WriteProcessException { memTableIncrement += textDataIncrement; storageGroupInfo.addStorageGroupMemCost(memTableIncrement); - tsFileProcessorInfo.addTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement); + tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement); if (storageGroupInfo.needToReportToSystem()) { - SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo); try { - StorageEngine.blockInsertionIfReject(); + if (!SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo, this)) { + StorageEngine.blockInsertionIfReject(); + } } catch (WriteProcessRejectException e) { storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); - tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement); - SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false); + tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement); + SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); throw e; } } @@ -520,7 +517,7 @@ public class TsFileProcessor { if (workMemTable == null) { return false; } - if (shouldFlush) { + if (workMemTable.shouldFlush()) { logger.info( "The memtable size {} of tsfile {} reaches the mem control threshold", workMemTable.memSize(), @@ -766,6 +763,9 @@ public class TsFileProcessor { flushListener.onFlushStart(tobeFlushed); } + if (enableMemControl) { + SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost()); + } flushingMemTables.addLast(tobeFlushed); if (logger.isDebugEnabled()) { logger.debug( @@ -780,7 +780,6 @@ public class TsFileProcessor { totalMemTableSize += tobeFlushed.memSize(); } workMemTable = null; - shouldFlush = false; FlushManager.getInstance().registerTsFileProcessor(this); } @@ -821,7 +820,8 @@ public class TsFileProcessor { flushingMemTables.size()); } // report to System - SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, true); + SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); + SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost()); } if (logger.isDebugEnabled()) { logger.debug( @@ -1265,12 +1265,8 @@ public class TsFileProcessor { return sequence; } - public void startAsyncFlush() { - storageGroupInfo.getStorageGroupProcessor().asyncFlushMemTableInTsFileProcessor(this); - } - - public void setFlush() { - shouldFlush = true; + public void setWorkMemTableShouldFlush() { + workMemTable.setShouldFlush(); } public void addFlushListener(FlushListener listener) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java index 94e8def..409227f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java @@ -18,7 +18,6 @@ */ package org.apache.iotdb.db.engine.storagegroup; -import org.apache.iotdb.db.conf.IoTDBDescriptor; /** The TsFileProcessorInfo records the memory cost of this TsFileProcessor. */ public class TsFileProcessorInfo { @@ -31,7 +30,7 @@ public class TsFileProcessorInfo { public TsFileProcessorInfo(StorageGroupInfo storageGroupInfo) { this.storageGroupInfo = storageGroupInfo; - this.memCost = IoTDBDescriptor.getInstance().getConfig().getWalBufferSize(); + this.memCost = 0L; } /** called in each insert */ @@ -49,6 +48,6 @@ public class TsFileProcessorInfo { /** called when closing TSP */ public void clear() { storageGroupInfo.releaseStorageGroupMemCost(memCost); - memCost = 0; + memCost = 0L; } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index c6a7abc..0a36c53 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -772,20 +772,11 @@ public class TsFileResource { this.modFile = modFile; } - /** @return initial resource map size */ + /** @return resource map size */ public long calculateRamSize() { return timeIndex.calculateRamSize(); } - /** - * Calculate the resource ram increment when insert data in TsFileProcessor - * - * @return ramIncrement - */ - public long estimateRamIncrement(String deviceToBeChecked) { - return timeIndex.estimateRamIncrement(deviceToBeChecked); - } - public void delete() throws IOException { if (file.exists()) { Files.delete(file.toPath()); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java index bade0d3..381d4b3 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java @@ -184,24 +184,6 @@ public class DeviceTimeIndex implements ITimeIndex { + RamUsageEstimator.sizeOf(endTimes); } - @Override - public long estimateRamIncrement(String deviceToBeChecked) { - long ramIncrement = 0L; - if (!deviceToIndex.containsKey(deviceToBeChecked)) { - // 80 is the Map.Entry header ram size - if (deviceToIndex.isEmpty()) { - ramIncrement += 80; - } - // Map.Entry ram size - ramIncrement += RamUsageEstimator.sizeOf(deviceToBeChecked) + 16; - // if needs to extend the startTimes and endTimes arrays - if (deviceToIndex.size() >= startTimes.length) { - ramIncrement += startTimes.length * Long.BYTES; - } - } - return ramIncrement; - } - private int getDeviceIndex(String deviceId) { int index; if (deviceToIndex.containsKey(deviceId)) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java index 4e3f601..e1e5251 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java @@ -135,11 +135,6 @@ public class FileTimeIndex implements ITimeIndex { } @Override - public long estimateRamIncrement(String deviceToBeChecked) { - return devices.contains(deviceToBeChecked) ? 0L : RamUsageEstimator.sizeOf(deviceToBeChecked); - } - - @Override public long getTimePartition(String tsFilePath) { try { if (devices != null && !devices.isEmpty()) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java index 7738c19..4c87408 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java @@ -77,14 +77,6 @@ public interface ITimeIndex { long calculateRamSize(); /** - * Calculate file index ram increment when insert data in TsFileProcessor - * - * @param deviceToBeChecked device to be checked - * @return ramIncrement - */ - long estimateRamIncrement(String deviceToBeChecked); - - /** * get time partition * * @param tsFilePath tsFile absolute path diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java index 25ad6be..e9b1312 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java @@ -24,13 +24,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.flush.FlushManager; import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo; import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; - +import org.apache.iotdb.db.exception.WriteProcessRejectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.PriorityQueue; @@ -39,50 +37,63 @@ public class SystemInfo { private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class); - private long totalSgMemCost = 0L; + private long totalStorageGroupMemCost = 0L; private volatile boolean rejected = false; private static long memorySizeForWrite = config.getAllocateMemoryForWrite(); - private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>(); + private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>(); + private long flushingMemTablesCost = 0L; private static double FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion(); private static double REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion(); - private boolean isEncodingFasterThanIo = true; + private volatile boolean isEncodingFasterThanIo = true; /** * Report current mem cost of storage group to system. Called when the memory of storage group * newly accumulates to IoTDBConfig.getStorageGroupSizeReportThreshold() * * @param storageGroupInfo storage group + * @throws WriteProcessRejectException */ - public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) { + public synchronized boolean reportStorageGroupStatus(StorageGroupInfo storageGroupInfo, TsFileProcessor tsFileProcessor) throws WriteProcessRejectException { long delta = - storageGroupInfo.getMemCost() - reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L); - totalSgMemCost += delta; + storageGroupInfo.getMemCost() - reportedStorageGroupMemCostMap.getOrDefault(storageGroupInfo, 0L); + totalStorageGroupMemCost += delta; if (logger.isDebugEnabled()) { logger.debug( "Report Storage Group Status to the system. " + "After adding {}, current sg mem cost is {}.", delta, - totalSgMemCost); + totalStorageGroupMemCost); } - reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); + reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost()); - if (totalSgMemCost >= FLUSH_THERSHOLD) { + if (totalStorageGroupMemCost < FLUSH_THERSHOLD) { + return true; + } else if (totalStorageGroupMemCost >= FLUSH_THERSHOLD && totalStorageGroupMemCost < REJECT_THERSHOLD) { logger.debug( "The total storage group mem costs are too large, call for flushing. " + "Current sg cost is {}", - totalSgMemCost); - chooseTSPToMarkFlush(); - } - if (totalSgMemCost >= REJECT_THERSHOLD) { + totalStorageGroupMemCost); + chooseMemTablesToMarkFlush(tsFileProcessor); + return true; + } else { logger.info( "Change system to reject status. Triggered by: logical SG ({}), mem cost delta ({}), totalSgMemCost ({}).", storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(), delta, - totalSgMemCost); + totalStorageGroupMemCost); rejected = true; + if (chooseMemTablesToMarkFlush(tsFileProcessor)) { + if (totalStorageGroupMemCost < memorySizeForWrite) { + return true; + } else { + throw new WriteProcessRejectException("Total Storage Group MemCost "+ totalStorageGroupMemCost +" is over than memorySizeForWrite"); + } + } else { + return false; + } } } @@ -92,119 +103,94 @@ public class SystemInfo { * * @param storageGroupInfo storage group */ - public void resetStorageGroupStatus( - StorageGroupInfo storageGroupInfo, boolean shouldInvokeFlush) { - boolean needForceAsyncFlush = false; - synchronized (this) { - long delta = 0; - - if (reportedSgMemCostMap.containsKey(storageGroupInfo)) { - delta = reportedSgMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost(); - this.totalSgMemCost -= delta; - storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost()); - reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); - } + public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo) { + long delta = 0; + + if (reportedStorageGroupMemCostMap.containsKey(storageGroupInfo)) { + delta = reportedStorageGroupMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost(); + this.totalStorageGroupMemCost -= delta; + storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost()); + reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); + } - if (totalSgMemCost >= FLUSH_THERSHOLD && totalSgMemCost < REJECT_THERSHOLD) { - logger.debug( - "SG ({}) released memory (delta: {}) but still exceeding flush proportion (totalSgMemCost: {}), call flush.", - storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(), - delta, - totalSgMemCost); - if (rejected) { - logger.info( - "SG ({}) released memory (delta: {}), set system to normal status (totalSgMemCost: {}).", - storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(), - delta, - totalSgMemCost); - } - logCurrentTotalSGMemory(); - rejected = false; - needForceAsyncFlush = true; - } else if (totalSgMemCost >= REJECT_THERSHOLD) { - logger.warn( - "SG ({}) released memory (delta: {}), but system is still in reject status (totalSgMemCost: {}).", - storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(), - delta, - totalSgMemCost); - logCurrentTotalSGMemory(); - rejected = true; - needForceAsyncFlush = true; - } else { - logger.debug( - "SG ({}) released memory (delta: {}), system is in normal status (totalSgMemCost: {}).", + if (totalStorageGroupMemCost >= FLUSH_THERSHOLD && totalStorageGroupMemCost < REJECT_THERSHOLD) { + logger.debug( + "SG ({}) released memory (delta: {}) but still exceeding flush proportion (totalSgMemCost: {}), call flush.", + storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(), + delta, + totalStorageGroupMemCost); + if (rejected) { + logger.info( + "SG ({}) released memory (delta: {}), set system to normal status (totalSgMemCost: {}).", storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(), delta, - totalSgMemCost); - logCurrentTotalSGMemory(); - rejected = false; + totalStorageGroupMemCost); } + logCurrentTotalSGMemory(); + rejected = false; + } else if (totalStorageGroupMemCost >= REJECT_THERSHOLD) { + logger.warn( + "SG ({}) released memory (delta: {}), but system is still in reject status (totalSgMemCost: {}).", + storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(), + delta, + totalStorageGroupMemCost); + logCurrentTotalSGMemory(); + rejected = true; + } else { + logger.debug( + "SG ({}) released memory (delta: {}), system is in normal status (totalSgMemCost: {}).", + storageGroupInfo.getStorageGroupProcessor().getLogicalStorageGroupName(), + delta, + totalStorageGroupMemCost); + logCurrentTotalSGMemory(); + rejected = false; } - if (shouldInvokeFlush && needForceAsyncFlush) { - forceAsyncFlush(); - } + } + + public synchronized void addFlushingMemTableCost(long flushingMemTableCost) { + this.flushingMemTablesCost += flushingMemTableCost; + } + + public synchronized void resetFlushingMemTableCost(long flushingMemTableCost) { + this.flushingMemTablesCost -= flushingMemTableCost; } private void logCurrentTotalSGMemory() { - logger.debug("Current Sg cost is {}", totalSgMemCost); + logger.debug("Current Sg cost is {}", totalStorageGroupMemCost); } /** - * Order all tsfileProcessors in system by memory cost of actual data points in memtable. Mark the + * Order all working memtables in system by memory cost of actual data points in memtable. Mark the * top K TSPs as to be flushed, so that after flushing the K TSPs, the memory cost should be less * than FLUSH_THRESHOLD */ - private void chooseTSPToMarkFlush() { - if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) { - return; - } + private boolean chooseMemTablesToMarkFlush(TsFileProcessor currentTsFileProcessor) { // If invoke flush by replaying logs, do not flush now! - if (reportedSgMemCostMap.size() == 0) { - return; - } - // get the tsFile processors which has the max work MemTable size - List<TsFileProcessor> processors = getTsFileProcessorsToFlush(); - for (TsFileProcessor processor : processors) { - if (processor != null) { - processor.setFlush(); - } + if (reportedStorageGroupMemCostMap.size() == 0) { + return false; } - } - - /** Be Careful!! This method can only be called by flush thread! */ - private void forceAsyncFlush() { - if (FlushManager.getInstance().getNumberOfWorkingTasks() > 1) { - return; - } - List<TsFileProcessor> processors = getTsFileProcessorsToFlush(); - if (logger.isDebugEnabled()) { - logger.debug("[mem control] get {} tsp to flush", processors.size()); - } - for (TsFileProcessor processor : processors) { - if (processor != null) { - processor.startAsyncFlush(); - } - } - } - - private List<TsFileProcessor> getTsFileProcessorsToFlush() { - PriorityQueue<TsFileProcessor> tsps = + PriorityQueue<TsFileProcessor> allTsFileProcessors = new PriorityQueue<>( (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(), o1.getWorkMemTableRamCost())); - for (StorageGroupInfo sgInfo : reportedSgMemCostMap.keySet()) { - tsps.addAll(sgInfo.getAllReportedTsp()); + for (StorageGroupInfo storageGroupInfo : reportedStorageGroupMemCostMap.keySet()) { + allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp()); } - List<TsFileProcessor> processors = new ArrayList<>(); + boolean isCurrentTsFileProcessorSelected = false; long memCost = 0; - while (totalSgMemCost - memCost > FLUSH_THERSHOLD / 2) { - if (tsps.isEmpty() || tsps.peek().getWorkMemTableRamCost() == 0) { - return processors; + long activeMemSize = totalStorageGroupMemCost - flushingMemTablesCost; + while (activeMemSize - memCost > FLUSH_THERSHOLD) { + if (allTsFileProcessors.isEmpty() || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) { + return false; + } + TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek(); + memCost += selectedTsFileProcessor.getWorkMemTableRamCost(); + selectedTsFileProcessor.setWorkMemTableShouldFlush(); + if (selectedTsFileProcessor == currentTsFileProcessor) { + isCurrentTsFileProcessorSelected = true; } - processors.add(tsps.peek()); - memCost += tsps.peek().getWorkMemTableRamCost(); - tsps.poll(); + allTsFileProcessors.poll(); } - return processors; + return isCurrentTsFileProcessorSelected; } public boolean isRejected() { @@ -220,8 +206,8 @@ public class SystemInfo { } public void close() { - reportedSgMemCostMap.clear(); - totalSgMemCost = 0; + reportedStorageGroupMemCostMap.clear(); + totalStorageGroupMemCost = 0; rejected = false; } @@ -249,7 +235,7 @@ public class SystemInfo { } public long getTotalMemTableSize() { - return totalSgMemCost; + return totalStorageGroupMemCost; } public double getFlushThershold() { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java index 5f43331..b3d20c6 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java @@ -102,7 +102,6 @@ public class TsFileProcessorTest { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); processor.setTsFileProcessorInfo(tsFileProcessorInfo); this.sgInfo.initTsFileProcessorInfo(processor); - tsFileProcessorInfo.addTSPMemCost(processor.getTsFileResource().calculateRamSize()); SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query( @@ -179,7 +178,6 @@ public class TsFileProcessorTest { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); processor.setTsFileProcessorInfo(tsFileProcessorInfo); this.sgInfo.initTsFileProcessorInfo(processor); - tsFileProcessorInfo.addTSPMemCost(processor.getTsFileResource().calculateRamSize()); SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query( @@ -282,7 +280,6 @@ public class TsFileProcessorTest { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); processor.setTsFileProcessorInfo(tsFileProcessorInfo); this.sgInfo.initTsFileProcessorInfo(processor); - tsFileProcessorInfo.addTSPMemCost(processor.getTsFileResource().calculateRamSize()); SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query( @@ -339,7 +336,6 @@ public class TsFileProcessorTest { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); processor.setTsFileProcessorInfo(tsFileProcessorInfo); this.sgInfo.initTsFileProcessorInfo(processor); - tsFileProcessorInfo.addTSPMemCost(processor.getTsFileResource().calculateRamSize()); SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
