This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-3164 by this push:
new effb3a22c8 add debug log
effb3a22c8 is described below
commit effb3a22c8487959075293deef4fd7aadecf4bfc
Author: Liu Xuxin <[email protected]>
AuthorDate: Thu Jul 14 14:47:32 2022 +0800
add debug log
---
.../iotdb/db/engine/flush/MemTableFlushTask.java | 1 +
.../db/engine/storagegroup/TsFileProcessor.java | 6 +++++-
.../iotdb/db/rescon/memory/MemoryController.java | 1 +
.../db/rescon/memory/WriteMemoryController.java | 23 ++++++++++++++++++----
4 files changed, 26 insertions(+), 5 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 5c7fa5e62d..eeea38fcd0 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -104,6 +104,7 @@ public class MemTableFlushTask {
estimatedTemporaryMemSize =
memTable.memSize() / memTable.getSeriesNumber() *
config.getIoTaskQueueSizeForFlushing();
SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+ // TODO: ALLOCATE IN WRITE MEMORY CONTROLLER
}
long start = System.currentTimeMillis();
long sortTime = 0;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 702e336dc3..ce96d157bb 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -794,7 +794,6 @@ public class TsFileProcessor {
} catch (WriteProcessRejectException e) {
storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
- controller.resetStorageGroupInfo(storageGroupInfo);
throw e;
}
workMemTable.addTVListRamCost(memTableIncrement);
@@ -1169,7 +1168,12 @@ public class TsFileProcessor {
}
// report to System
WriteMemoryController.getInstance().resetStorageGroupInfo(storageGroupInfo);
+ logger.error(
+ "Memory usage for {} is {}",
+ storageGroupName,
+
WriteMemoryController.getInstance().getMemoryUsageForSg(storageGroupName));
WriteMemoryController.getInstance().releaseMemory(memTable.getTVListsRamCost());
+ logger.error("Release size {} for {}", memTable.getTVListsRamCost(),
storageGroupName);
}
if (logger.isDebugEnabled()) {
logger.debug(
diff --git
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
index 044b691ef7..9e39fad30e 100644
---
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
+++
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
@@ -140,6 +140,7 @@ public class MemoryController<T> {
private void checkTrigger(long prevUsage, long newUsage, T triggerParam) {
if (newUsage >= triggerThreshold && prevUsage < triggerThreshold &&
trigger != null) {
if (triggerRunning.compareAndSet(false, true)) {
+ log.info("Start to execute memory controller trigger");
try {
trigger.run(triggerParam);
} finally {
diff --git
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index 45315f4479..d9dcb48c5c 100644
---
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
public class WriteMemoryController extends MemoryController<TsFileProcessor> {
private static final Logger logger =
LoggerFactory.getLogger(WriteMemoryController.class);
@@ -40,8 +41,9 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
private static final double FLUSH_THRESHOLD = memorySizeForWrite *
config.getFlushProportion();
private static final double REJECT_THRESHOLD = memorySizeForWrite *
config.getRejectProportion();
private volatile boolean rejected = false;
- private volatile long flushingMemory = 0;
+ private AtomicLong flushingMemory = new AtomicLong(0);
private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new
ConcurrentHashMap<>();
+ private Map<String, AtomicLong> memoryUsageForEachSg = new
ConcurrentHashMap<>();
private ExecutorService flushTaskSubmitThreadPool =
IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
@@ -53,7 +55,7 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
public boolean tryAllocateMemory(long size, StorageGroupInfo info,
TsFileProcessor processor) {
boolean success = super.tryAllocateMemory(size, processor);
- if (memoryUsage.get() > REJECT_THRESHOLD) {
+ if (memoryUsage.get() > REJECT_THRESHOLD && !rejected) {
logger.info(
"Change system to reject status. Triggered by: logical SG ({}), mem
cost delta ({}), totalSgMemCost ({}).",
info.getDataRegion().getLogicalStorageGroupName(),
@@ -61,7 +63,16 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
memoryUsage.get());
rejected = true;
}
- reportedStorageGroupMemCostMap.put(info, info.getMemCost());
+ if (success) {
+ reportedStorageGroupMemCostMap.put(info, info.getMemCost());
+ memoryUsageForEachSg
+ .computeIfAbsent(
+ info.getDataRegion().getLogicalStorageGroupName()
+ + "-"
+ + info.getDataRegion().getDataRegionId(),
+ x -> new AtomicLong(0))
+ .addAndGet(size);
+ }
return success;
}
@@ -100,7 +111,7 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp());
}
long memCost = 0;
- long activeMemSize = memoryUsage.get();
+ long activeMemSize = memoryUsage.get() - flushingMemory.get();
while (activeMemSize - memCost > FLUSH_THRESHOLD) {
if (allTsFileProcessors.isEmpty()
|| allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
@@ -120,4 +131,8 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
public void resetStorageGroupInfo(StorageGroupInfo info) {
reportedStorageGroupMemCostMap.put(info, info.getMemCost());
}
+
+ /**
+ * Returns the write memory size recorded for one storage group key.
+ *
+ * <p>Entries are created by {@code tryAllocateMemory} on a successful allocation, keyed by
+ * {@code logicalStorageGroupName + "-" + dataRegionId}; callers must pass the same key format.
+ *
+ * @param sgName key into the per-storage-group usage map
+ * @return the accumulated size for {@code sgName}, or 0 when no entry exists for that key
+ */
+ public long getMemoryUsageForSg(String sgName) {
+ AtomicLong usage = memoryUsageForEachSg.get(sgName);
+ // A key is absent until its first successful allocation; report 0 instead of throwing an
+ // NPE, since this getter is used from logging paths that must not fail the flush.
+ return usage == null ? 0L : usage.get();
+ }
}