This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch restrict_tvlist_rowCount
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/restrict_tvlist_rowCount by 
this push:
     new c1038f888b0 optimize mem control code and flush metric
c1038f888b0 is described below

commit c1038f888b0c340cd3495f6a7d700bb3a9307287
Author: HTHou <[email protected]>
AuthorDate: Wed Jul 10 18:14:21 2024 +0800

    optimize mem control code and flush metric
---
 .../iotdb/db/storageengine/StorageEngine.java      |  6 +----
 .../db/storageengine/dataregion/DataRegion.java    | 17 ++++----------
 .../dataregion/memtable/AbstractMemTable.java      | 16 ++++++-------
 .../dataregion/memtable/IMemTable.java             |  7 +-----
 .../dataregion/memtable/TsFileProcessor.java       | 26 ++++++++++++++++------
 5 files changed, 32 insertions(+), 40 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 0a768b90df8..8758af3aa00 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -163,13 +163,9 @@ public class StorageEngine implements IService {
   }
 
   /** block insertion if the insertion is rejected by memory control */
-  public static void blockInsertionIfReject(TsFileProcessor tsFileProcessor)
-      throws WriteProcessRejectException {
+  public static void blockInsertionIfReject() throws 
WriteProcessRejectException {
     long startTime = System.currentTimeMillis();
     while (SystemInfo.getInstance().isRejected()) {
-      if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) {
-        break;
-      }
       try {
         TimeUnit.MILLISECONDS.sleep(CONFIG.getCheckPeriodWhenInsertBlocked());
         if (System.currentTimeMillis() - startTime > 
CONFIG.getMaxWaitingTimeWhenInsertBlocked()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index ed008bb47d9..31d34cb2a6e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -879,7 +879,7 @@ public class DataRegion implements IDataRegionForQuery {
       throw new OutOfTTLException(
           insertRowNode.getTime(), (CommonDateTimeUtils.currentTime() - 
deviceTTL));
     }
-    StorageEngine.blockInsertionIfReject(null);
+    StorageEngine.blockInsertionIfReject();
     long startTime = System.nanoTime();
     writeLock("InsertRow");
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - 
startTime);
@@ -914,7 +914,6 @@ public class DataRegion implements IDataRegionForQuery {
       // check memtable size and may asyncTryToFlush the work memtable
       if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) {
         fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
-        WritingMetrics.getInstance().recordMemControlFlushMemTableCount(1);
       }
       if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
         if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
@@ -938,7 +937,7 @@ public class DataRegion implements IDataRegionForQuery {
   @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
   public void insertTablet(InsertTabletNode insertTabletNode)
       throws BatchProcessException, WriteProcessException {
-    StorageEngine.blockInsertionIfReject(null);
+    StorageEngine.blockInsertionIfReject();
     long startTime = System.nanoTime();
     writeLock("insertTablet");
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - 
startTime);
@@ -1114,7 +1113,6 @@ public class DataRegion implements IDataRegionForQuery {
     // check memtable size and may async try to flush the work memtable
     if (tsFileProcessor.shouldFlush()) {
       fileFlushPolicy.apply(this, tsFileProcessor, sequence);
-      WritingMetrics.getInstance().recordMemControlFlushMemTableCount(1);
     }
     return true;
   }
@@ -1222,7 +1220,6 @@ public class DataRegion implements IDataRegionForQuery {
           });
     }
 
-    int count = 0;
     List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
     for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
       TsFileProcessor tsFileProcessor = entry.getKey();
@@ -1241,10 +1238,8 @@ public class DataRegion implements IDataRegionForQuery {
       // check memtable size and may asyncTryToFlush the work memtable
       if (entry.getKey().shouldFlush()) {
         fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
-        count++;
       }
     }
-    WritingMetrics.getInstance().recordMemControlFlushMemTableCount(count);
 
     
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
     
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
@@ -1332,7 +1327,6 @@ public class DataRegion implements IDataRegionForQuery {
       // check memtable size and may asyncTryToFlush the work memtable
       if (tsFileProcessor.shouldFlush()) {
         fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
-        WritingMetrics.getInstance().recordMemControlFlushMemTableCount(1);
       }
     } finally {
       writeUnlock();
@@ -3240,7 +3234,7 @@ public class DataRegion implements IDataRegionForQuery {
    */
   public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)
       throws WriteProcessException, BatchProcessException {
-    StorageEngine.blockInsertionIfReject(null);
+    StorageEngine.blockInsertionIfReject();
     long startTime = System.nanoTime();
     writeLock("InsertRowsOfOneDevice");
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - 
startTime);
@@ -3312,7 +3306,6 @@ public class DataRegion implements IDataRegionForQuery {
             });
       }
       List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
-      int count = 0;
       for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
         TsFileProcessor tsFileProcessor = entry.getKey();
         InsertRowsNode subInsertRowsNode = entry.getValue();
@@ -3330,10 +3323,8 @@ public class DataRegion implements IDataRegionForQuery {
         // check memtable size and may asyncTryToFlush the work memtable
         if (tsFileProcessor.shouldFlush()) {
           fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
-          count++;
         }
       }
-      WritingMetrics.getInstance().recordMemControlFlushMemTableCount(count);
 
       
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
       
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
@@ -3358,7 +3349,7 @@ public class DataRegion implements IDataRegionForQuery {
 
   public void insert(InsertRowsNode insertRowsNode)
       throws BatchProcessException, WriteProcessRejectException {
-    StorageEngine.blockInsertionIfReject(null);
+    StorageEngine.blockInsertionIfReject();
     long startTime = System.nanoTime();
     writeLock("InsertRows");
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - 
startTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 23e35b2c649..ce85508914f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -89,6 +89,7 @@ public abstract class AbstractMemTable implements IMemTable {
   private static final DeviceIDFactory deviceIDFactory = 
DeviceIDFactory.getInstance();
 
   private boolean shouldFlush = false;
+  private boolean reachChunkSizeOrPointNumThreshold = false;
   private volatile FlushStatus flushStatus = FlushStatus.WORKING;
   private final int avgSeriesPointNumThreshold =
       
IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
@@ -409,7 +410,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
     IWritableMemChunkGroup memChunkGroup =
         createMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
     if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue, 
schemaList)) {
-      shouldFlush = true;
+      reachChunkSizeOrPointNumThreshold = true;
     }
   }
 
@@ -422,7 +423,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
     IWritableMemChunkGroup memChunkGroup =
         createAlignedMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
     if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue, 
schemaList)) {
-      shouldFlush = true;
+      reachChunkSizeOrPointNumThreshold = true;
     }
   }
 
@@ -444,7 +445,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
         schemaList,
         start,
         end)) {
-      shouldFlush = true;
+      reachChunkSizeOrPointNumThreshold = true;
     }
   }
 
@@ -470,7 +471,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
         schemaList,
         start,
         end)) {
-      shouldFlush = true;
+      reachChunkSizeOrPointNumThreshold = true;
     }
   }
 
@@ -517,11 +518,8 @@ public abstract class AbstractMemTable implements 
IMemTable {
   }
 
   @Override
-  public boolean reachTotalPointNumThreshold() {
-    if (totalPointsNum == 0) {
-      return false;
-    }
-    return totalPointsNum >= totalPointsNumThreshold;
+  public boolean reachChunkSizeOrPointNumThreshold() {
+    return reachChunkSizeOrPointNumThreshold;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index d68d1ef010d..7918969d2e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -82,12 +82,7 @@ public interface IMemTable extends WALEntryValue {
   /** only used when mem control enabled */
   long getTVListsRamCost();
 
-  /**
-   * only used when mem control enabled
-   *
-   * @return whether the average number of points in each WritableChunk 
reaches the threshold
-   */
-  boolean reachTotalPointNumThreshold();
+  boolean reachChunkSizeOrPointNumThreshold();
 
   int getSeriesNumber();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 78eb2309cf6..ae974b17519 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -48,7 +48,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
-import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo;
 import org.apache.iotdb.db.storageengine.dataregion.flush.CloseFileListener;
@@ -106,6 +105,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -934,7 +934,22 @@ public class TsFileProcessor {
     if (dataRegionInfo.needToReportToSystem()) {
       try {
         if (!SystemInfo.getInstance().reportStorageGroupStatus(dataRegionInfo, 
this)) {
-          StorageEngine.blockInsertionIfReject(this);
+          long startTime = System.currentTimeMillis();
+          while (SystemInfo.getInstance().isRejected()) {
+            if (workMemTable.shouldFlush()) {
+              break;
+            }
+            try {
+              
TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+              if (System.currentTimeMillis() - startTime
+                  > config.getMaxWaitingTimeWhenInsertBlocked()) {
+                throw new WriteProcessRejectException(
+                    "System rejected over " + (System.currentTimeMillis() - 
startTime) + "ms");
+              }
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+          }
         }
       } catch (WriteProcessRejectException e) {
         dataRegionInfo.releaseStorageGroupMemCost(memTableIncrement);
@@ -1010,13 +1025,10 @@ public class TsFileProcessor {
       return false;
     }
     if (workMemTable.shouldFlush()) {
+      WritingMetrics.getInstance().recordMemControlFlushMemTableCount(1);
       return true;
     }
-    if (workMemTable.reachTotalPointNumThreshold()) {
-      logger.info(
-          "The avg series points num {} of tsfile {} reaches the threshold",
-          workMemTable.getTotalPointsNum() / workMemTable.getSeriesNumber(),
-          tsFileResource.getTsFile().getAbsolutePath());
+    if (workMemTable.reachChunkSizeOrPointNumThreshold()) {
       WritingMetrics.getInstance().recordSeriesFullFlushMemTableCount(1);
       return true;
     }

Reply via email to