This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 76bcf678b82 Add size threshold to control wal disk usage (#10160)
76bcf678b82 is described below

commit 76bcf678b82a5883aca4efb88fa1ce728a4ce5f5
Author: Alan Choo <[email protected]>
AuthorDate: Fri Jun 16 11:09:44 2023 +0800

    Add size threshold to control wal disk usage (#10160)
---
 .../java/org/apache/iotdb/db/wal/WALManager.java   | 15 ++++-
 .../java/org/apache/iotdb/db/wal/node/WALNode.java | 71 +++++++++++++---------
 2 files changed, 56 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
index c236083c7fe..36db197dae5 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.db.wal.utils.WALMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -170,9 +171,17 @@ public class WALManager implements IService {
   }
 
   private void deleteOutdatedFiles() {
-    for (WALNode walNode : walNodesManager.getNodesSnapshot()) {
+    List<WALNode> walNodes = walNodesManager.getNodesSnapshot();
+    walNodes.sort((node1, node2) -> Long.compare(node2.getDiskUsage(), 
node1.getDiskUsage()));
+    for (WALNode walNode : walNodes) {
       walNode.deleteOutdatedFiles();
     }
+    if (shouldThrottle()) {
+      logger.warn(
+          "WAL disk usage {} is larger than the 
iot_consensus_throttle_threshold_in_byte {}, please check your write load, iot 
consensus and the pipe module. It's better to allocate more disk for WAL.",
+          getTotalDiskUsage(),
+          config.getThrottleThreshold());
+    }
   }
 
   /** Wait until all write-ahead logs are flushed */
@@ -192,6 +201,10 @@ public class WALManager implements IService {
     }
   }
 
+  public boolean shouldThrottle() {
+    return getTotalDiskUsage() >= config.getThrottleThreshold();
+  }
+
   public long getTotalDiskUsage() {
     return totalDiskUsage.get();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 98f985da322..4891062e892 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -234,14 +234,19 @@ public class WALNode implements IWALNode {
         }
       }
 
+      // delete outdated files
       logger.debug(
           "Start deleting outdated wal files for wal node-{}, the first valid 
version id is {}, and the safely deleted search index is {}.",
           identifier,
           firstValidVersionId,
           safelyDeletedSearchIndex);
-
-      // delete outdated files
-      deleteOutdatedFiles();
+      boolean pinnedByIoTConsensus = deleteOutdatedFiles();
+      if (pinnedByIoTConsensus) {
+        logger.debug(
+            "Cannot delete wal files for wal node-{} because of wal files are 
pinned by IoTConsensus.",
+            identifier);
+        return;
+      }
 
       // calculate effective information ratio
       long costOfActiveMemTables = 
checkpointManager.getTotalCostOfActiveMemTables();
@@ -257,35 +262,35 @@ public class WALNode implements IWALNode {
           effectiveInfoRatio,
           costOfActiveMemTables,
           costOfFlushedMemTables);
-      // effective information ratio is too small
-      // update first valid version id by snapshotting or flushing memTable,
+
+      // try updating first valid version id by snapshotting or flushing 
memTable,
       // then delete old .wal files again
-      if (effectiveInfoRatio < config.getWalMinEffectiveInfoRatio()) {
-        logger.debug(
-            "Effective information ratio {} (active memTables cost is {}, 
flushed memTables cost is {}) of wal node-{} is below wal min effective info 
ratio {}, some memTables will be snapshot or flushed.",
-            effectiveInfoRatio,
-            costOfActiveMemTables,
-            costOfFlushedMemTables,
-            identifier,
-            config.getWalMinEffectiveInfoRatio());
-        if (snapshotOrFlushMemTable() && recursionTime < MAX_RECURSION_TIME) {
-          // wal is used to search, cannot optimize files deletion
-          if (safelyDeletedSearchIndex != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) 
{
-            return;
-          }
-          recursionTime++;
-          run();
-        }
+      if (!shouldSnapshotOrFlush()) {
+        return;
+      }
+      logger.debug(
+          "Effective information ratio {} (active memTables cost is {}, 
flushed memTables cost is {}) of wal node-{} is below wal min effective info 
ratio {}, some memTables will be snapshot or flushed.",
+          effectiveInfoRatio,
+          costOfActiveMemTables,
+          costOfFlushedMemTables,
+          identifier,
+          config.getWalMinEffectiveInfoRatio());
+      boolean isSuccess = snapshotOrFlushMemTable();
+      if (isSuccess && recursionTime < MAX_RECURSION_TIME) {
+        recursionTime++;
+        run();
       }
     }
 
-    private void deleteOutdatedFiles() {
+    /** Return true iff cannot delete all outdated files because of 
IoTConsensus */
+    private boolean deleteOutdatedFiles() {
       // find all files to delete
       // delete files whose version < firstValidVersionId
       File[] filesToDelete = logDirectory.listFiles(this::filterFilesToDelete);
-      if (filesToDelete == null) {
-        return;
+      if (filesToDelete == null || filesToDelete.length == 0) {
+        return false;
       }
+
       // delete files whose content's search index are all <= 
safelyDeletedSearchIndex
       WALFileUtils.ascSortByVersionId(filesToDelete);
       // judge DEFAULT_SAFELY_DELETED_SEARCH_INDEX for standalone, 
Long.MIN_VALUE for iot
@@ -296,7 +301,7 @@ public class WALNode implements IWALNode {
                   filesToDelete, safelyDeletedSearchIndex + 1);
       // delete files whose file status is CONTAINS_NONE_SEARCH_INDEX
       if (endFileIndex == -1) {
-        endFileIndex++;
+        endFileIndex = 0;
       }
       while (endFileIndex < filesToDelete.length) {
         if (WALFileUtils.parseStatusCode(filesToDelete[endFileIndex].getName())
@@ -305,6 +310,7 @@ public class WALNode implements IWALNode {
         }
         endFileIndex++;
       }
+
       // delete files
       int deletedFilesNum = 0;
       long deletedFilesSize = 0;
@@ -330,6 +336,7 @@ public class WALNode implements IWALNode {
           "Successfully delete {} outdated wal files for wal node-{}.",
           deletedFilesNum,
           identifier);
+      return endFileIndex < filesToDelete.length;
     }
 
     private boolean filterFilesToDelete(File dir, String name) {
@@ -343,6 +350,12 @@ public class WALNode implements IWALNode {
       return toDelete;
     }
 
+    /** Return true iff effective information ratio is too small or disk usage 
is too large */
+    private boolean shouldSnapshotOrFlush() {
+      return effectiveInfoRatio < config.getWalMinEffectiveInfoRatio()
+          || WALManager.getInstance().shouldThrottle();
+    }
+
     /**
      * Snapshot or flush one memTable,
      *
@@ -355,12 +368,12 @@ public class WALNode implements IWALNode {
         return false;
       }
       if (oldestMemTableInfo.isPinned()) {
-        logger.info(
-            "MemTable-{} is pinned and effective information ratio {} of wal 
node-{} is below wal min effective info ratio {}.",
-            oldestMemTableInfo.getMemTableId(),
+        logger.warn(
+            "Pipe: Effective information ratio {} of wal node-{} is below wal 
min effective info ratio {}. But fail to delete memTable-{}'s wal files because 
they are pinned by the Pipe module.",
             effectiveInfoRatio,
             identifier,
-            config.getWalMinEffectiveInfoRatio());
+            config.getWalMinEffectiveInfoRatio(),
+            oldestMemTableInfo.getMemTableId());
         return false;
       }
       IMemTable oldestMemTable = oldestMemTableInfo.getMemTable();

Reply via email to