This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 0b7cd50 [IOTDB-1306][To rel/0.11] New memory control strategy (#3061)
0b7cd50 is described below
commit 0b7cd500a18df5f3b0b0ba1264c79bb316fec994
Author: Haonan <[email protected]>
AuthorDate: Thu Apr 29 13:53:11 2021 +0800
[IOTDB-1306][To rel/0.11] New memory control strategy (#3061)
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 38 +++-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 12 ++
.../apache/iotdb/db/engine/memtable/IMemTable.java | 4 +
.../db/engine/storagegroup/StorageGroupInfo.java | 2 +-
.../engine/storagegroup/StorageGroupProcessor.java | 31 +--
.../db/engine/storagegroup/TsFileProcessor.java | 44 ++--
.../db/engine/storagegroup/TsFileResource.java | 22 --
.../org/apache/iotdb/db/rescon/SystemInfo.java | 223 ++++++++++++---------
.../engine/storagegroup/TsFileProcessorTest.java | 16 +-
9 files changed, 206 insertions(+), 186 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 5f08ea4..f7d0f63 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
@@ -81,7 +82,9 @@ import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -153,8 +156,9 @@ public class StorageEngine implements IService {
* whether enable data partition if disabled, all data belongs to partition 0
*/
@ServerConfigConsistent
- private static boolean enablePartition =
- IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
+ private static boolean enablePartition = config.isEnablePartition();
+
+ private final boolean enableMemControl = config.isEnableMemControl();
private StorageEngine() {
logger = LoggerFactory.getLogger(StorageEngine.class);
@@ -373,6 +377,13 @@ public class StorageEngine implements IService {
*/
public void insert(InsertRowPlan insertRowPlan) throws
StorageEngineException {
+ if (enableMemControl) {
+ try {
+ blockInsertionIfReject(null);
+ } catch (WriteProcessException e) {
+ throw new StorageEngineException(e);
+ }
+ }
StorageGroupProcessor storageGroupProcessor =
getProcessor(insertRowPlan.getDeviceId());
// TODO monitor: update statistics
@@ -385,6 +396,13 @@ public class StorageEngine implements IService {
public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
throws StorageEngineException {
+ if (enableMemControl) {
+ try {
+ blockInsertionIfReject(null);
+ } catch (WriteProcessException e) {
+ throw new StorageEngineException(e);
+ }
+ }
StorageGroupProcessor storageGroupProcessor = getProcessor(
insertRowsOfOneDevicePlan.getDeviceId());
@@ -406,6 +424,15 @@ public class StorageEngine implements IService {
*/
public void insertTablet(InsertTabletPlan insertTabletPlan)
throws StorageEngineException, BatchInsertionException {
+ if (enableMemControl) {
+ try {
+ blockInsertionIfReject(null);
+ } catch (WriteProcessRejectException e) {
+ TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
+ Arrays.fill(results,
RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT));
+ throw new BatchInsertionException(results);
+ }
+ }
StorageGroupProcessor storageGroupProcessor;
try {
storageGroupProcessor = getProcessor(insertTabletPlan.getDeviceId());
@@ -849,14 +876,17 @@ public class StorageEngine implements IService {
/**
* block insertion if the insertion is rejected by memory control
*/
- public static void blockInsertionIfReject() throws
WriteProcessRejectException {
+ public static void blockInsertionIfReject(TsFileProcessor tsfileProcessor)
throws WriteProcessRejectException {
long startTime = System.currentTimeMillis();
while (SystemInfo.getInstance().isRejected()) {
+ if (tsfileProcessor != null && tsfileProcessor.shouldFlush()) {
+ break;
+ }
try {
TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
if (System.currentTimeMillis() - startTime >
config.getMaxWaitingTimeWhenInsertBlocked()) {
throw new WriteProcessRejectException(
- "System rejected over " +
config.getMaxWaitingTimeWhenInsertBlocked() +
+ "System rejected over " + (System.currentTimeMillis() -
startTime) +
"ms");
}
} catch (InterruptedException e) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 5960d43..5e7daaa 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -71,6 +71,8 @@ public abstract class AbstractMemTable implements IMemTable {
*/
protected boolean disableMemControl = true;
+ private volatile boolean shouldFlush = false;
+
private int seriesNumber = 0;
private long totalPointsNum = 0;
@@ -323,6 +325,16 @@ public abstract class AbstractMemTable implements
IMemTable {
}
@Override
+ public void setShouldFlush() {
+ shouldFlush = true;
+ }
+
+ @Override
+ public boolean shouldFlush() {
+ return shouldFlush;
+ }
+
+ @Override
public void release() {
for (Entry<String, Map<String, IWritableMemChunk>> entry :
memTableMap.entrySet()) {
for (Entry<String, IWritableMemChunk> subEntry :
entry.getValue().entrySet()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 81435c9..17f2f5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -130,6 +130,10 @@ public interface IMemTable {
void setVersion(long version);
+ void setShouldFlush();
+
+ boolean shouldFlush();
+
void release();
/**
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index a31d41a..25105b9 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -101,6 +101,6 @@ public class StorageGroupInfo {
*/
public void closeTsFileProcessorAndReportToSystem(TsFileProcessor
tsFileProcessor) {
reportedTsps.remove(tsFileProcessor);
- SystemInfo.getInstance().resetStorageGroupStatus(this, true);
+ SystemInfo.getInstance().resetStorageGroupStatus(this);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 9c9db9f..4fd2d0f 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -636,8 +636,6 @@ public class StorageGroupProcessor {
TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(storageGroupInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
- tsFileProcessorInfo.addTSPMemCost(
- tsFileProcessor.getTsFileResource().calculateRamSize());
}
workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
} else {
@@ -655,8 +653,6 @@ public class StorageGroupProcessor {
TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(storageGroupInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
- tsFileProcessorInfo.addTSPMemCost(
- tsFileProcessor.getTsFileResource().calculateRamSize());
}
workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
}
@@ -700,9 +696,6 @@ public class StorageGroupProcessor {
if (!isAlive(insertRowPlan.getTime())) {
throw new OutOfTTLException(insertRowPlan.getTime(),
(System.currentTimeMillis() - dataTTL));
}
- if (enableMemControl) {
- StorageEngine.blockInsertionIfReject();
- }
writeLock();
try {
// init map
@@ -739,15 +732,6 @@ public class StorageGroupProcessor {
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
public void insertTablet(InsertTabletPlan insertTabletPlan) throws
BatchInsertionException {
- if (enableMemControl) {
- try {
- StorageEngine.blockInsertionIfReject();
- } catch (WriteProcessRejectException e) {
- TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
- Arrays.fill(results,
RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT));
- throw new BatchInsertionException(results);
- }
- }
writeLock();
try {
TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
@@ -1007,11 +991,11 @@ public class StorageGroupProcessor {
}
}
- public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor
tsFileProcessor) {
+ public void submitAFlushTaskIfShouldFlush(TsFileProcessor tsFileProcessor) {
writeLock();
try {
- if (!closingSequenceTsFileProcessor.contains(tsFileProcessor)
- && !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+ // check memtable size and may asyncTryToFlush the work memtable
+ if (tsFileProcessor.shouldFlush()) {
fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
}
} finally {
@@ -1125,7 +1109,6 @@ public class StorageGroupProcessor {
TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(storageGroupInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-
tsFileProcessorInfo.addTSPMemCost(tsFileProcessor.getTsFileResource().calculateRamSize());
}
} else {
tsFileProcessor =
@@ -1142,7 +1125,6 @@ public class StorageGroupProcessor {
TsFileProcessorInfo tsFileProcessorInfo = new
TsFileProcessorInfo(storageGroupInfo);
tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-
tsFileProcessorInfo.addTSPMemCost(tsFileProcessor.getTsFileResource().calculateRamSize());
}
}
tsFileProcessor.addCloseFileListeners(customCloseFileListeners);
@@ -1199,7 +1181,8 @@ public class StorageGroupProcessor {
// for sequence tsfile, we update the endTimeMap only when the file is
prepared to be closed.
// for unsequence tsfile, we have maintained the endTimeMap when an
insertion comes.
if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
- || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+ || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)
+ || tsFileProcessor.alreadyMarkedClosing()) {
return;
}
logger.info(
@@ -2690,10 +2673,6 @@ public class StorageGroupProcessor {
public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
throws WriteProcessException {
-
- if (enableMemControl) {
- StorageEngine.blockInsertionIfReject();
- }
writeLock();
try {
boolean isSequence = false;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 20b924f..422d78d 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -117,7 +117,6 @@ public class TsFileProcessor {
private WriteLogNode logNode;
private final boolean sequence;
private long totalMemTableSize;
- private boolean shouldFlush = false;
private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get
flushQueryLock write lock";
private static final String FLUSH_QUERY_WRITE_RELEASE = "{}: {} get
flushQueryLock write lock released";
@@ -277,8 +276,6 @@ public class TsFileProcessor {
long textDataIncrement = 0L;
long chunkMetadataIncrement = 0L;
String deviceId = insertRowPlan.getDeviceId().getFullPath();
- long unsealedResourceIncrement =
- tsFileResource.estimateRamIncrement(deviceId);
for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
// skip failed Measurements
if (insertRowPlan.getDataTypes()[i] == null) {
@@ -302,8 +299,7 @@ public class TsFileProcessor {
textDataIncrement += MemUtils.getBinarySize((Binary)
insertRowPlan.getValues()[i]);
}
}
- updateMemoryInfo(memTableIncrement, unsealedResourceIncrement,
- chunkMetadataIncrement, textDataIncrement);
+ updateMemoryInfo(memTableIncrement, chunkMetadataIncrement,
textDataIncrement);
}
private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan,
int start, int end)
@@ -315,7 +311,6 @@ public class TsFileProcessor {
long textDataIncrement = 0L;
long chunkMetadataIncrement = 0L;
String deviceId = insertTabletPlan.getDeviceId().getFullPath();
- long unsealedResourceIncrement =
tsFileResource.estimateRamIncrement(deviceId);
for (int i = 0; i < insertTabletPlan.getDataTypes().length; i++) {
// skip failed Measurements
@@ -352,23 +347,23 @@ public class TsFileProcessor {
textDataIncrement += MemUtils.getBinaryColumnSize(column, start, end);
}
}
- updateMemoryInfo(memTableIncrement, unsealedResourceIncrement,
- chunkMetadataIncrement, textDataIncrement);
+ updateMemoryInfo(memTableIncrement, chunkMetadataIncrement,
textDataIncrement);
}
- private void updateMemoryInfo(long memTableIncrement, long
unsealedResourceIncrement,
+ private void updateMemoryInfo(long memTableIncrement,
long chunkMetadataIncrement, long textDataIncrement) throws
WriteProcessException {
memTableIncrement += textDataIncrement;
storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
- tsFileProcessorInfo.addTSPMemCost(unsealedResourceIncrement +
chunkMetadataIncrement);
+ tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement);
if (storageGroupInfo.needToReportToSystem()) {
- SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
try {
- StorageEngine.blockInsertionIfReject();
+ if
(!SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo, this)) {
+ StorageEngine.blockInsertionIfReject(this);
+ }
} catch (WriteProcessRejectException e) {
storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
- tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement +
chunkMetadataIncrement);
- SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo,
false);
+ tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
+ SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
throw e;
}
}
@@ -420,7 +415,7 @@ public class TsFileProcessor {
if (workMemTable == null) {
return false;
}
- if (shouldFlush) {
+ if (workMemTable.shouldFlush()) {
logger.info("The memtable size {} of tsfile {} reaches the mem control
threshold",
workMemTable.memSize(),
tsFileResource.getTsFile().getAbsolutePath());
return true;
@@ -636,6 +631,9 @@ public class TsFileProcessor {
flushListener.onFlushStart(tobeFlushed);
}
+ if (enableMemControl) {
+
SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost());
+ }
flushingMemTables.addLast(tobeFlushed);
if (logger.isDebugEnabled()) {
logger.debug(
@@ -650,7 +648,6 @@ public class TsFileProcessor {
totalMemTableSize += tobeFlushed.memSize();
}
workMemTable = null;
- shouldFlush = false;
FlushManager.getInstance().registerTsFileProcessor(this);
}
@@ -687,7 +684,8 @@ public class TsFileProcessor {
tsFileResource.getTsFile().getName(), flushingMemTables.size());
}
// report to System
- SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo,
true);
+ SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+
SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost());
}
if (logger.isDebugEnabled()) {
logger.debug("{}: {} flush finished, remove a memtable from flushing
list, "
@@ -982,12 +980,12 @@ public class TsFileProcessor {
return sequence;
}
- public void startAsyncFlush() {
-
storageGroupInfo.getStorageGroupProcessor().asyncFlushMemTableInTsFileProcessor(this);
+ public void submitAFlushTask() {
+
this.storageGroupInfo.getStorageGroupProcessor().submitAFlushTaskIfShouldFlush(this);
}
- public void setFlush() {
- shouldFlush = true;
+ public void setWorkMemTableShouldFlush() {
+ workMemTable.setShouldFlush();
}
public void addFlushListener(FlushListener listener) {
@@ -1005,4 +1003,8 @@ public class TsFileProcessor {
public void addCloseFileListeners(Collection<CloseFileListener> listeners) {
closeFileListeners.addAll(listeners);
}
+
+ public boolean alreadyMarkedClosing() {
+ return shouldClose;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 2433259..6e79298 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -798,28 +798,6 @@ public class TsFileResource {
+ RamUsageEstimator.sizeOf(endTimes);
}
- /**
- * Calculate the resource ram increment when insert data in TsFileProcessor
- *
- * @return ramIncrement
- */
- public long estimateRamIncrement(String deviceToBeChecked) {
- long ramIncrement = 0L;
- if (!containsDevice(deviceToBeChecked)) {
- // 80 is the Map.Entry header ram size
- if (deviceToIndex.isEmpty()) {
- ramIncrement += 80;
- }
- // Map.Entry ram size
- ramIncrement += RamUsageEstimator.sizeOf(deviceToBeChecked) + 16;
- // if needs to extend the startTimes and endTimes arrays
- if (deviceToIndex.size() >= startTimes.length) {
- ramIncrement += startTimes.length * Long.BYTES;
- }
- }
- return ramIncrement;
- }
-
public void delete() throws IOException {
if (file.exists()) {
Files.delete(file.toPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index dc314e1..2960d50 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -19,17 +19,16 @@
package org.apache.iotdb.db.rescon;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
-
+import java.util.concurrent.ExecutorService;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,10 +37,14 @@ public class SystemInfo {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
private static final Logger logger =
LoggerFactory.getLogger(SystemInfo.class);
- private long totalSgMemCost = 0L;
+ private long totalStorageGroupMemCost = 0L;
+ private ExecutorService flushTaskSubmitThreadPool =
+ IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
private volatile boolean rejected = false;
- private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+ private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new
HashMap<>();
+
+ private long flushingMemTablesCost = 0L;
private static final double FLUSH_THERSHOLD =
config.getAllocateMemoryForWrite() * config.getFlushProportion();
@@ -55,24 +58,41 @@ public class SystemInfo {
*
* @param storageGroupInfo storage group
*/
- public synchronized void reportStorageGroupStatus(StorageGroupInfo
storageGroupInfo) {
- long delta = storageGroupInfo.getMemCost() -
- reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
- totalSgMemCost += delta;
+ public synchronized boolean reportStorageGroupStatus(
+ StorageGroupInfo storageGroupInfo, TsFileProcessor tsFileProcessor)
+ throws WriteProcessRejectException {
+ long delta = storageGroupInfo.getMemCost()
+ - reportedStorageGroupMemCostMap.getOrDefault(storageGroupInfo, 0L);
+ totalStorageGroupMemCost += delta;
if (logger.isDebugEnabled()) {
logger.debug("Report Storage Group Status to the system. "
- + "After adding {}, current sg mem cost is {}.", delta,
totalSgMemCost);
+ + "After adding {}, current sg mem cost is {}.", delta,
totalStorageGroupMemCost);
}
- reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
+ reportedStorageGroupMemCostMap.put(storageGroupInfo,
storageGroupInfo.getMemCost());
storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
- if (totalSgMemCost >= FLUSH_THERSHOLD) {
+ if (totalStorageGroupMemCost < FLUSH_THERSHOLD) {
+ return true;
+ } else if (totalStorageGroupMemCost >= FLUSH_THERSHOLD
+ && totalStorageGroupMemCost < REJECT_THERSHOLD) {
logger.debug("The total storage group mem costs are too large, call for
flushing. "
- + "Current sg cost is {}", totalSgMemCost);
- chooseTSPToMarkFlush();
- }
- if (totalSgMemCost >= REJECT_THERSHOLD) {
- logger.info("Change system to reject status...");
+ + "Current sg cost is {}", totalStorageGroupMemCost);
+ chooseMemTablesToMarkFlush(tsFileProcessor);
+ return true;
+ } else {
rejected = true;
+ if (chooseMemTablesToMarkFlush(tsFileProcessor)) {
+ if (totalStorageGroupMemCost < config.getAllocateMemoryForWrite()) {
+ return true;
+ } else {
+ throw new WriteProcessRejectException(
+ "Total Storage Group MemCost "
+ + totalStorageGroupMemCost
+ + " is over than memorySizeForWriting "
+ + config.getAllocateMemoryForWrite());
+ }
+ } else {
+ return false;
+ }
}
}
@@ -82,103 +102,106 @@ public class SystemInfo {
*
* @param storageGroupInfo storage group
*/
- public void resetStorageGroupStatus(
- StorageGroupInfo storageGroupInfo, boolean shouldInvokeFlush) {
- boolean needForceAsyncFlush = false;
- synchronized (this) {
- if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
- this.totalSgMemCost -=
- (reportedSgMemCostMap.get(storageGroupInfo) -
storageGroupInfo.getMemCost());
- storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
- reportedSgMemCostMap.put(storageGroupInfo,
storageGroupInfo.getMemCost());
- }
-
- if (totalSgMemCost >= FLUSH_THERSHOLD && totalSgMemCost <
REJECT_THERSHOLD) {
- logger.debug("Some sg memory released but still exceeding flush
proportion, call flush.");
- if (rejected) {
- logger.info("Some sg memory released, set system to normal status.");
- }
- logCurrentTotalSGMemory();
- rejected = false;
- needForceAsyncFlush = true;
- } else if (totalSgMemCost >= REJECT_THERSHOLD) {
- logger.warn("Some sg memory released, but system is still in reject
status.");
- logCurrentTotalSGMemory();
- rejected = true;
- needForceAsyncFlush = true;
+ /**
+ * Report resetting the mem cost of sg to system. It will be called after
flushing, closing and
+ * failed to insert
+ *
+ * @param storageGroupInfo storage group
+ */
+ public synchronized void resetStorageGroupStatus(StorageGroupInfo
storageGroupInfo) {
+ long delta = 0;
+
+ if (reportedStorageGroupMemCostMap.containsKey(storageGroupInfo)) {
+ delta = reportedStorageGroupMemCostMap.get(storageGroupInfo) -
storageGroupInfo.getMemCost();
+ this.totalStorageGroupMemCost -= delta;
+ storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+ reportedStorageGroupMemCostMap.put(storageGroupInfo,
storageGroupInfo.getMemCost());
+ }
- } else {
- logger.debug("Some sg memory released, system is in normal status.");
- logCurrentTotalSGMemory();
- rejected = false;
+ if (totalStorageGroupMemCost >= FLUSH_THERSHOLD
+ && totalStorageGroupMemCost < REJECT_THERSHOLD) {
+ logger.debug(
+ "SG ({}) released memory (delta: {}) but still exceeding flush
proportion (totalSgMemCost: {}), call flush.",
+ storageGroupInfo.getStorageGroupProcessor().getStorageGroupName(),
+ delta,
+ totalStorageGroupMemCost);
+ if (rejected) {
+ logger.info(
+ "SG ({}) released memory (delta: {}), set system to normal status
(totalSgMemCost: {}).",
+ storageGroupInfo.getStorageGroupProcessor().getStorageGroupName(),
+ delta,
+ totalStorageGroupMemCost);
}
- }
- if (shouldInvokeFlush && needForceAsyncFlush) {
- forceAsyncFlush();
+ logCurrentTotalSGMemory();
+ rejected = false;
+ } else if (totalStorageGroupMemCost >= REJECT_THERSHOLD) {
+ logger.warn(
+ "SG ({}) released memory (delta: {}), but system is still in reject
status (totalSgMemCost: {}).",
+ storageGroupInfo.getStorageGroupProcessor().getStorageGroupName(),
+ delta,
+ totalStorageGroupMemCost);
+ logCurrentTotalSGMemory();
+ rejected = true;
+ } else {
+ logger.debug(
+ "SG ({}) released memory (delta: {}), system is in normal status
(totalSgMemCost: {}).",
+ storageGroupInfo.getStorageGroupProcessor().getStorageGroupName(),
+ delta,
+ totalStorageGroupMemCost);
+ logCurrentTotalSGMemory();
+ rejected = false;
}
}
private void logCurrentTotalSGMemory() {
- logger.debug("Current Sg cost is {}", totalSgMemCost);
+ logger.debug("Current Sg cost is {}", totalStorageGroupMemCost);
}
- /**
- * Order all tsfileProcessors in system by memory cost of actual data points
in memtable.
- * Mark the top K TSPs as to be flushed,
- * so that after flushing the K TSPs, the memory cost should be less than
FLUSH_THRESHOLD
- */
- private void chooseTSPToMarkFlush() {
- if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
- return;
- }
- // If invoke flush by replaying logs, do not flush now!
- if (reportedSgMemCostMap.size() == 0) {
- return;
- }
- // get the tsFile processors which has the max work MemTable size
- List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
- for (TsFileProcessor processor : processors) {
- if (processor != null) {
- processor.setFlush();
- }
- }
+ public synchronized void addFlushingMemTableCost(long flushingMemTableCost) {
+ this.flushingMemTablesCost += flushingMemTableCost;
+ }
+
+ public synchronized void resetFlushingMemTableCost(long
flushingMemTableCost) {
+ this.flushingMemTablesCost -= flushingMemTableCost;
}
/**
- * Be Careful!! This method can only be called by flush thread!
+ * Order all tsfileProcessors in system by memory cost of actual data points
in memtable. Mark the
+ * top K TSPs as to be flushed, so that after flushing the K TSPs, the
memory cost should be less
+ * than FLUSH_THRESHOLD
*/
- private void forceAsyncFlush() {
- if (FlushManager.getInstance().getNumberOfWorkingTasks() > 1) {
- return;
- }
- List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
- if (logger.isDebugEnabled()) {
- logger.debug("[mem control] get {} tsp to flush", processors.size());
- }
- for (TsFileProcessor processor : processors) {
- if (processor != null) {
- processor.startAsyncFlush();
- }
+ private boolean chooseMemTablesToMarkFlush(TsFileProcessor
currentTsFileProcessor) {
+ if (reportedStorageGroupMemCostMap.size() == 0) {
+ return false;
}
- }
-
- private List<TsFileProcessor> getTsFileProcessorsToFlush() {
- PriorityQueue<TsFileProcessor> tsps = new PriorityQueue<>(
- (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(),
o1.getWorkMemTableRamCost()));
- for (StorageGroupInfo sgInfo : reportedSgMemCostMap.keySet()) {
- tsps.addAll(sgInfo.getAllReportedTsp());
+ PriorityQueue<TsFileProcessor> allTsFileProcessors =
+ new PriorityQueue<>(
+ (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(),
o1.getWorkMemTableRamCost()));
+ for (StorageGroupInfo storageGroupInfo :
reportedStorageGroupMemCostMap.keySet()) {
+ allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp());
}
- List<TsFileProcessor> processors = new ArrayList<>();
+ boolean isCurrentTsFileProcessorSelected = false;
long memCost = 0;
- while (totalSgMemCost - memCost > FLUSH_THERSHOLD / 2) {
- if (tsps.isEmpty() || tsps.peek().getWorkMemTableRamCost() == 0) {
- return processors;
+ long activeMemSize = totalStorageGroupMemCost - flushingMemTablesCost;
+ while (activeMemSize - memCost > FLUSH_THERSHOLD) {
+ if (allTsFileProcessors.isEmpty()
+ || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
+ return isCurrentTsFileProcessorSelected;
+ }
+ TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek();
+ memCost += selectedTsFileProcessor.getWorkMemTableRamCost();
+ selectedTsFileProcessor.setWorkMemTableShouldFlush();
+ // Open a new thread for submit a flush task
+ flushTaskSubmitThreadPool.submit(
+ () -> {
+ selectedTsFileProcessor.submitAFlushTask();
+ });
+ if (selectedTsFileProcessor == currentTsFileProcessor) {
+ isCurrentTsFileProcessorSelected = true;
}
- processors.add(tsps.peek());
- memCost += tsps.peek().getWorkMemTableRamCost();
- tsps.poll();
+ allTsFileProcessors.poll();
}
- return processors;
+ return isCurrentTsFileProcessorSelected;
}
public boolean isRejected() {
@@ -186,8 +209,8 @@ public class SystemInfo {
}
public void close() {
- reportedSgMemCostMap.clear();
- totalSgMemCost = 0;
+ reportedStorageGroupMemCostMap.clear();
+ totalStorageGroupMemCost = 0;
rejected = false;
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 35d9876..4eac99e 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -92,9 +92,7 @@ public class TsFileProcessorTest {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.sgInfo.initTsFileProcessorInfo(processor);
- tsFileProcessorInfo.addTSPMemCost(processor
- .getTsFileResource().calculateRamSize());
- SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
+ SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
processor.query(deviceId, measurementId, dataType, encoding, props,
context,
tsfileResourcesForQuery);
@@ -148,9 +146,7 @@ public class TsFileProcessorTest {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.sgInfo.initTsFileProcessorInfo(processor);
- tsFileProcessorInfo.addTSPMemCost(processor
- .getTsFileResource().calculateRamSize());
- SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
+ SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
processor.query(deviceId, measurementId, dataType, encoding, props,
context,
tsfileResourcesForQuery);
@@ -230,9 +226,7 @@ public class TsFileProcessorTest {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.sgInfo.initTsFileProcessorInfo(processor);
- tsFileProcessorInfo.addTSPMemCost(processor
- .getTsFileResource().calculateRamSize());
- SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
+ SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
processor.query(deviceId, measurementId, dataType, encoding, props,
context,
tsfileResourcesForQuery);
@@ -271,9 +265,7 @@ public class TsFileProcessorTest {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
this.sgInfo.initTsFileProcessorInfo(processor);
- tsFileProcessorInfo.addTSPMemCost(processor
- .getTsFileResource().calculateRamSize());
- SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
+ SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
processor.query(deviceId, measurementId, dataType, encoding, props,
context,