This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch restrict_tvlist_rowCount
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/restrict_tvlist_rowCount by
this push:
new c1038f888b0 optimize mem control code and flush metric
c1038f888b0 is described below
commit c1038f888b0c340cd3495f6a7d700bb3a9307287
Author: HTHou <[email protected]>
AuthorDate: Wed Jul 10 18:14:21 2024 +0800
optimize mem control code and flush metric
---
.../iotdb/db/storageengine/StorageEngine.java | 6 +----
.../db/storageengine/dataregion/DataRegion.java | 17 ++++----------
.../dataregion/memtable/AbstractMemTable.java | 16 ++++++-------
.../dataregion/memtable/IMemTable.java | 7 +-----
.../dataregion/memtable/TsFileProcessor.java | 26 ++++++++++++++++------
5 files changed, 32 insertions(+), 40 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 0a768b90df8..8758af3aa00 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -163,13 +163,9 @@ public class StorageEngine implements IService {
}
/** block insertion if the insertion is rejected by memory control */
- public static void blockInsertionIfReject(TsFileProcessor tsFileProcessor)
- throws WriteProcessRejectException {
+ public static void blockInsertionIfReject() throws
WriteProcessRejectException {
long startTime = System.currentTimeMillis();
while (SystemInfo.getInstance().isRejected()) {
- if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) {
- break;
- }
try {
TimeUnit.MILLISECONDS.sleep(CONFIG.getCheckPeriodWhenInsertBlocked());
if (System.currentTimeMillis() - startTime >
CONFIG.getMaxWaitingTimeWhenInsertBlocked()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index ed008bb47d9..31d34cb2a6e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -879,7 +879,7 @@ public class DataRegion implements IDataRegionForQuery {
throw new OutOfTTLException(
insertRowNode.getTime(), (CommonDateTimeUtils.currentTime() -
deviceTTL));
}
- StorageEngine.blockInsertionIfReject(null);
+ StorageEngine.blockInsertionIfReject();
long startTime = System.nanoTime();
writeLock("InsertRow");
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() -
startTime);
@@ -914,7 +914,6 @@ public class DataRegion implements IDataRegionForQuery {
// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) {
fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
- WritingMetrics.getInstance().recordMemControlFlushMemTableCount(1);
}
if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
@@ -938,7 +937,7 @@ public class DataRegion implements IDataRegionForQuery {
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
public void insertTablet(InsertTabletNode insertTabletNode)
throws BatchProcessException, WriteProcessException {
- StorageEngine.blockInsertionIfReject(null);
+ StorageEngine.blockInsertionIfReject();
long startTime = System.nanoTime();
writeLock("insertTablet");
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() -
startTime);
@@ -1114,7 +1113,6 @@ public class DataRegion implements IDataRegionForQuery {
// check memtable size and may async try to flush the work memtable
if (tsFileProcessor.shouldFlush()) {
fileFlushPolicy.apply(this, tsFileProcessor, sequence);
- WritingMetrics.getInstance().recordMemControlFlushMemTableCount(1);
}
return true;
}
@@ -1222,7 +1220,6 @@ public class DataRegion implements IDataRegionForQuery {
});
}
- int count = 0;
List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
for (Map.Entry<TsFileProcessor, InsertRowsNode> entry :
tsFileProcessorMap.entrySet()) {
TsFileProcessor tsFileProcessor = entry.getKey();
@@ -1241,10 +1238,8 @@ public class DataRegion implements IDataRegionForQuery {
// check memtable size and may asyncTryToFlush the work memtable
if (entry.getKey().shouldFlush()) {
fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
- count++;
}
}
- WritingMetrics.getInstance().recordMemControlFlushMemTableCount(count);
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
@@ -1332,7 +1327,6 @@ public class DataRegion implements IDataRegionForQuery {
// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor.shouldFlush()) {
fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
- WritingMetrics.getInstance().recordMemControlFlushMemTableCount(1);
}
} finally {
writeUnlock();
@@ -3240,7 +3234,7 @@ public class DataRegion implements IDataRegionForQuery {
*/
public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)
throws WriteProcessException, BatchProcessException {
- StorageEngine.blockInsertionIfReject(null);
+ StorageEngine.blockInsertionIfReject();
long startTime = System.nanoTime();
writeLock("InsertRowsOfOneDevice");
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() -
startTime);
@@ -3312,7 +3306,6 @@ public class DataRegion implements IDataRegionForQuery {
});
}
List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
- int count = 0;
for (Map.Entry<TsFileProcessor, InsertRowsNode> entry :
tsFileProcessorMap.entrySet()) {
TsFileProcessor tsFileProcessor = entry.getKey();
InsertRowsNode subInsertRowsNode = entry.getValue();
@@ -3330,10 +3323,8 @@ public class DataRegion implements IDataRegionForQuery {
// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor.shouldFlush()) {
fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
- count++;
}
}
- WritingMetrics.getInstance().recordMemControlFlushMemTableCount(count);
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
@@ -3358,7 +3349,7 @@ public class DataRegion implements IDataRegionForQuery {
public void insert(InsertRowsNode insertRowsNode)
throws BatchProcessException, WriteProcessRejectException {
- StorageEngine.blockInsertionIfReject(null);
+ StorageEngine.blockInsertionIfReject();
long startTime = System.nanoTime();
writeLock("InsertRows");
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() -
startTime);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 23e35b2c649..ce85508914f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -89,6 +89,7 @@ public abstract class AbstractMemTable implements IMemTable {
private static final DeviceIDFactory deviceIDFactory =
DeviceIDFactory.getInstance();
private boolean shouldFlush = false;
+ private boolean reachChunkSizeOrPointNumThreshold = false;
private volatile FlushStatus flushStatus = FlushStatus.WORKING;
private final int avgSeriesPointNumThreshold =
IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
@@ -409,7 +410,7 @@ public abstract class AbstractMemTable implements IMemTable
{
IWritableMemChunkGroup memChunkGroup =
createMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue,
schemaList)) {
- shouldFlush = true;
+ reachChunkSizeOrPointNumThreshold = true;
}
}
@@ -422,7 +423,7 @@ public abstract class AbstractMemTable implements IMemTable
{
IWritableMemChunkGroup memChunkGroup =
createAlignedMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue,
schemaList)) {
- shouldFlush = true;
+ reachChunkSizeOrPointNumThreshold = true;
}
}
@@ -444,7 +445,7 @@ public abstract class AbstractMemTable implements IMemTable
{
schemaList,
start,
end)) {
- shouldFlush = true;
+ reachChunkSizeOrPointNumThreshold = true;
}
}
@@ -470,7 +471,7 @@ public abstract class AbstractMemTable implements IMemTable
{
schemaList,
start,
end)) {
- shouldFlush = true;
+ reachChunkSizeOrPointNumThreshold = true;
}
}
@@ -517,11 +518,8 @@ public abstract class AbstractMemTable implements
IMemTable {
}
@Override
- public boolean reachTotalPointNumThreshold() {
- if (totalPointsNum == 0) {
- return false;
- }
- return totalPointsNum >= totalPointsNumThreshold;
+ public boolean reachChunkSizeOrPointNumThreshold() {
+ return reachChunkSizeOrPointNumThreshold;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index d68d1ef010d..7918969d2e1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -82,12 +82,7 @@ public interface IMemTable extends WALEntryValue {
/** only used when mem control enabled */
long getTVListsRamCost();
- /**
- * only used when mem control enabled
- *
- * @return whether the average number of points in each WritableChunk
reaches the threshold
- */
- boolean reachTotalPointNumThreshold();
+ boolean reachChunkSizeOrPointNumThreshold();
int getSeriesNumber();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 78eb2309cf6..ae974b17519 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -48,7 +48,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
-import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo;
import org.apache.iotdb.db.storageengine.dataregion.flush.CloseFileListener;
@@ -106,6 +105,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -934,7 +934,22 @@ public class TsFileProcessor {
if (dataRegionInfo.needToReportToSystem()) {
try {
if (!SystemInfo.getInstance().reportStorageGroupStatus(dataRegionInfo,
this)) {
- StorageEngine.blockInsertionIfReject(this);
+ long startTime = System.currentTimeMillis();
+ while (SystemInfo.getInstance().isRejected()) {
+ if (workMemTable.shouldFlush()) {
+ break;
+ }
+ try {
+
TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+ if (System.currentTimeMillis() - startTime
+ > config.getMaxWaitingTimeWhenInsertBlocked()) {
+ throw new WriteProcessRejectException(
+ "System rejected over " + (System.currentTimeMillis() -
startTime) + "ms");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
} catch (WriteProcessRejectException e) {
dataRegionInfo.releaseStorageGroupMemCost(memTableIncrement);
@@ -1010,13 +1025,10 @@ public class TsFileProcessor {
return false;
}
if (workMemTable.shouldFlush()) {
+ WritingMetrics.getInstance().recordMemControlFlushMemTableCount(1);
return true;
}
- if (workMemTable.reachTotalPointNumThreshold()) {
- logger.info(
- "The avg series points num {} of tsfile {} reaches the threshold",
- workMemTable.getTotalPointsNum() / workMemTable.getSeriesNumber(),
- tsFileResource.getTsFile().getAbsolutePath());
+ if (workMemTable.reachChunkSizeOrPointNumThreshold()) {
WritingMetrics.getInstance().recordSeriesFullFlushMemTableCount(1);
return true;
}