This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-3164 by this push:
     new effb3a22c8 add debug log
effb3a22c8 is described below

commit effb3a22c8487959075293deef4fd7aadecf4bfc
Author: Liu Xuxin <[email protected]>
AuthorDate: Thu Jul 14 14:47:32 2022 +0800

    add debug log
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  1 +
 .../db/engine/storagegroup/TsFileProcessor.java    |  6 +++++-
 .../iotdb/db/rescon/memory/MemoryController.java   |  1 +
 .../db/rescon/memory/WriteMemoryController.java    | 23 ++++++++++++++++++----
 4 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 5c7fa5e62d..eeea38fcd0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -104,6 +104,7 @@ public class MemTableFlushTask {
       estimatedTemporaryMemSize =
           memTable.memSize() / memTable.getSeriesNumber() * 
config.getIoTaskQueueSizeForFlushing();
       
SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize);
+      // TODO: ALLOCATE IN WRITE MEMORY CONTROLLER
     }
     long start = System.currentTimeMillis();
     long sortTime = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 702e336dc3..ce96d157bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -794,7 +794,6 @@ public class TsFileProcessor {
     } catch (WriteProcessRejectException e) {
       storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
       tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
-      controller.resetStorageGroupInfo(storageGroupInfo);
       throw e;
     }
     workMemTable.addTVListRamCost(memTableIncrement);
@@ -1169,7 +1168,12 @@ public class TsFileProcessor {
         }
         // report to System
         
WriteMemoryController.getInstance().resetStorageGroupInfo(storageGroupInfo);
+        logger.error(
+            "Memory usage for {} is {}",
+            storageGroupName,
+            
WriteMemoryController.getInstance().getMemoryUsageForSg(storageGroupName));
         
WriteMemoryController.getInstance().releaseMemory(memTable.getTVListsRamCost());
+        logger.error("Release size {} for {}", memTable.getTVListsRamCost(), 
storageGroupName);
       }
       if (logger.isDebugEnabled()) {
         logger.debug(
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
index 044b691ef7..9e39fad30e 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
@@ -140,6 +140,7 @@ public class MemoryController<T> {
   private void checkTrigger(long prevUsage, long newUsage, T triggerParam) {
     if (newUsage >= triggerThreshold && prevUsage < triggerThreshold && 
trigger != null) {
       if (triggerRunning.compareAndSet(false, true)) {
+        log.info("Start to execute memory controller trigger");
         try {
           trigger.run(triggerParam);
         } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index 45315f4479..d9dcb48c5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class WriteMemoryController extends MemoryController<TsFileProcessor> {
   private static final Logger logger = 
LoggerFactory.getLogger(WriteMemoryController.class);
@@ -40,8 +41,9 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
   private static final double FLUSH_THRESHOLD = memorySizeForWrite * 
config.getFlushProportion();
   private static final double REJECT_THRESHOLD = memorySizeForWrite * 
config.getRejectProportion();
   private volatile boolean rejected = false;
-  private volatile long flushingMemory = 0;
+  private AtomicLong flushingMemory = new AtomicLong(0);
   private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new 
ConcurrentHashMap<>();
+  private Map<String, AtomicLong> memoryUsageForEachSg = new 
ConcurrentHashMap<>();
   private ExecutorService flushTaskSubmitThreadPool =
       IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
 
@@ -53,7 +55,7 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
 
   public boolean tryAllocateMemory(long size, StorageGroupInfo info, 
TsFileProcessor processor) {
     boolean success = super.tryAllocateMemory(size, processor);
-    if (memoryUsage.get() > REJECT_THRESHOLD) {
+    if (memoryUsage.get() > REJECT_THRESHOLD && !rejected) {
       logger.info(
           "Change system to reject status. Triggered by: logical SG ({}), mem 
cost delta ({}), totalSgMemCost ({}).",
           info.getDataRegion().getLogicalStorageGroupName(),
@@ -61,7 +63,16 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
           memoryUsage.get());
       rejected = true;
     }
-    reportedStorageGroupMemCostMap.put(info, info.getMemCost());
+    if (success) {
+      reportedStorageGroupMemCostMap.put(info, info.getMemCost());
+      memoryUsageForEachSg
+          .computeIfAbsent(
+              info.getDataRegion().getLogicalStorageGroupName()
+                  + "-"
+                  + info.getDataRegion().getDataRegionId(),
+              x -> new AtomicLong(0))
+          .addAndGet(size);
+    }
     return success;
   }
 
@@ -100,7 +111,7 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
       allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp());
     }
     long memCost = 0;
-    long activeMemSize = memoryUsage.get();
+    long activeMemSize = memoryUsage.get() - flushingMemory.get();
     while (activeMemSize - memCost > FLUSH_THRESHOLD) {
       if (allTsFileProcessors.isEmpty()
           || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
@@ -120,4 +131,8 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
   public void resetStorageGroupInfo(StorageGroupInfo info) {
     reportedStorageGroupMemCostMap.put(info, info.getMemCost());
   }
+
+  public long getMemoryUsageForSg(String sgName) {
+    return memoryUsageForEachSg.get(sgName).get();
+  }
 }

Reply via email to