This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b07fafe3954 Optimized wal file deletion algorithm (#11948)
b07fafe3954 is described below

commit b07fafe395442c90eb981585619a233ce42b373b
Author: Alan Choo <[email protected]>
AuthorDate: Tue Jan 23 20:04:13 2024 +0800

    Optimized wal file deletion algorithm (#11948)
    
    Co-authored-by: Zhijia Cao <[email protected]>
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  12 +-
 .../db/storageengine/dataregion/DataRegion.java    |   6 +-
 .../dataregion/memtable/AbstractMemTable.java      |  18 +
 .../dataregion/memtable/IMemTable.java             |   2 +
 .../dataregion/memtable/TsFileProcessor.java       |   5 +
 .../dataregion/wal/buffer/WALBuffer.java           |  57 +-
 .../wal/checkpoint/CheckpointManager.java          |  50 +-
 .../dataregion/wal/checkpoint/MemTableInfo.java    |  22 +-
 .../dataregion/wal/io/WALByteBufReader.java        |  26 +-
 .../dataregion/wal/io/WALMetaData.java             |  76 ++-
 .../storageengine/dataregion/wal/node/WALNode.java | 198 +++----
 .../dataregion/wal/recover/WALNodeRecoverTask.java |  41 +-
 .../storageengine/dataregion/DataRegionTest.java   |   4 +
 .../dataregion/wal/node/WALEntryHandlerTest.java   |  13 +-
 .../wal/node/WalDeleteOutdatedNewTest.java         | 585 +++++++++++++++++++++
 .../wal/recover/WALRecoverWriterTest.java          |   9 +-
 .../resources/conf/iotdb-common.properties         |  20 +-
 17 files changed, 944 insertions(+), 200 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3b6bac840a8..26659d87f80 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -390,25 +390,25 @@ public class IoTDBConfig {
   private boolean enableTimedFlushSeqMemtable = true;
 
   /**
-   * If a memTable's created time is older than current time minus this, the 
memtable will be
+   * If a memTable's last update time is older than current time minus this, 
the memtable will be
    * flushed to disk.(only check sequence tsfiles' memtables) Unit: ms
    */
-  private long seqMemtableFlushInterval = 3 * 60 * 60 * 1000L;
+  private long seqMemtableFlushInterval = 10 * 60 * 1000L;
 
   /** The interval to check whether sequence memtables need flushing. Unit: ms 
*/
-  private long seqMemtableFlushCheckInterval = 10 * 60 * 1000L;
+  private long seqMemtableFlushCheckInterval = 30 * 1000L;
 
   /** Whether to timed flush unsequence tsfiles' memtables. */
   private boolean enableTimedFlushUnseqMemtable = true;
 
   /**
-   * If a memTable's created time is older than current time minus this, the 
memtable will be
+   * If a memTable's last update time is older than current time minus this, 
the memtable will be
    * flushed to disk.(only check unsequence tsfiles' memtables) Unit: ms
    */
-  private long unseqMemtableFlushInterval = 3 * 60 * 60 * 1000L;
+  private long unseqMemtableFlushInterval = 10 * 60 * 1000L;
 
   /** The interval to check whether unsequence memtables need flushing. Unit: 
ms */
-  private long unseqMemtableFlushCheckInterval = 10 * 60 * 1000L;
+  private long unseqMemtableFlushCheckInterval = 30 * 1000L;
 
   /** The sort algorithm used in TVList */
   private TVListSortAlgorithm tvListSortAlgorithm = TVListSortAlgorithm.TIM;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index e74c99c762c..8ad98d45eae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -406,7 +406,7 @@ public class DataRegion implements IDataRegionForQuery {
         if (lastLogTime + config.getRecoveryLogIntervalInMs() < 
System.currentTimeMillis()) {
           logger.info(
               "The data region {}[{}] has recovered {}%, please wait a 
moment.",
-              databaseName, dataRegionId, recoveredFilesNum * 1.0 / 
numOfFilesToRecover);
+              databaseName, dataRegionId, recoveredFilesNum * 100.0 / 
numOfFilesToRecover);
           lastLogTime = System.currentTimeMillis();
         }
       }
@@ -1680,7 +1680,7 @@ public class DataRegion implements IDataRegionForQuery {
           new ArrayList<>(workSequenceTsFileProcessors.values());
       long timeLowerBound = System.currentTimeMillis() - 
config.getSeqMemtableFlushInterval();
       for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
-        if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
+        if (tsFileProcessor.getWorkMemTableUpdateTime() < timeLowerBound) {
           logger.info(
               "Exceed sequence memtable flush interval, so flush working 
memtable of time partition {} in database {}[{}]",
               tsFileProcessor.getTimeRangeId(),
@@ -1706,7 +1706,7 @@ public class DataRegion implements IDataRegionForQuery {
       long timeLowerBound = System.currentTimeMillis() - 
config.getUnseqMemtableFlushInterval();
 
       for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
-        if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
+        if (tsFileProcessor.getWorkMemTableUpdateTime() < timeLowerBound) {
           logger.info(
               "Exceed unsequence memtable flush interval, so flush working 
memtable of time partition {} in database {}[{}]",
               tsFileProcessor.getTimeRangeId(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 1488659e6cf..0b2ff4a04cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -98,6 +98,14 @@ public abstract class AbstractMemTable implements IMemTable {
 
   private final long createdTime = System.currentTimeMillis();
 
+  /** this time is updated by the timed flush, same as createdTime when the 
feature is disabled. */
+  private long updateTime = createdTime;
+  /**
+   * check whether this memTable has been updated since last timed flush 
check, update updateTime
+   * when changed
+   */
+  private long lastTotalPointsNum = totalPointsNum;
+
   private String database;
   private String dataRegionId;
 
@@ -589,6 +597,16 @@ public abstract class AbstractMemTable implements 
IMemTable {
     return createdTime;
   }
 
+  /** Get updateTime, first refreshing it if totalPointsNum changed since the last call */
+  @Override
+  public long getUpdateTime() {
+    if (lastTotalPointsNum != totalPointsNum) {
+      lastTotalPointsNum = totalPointsNum;
+      updateTime = System.currentTimeMillis();
+    }
+    return updateTime;
+  }
+
   @Override
   public FlushStatus getFlushStatus() {
     return flushStatus;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index 4efcbdf6f8c..8e1a883d2a4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -168,6 +168,8 @@ public interface IMemTable extends WALEntryValue {
 
   long getCreatedTime();
 
+  long getUpdateTime();
+
   FlushStatus getFlushStatus();
 
   void setFlushStatus(FlushStatus flushStatus);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 4cd143ad5fb..5bbf9ac7fe3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1576,6 +1576,11 @@ public class TsFileProcessor {
     return workMemTable != null ? workMemTable.getCreatedTime() : 
Long.MAX_VALUE;
   }
 
+  /** Return Long.MAX_VALUE if workMemTable is null */
+  public long getWorkMemTableUpdateTime() {
+    return workMemTable != null ? workMemTable.getUpdateTime() : 
Long.MAX_VALUE;
+  }
+
   public long getLastWorkMemtableFlushTime() {
     return lastWorkMemtableFlushTime;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
index dea4a55c4ab..d6deb613e6f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.CheckpointMan
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALNodeClosedException;
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
 import org.apache.iotdb.db.utils.MmapUtil;
@@ -40,14 +41,23 @@ import org.apache.iotdb.db.utils.MmapUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -102,6 +112,9 @@ public class WALBuffer extends AbstractWALBuffer {
   // single thread to sync syncingBuffer to disk
   private final ExecutorService syncBufferThread;
 
+  // wal file version id -> ids of the memTables which have entries in that wal file
+  private final Map<Long, Set<Long>> memTableIdsOfWal = new 
ConcurrentHashMap<>();
+
   public WALBuffer(String identifier, String logDirectory) throws 
FileNotFoundException {
     this(identifier, logDirectory, new CheckpointManager(identifier, 
logDirectory), 0, 0L);
   }
@@ -183,6 +196,7 @@ public class WALBuffer extends AbstractWALBuffer {
   /** This info class traverses some extra info from serializeThread to 
syncBufferThread. */
   private static class SerializeInfo {
     final WALMetaData metaData = new WALMetaData();
+    final Map<Long, Long> memTableId2WalDiskUsage = new HashMap<>();
     final List<Checkpoint> checkpoints = new ArrayList<>();
     final List<WALFlushListener> fsyncListeners = new ArrayList<>();
     WALFlushListener rollWALFileWriterListener = null;
@@ -279,10 +293,11 @@ public class WALBuffer extends AbstractWALBuffer {
         return;
       }
 
-      int size = byteBufferView.position();
+      int startPosition = byteBufferView.position();
+      int size;
       try {
         walEntry.serialize(byteBufferView);
-        size = byteBufferView.position() - size;
+        size = byteBufferView.position() - startPosition;
       } catch (Exception e) {
         logger.error(
             "Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", 
identifier, e);
@@ -304,7 +319,9 @@ public class WALBuffer extends AbstractWALBuffer {
       }
       // update related info
       totalSize += size;
-      info.metaData.add(size, searchIndex);
+      info.metaData.add(size, searchIndex, walEntry.getMemTableId());
+      info.memTableId2WalDiskUsage.compute(
+          walEntry.getMemTableId(), (k, v) -> v == null ? size : v + size);
       walEntry.getWalFlushListener().getWalEntryHandler().setSize(size);
       info.fsyncListeners.add(walEntry.getWalFlushListener());
     }
@@ -509,6 +526,12 @@ public class WALBuffer extends AbstractWALBuffer {
         switchSyncingBufferToIdle();
       }
 
+      // record the memTable ids of this wal file and the wal disk usage of memTables
+      memTableIdsOfWal
+          .computeIfAbsent(currentWALFileVersion, memTableIds -> new 
HashSet<>())
+          .addAll(info.metaData.getMemTablesId());
+      
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage);
+
       boolean forceSuccess = false;
       // try to roll log writer
       if (info.rollWALFileWriterListener != null
@@ -682,4 +705,32 @@ public class WALBuffer extends AbstractWALBuffer {
   public CheckpointManager getCheckpointManager() {
     return checkpointManager;
   }
+
+  public void removeMemTableIdsOfWal(Long walVersionId) {
+    this.memTableIdsOfWal.remove(walVersionId);
+  }
+
+  public Set<Long> getMemTableIds(long fileVersionId) {
+    if (fileVersionId >= currentWALFileVersion) {
+      return Collections.emptySet();
+    }
+    return memTableIdsOfWal.computeIfAbsent(
+        fileVersionId,
+        id -> {
+          try {
+            File file = WALFileUtils.getWALFile(new File(logDirectory), id);
+            return WALMetaData.readFromWALFile(
+                    file, FileChannel.open(file.toPath(), 
StandardOpenOption.READ))
+                .getMemTablesId();
+          } catch (IOException e) {
+            logger.error("Fail to read memTable ids from the wal file {}.", 
id);
+            return new HashSet<>();
+          }
+        });
+  }
+
+  @TestOnly
+  public Map<Long, Set<Long>> getMemTableIdsOfWal() {
+    return memTableIdsOfWal;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
index 50ac8ca6155..7d7ff0f76b4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
@@ -40,9 +40,10 @@ import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -61,7 +62,7 @@ public class CheckpointManager implements AutoCloseable {
   private final Lock infoLock = new ReentrantLock();
   // region these variables should be protected by infoLock
   // memTable id -> memTable info
-  private final Map<Long, MemTableInfo> memTableId2Info = new HashMap<>();
+  private final Map<Long, MemTableInfo> memTableId2Info = new 
ConcurrentHashMap<>();
   // cache the biggest byte buffer to serialize checkpoint
   // it's safe to use volatile here to make this reference thread-safe.
   @SuppressWarnings("squid:S3077")
@@ -88,7 +89,7 @@ public class CheckpointManager implements AutoCloseable {
     logHeader();
   }
 
-  public List<MemTableInfo> snapshotMemTableInfos() {
+  public List<MemTableInfo> activeOrPinnedMemTables() {
     infoLock.lock();
     try {
       return new ArrayList<>(memTableId2Info.values());
@@ -121,7 +122,7 @@ public class CheckpointManager implements AutoCloseable {
    */
   private void makeGlobalInfoCP() {
     long start = System.nanoTime();
-    List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
+    List<MemTableInfo> memTableInfos = activeOrPinnedMemTables();
     memTableInfos.removeIf(MemTableInfo::isFlushed);
     Checkpoint checkpoint = new 
Checkpoint(CheckpointType.GLOBAL_MEMORY_TABLE_INFO, memTableInfos);
     logByCachedByteBuffer(checkpoint);
@@ -315,20 +316,13 @@ public class CheckpointManager implements AutoCloseable {
   }
   // endregion
 
-  /** Get MemTableInfo of oldest MemTable, whose first version id is smallest. 
*/
-  public MemTableInfo getOldestMemTableInfo() {
+  /** Get MemTableInfo of oldest unpinned MemTable, whose memTable id is 
smallest. */
+  public MemTableInfo getOldestUnpinnedMemTableInfo() {
     // find oldest memTable
-    List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
-    if (memTableInfos.isEmpty()) {
-      return null;
-    }
-    MemTableInfo oldestMemTableInfo = memTableInfos.get(0);
-    for (MemTableInfo memTableInfo : memTableInfos) {
-      if (oldestMemTableInfo.getFirstFileVersionId() > 
memTableInfo.getFirstFileVersionId()) {
-        oldestMemTableInfo = memTableInfo;
-      }
-    }
-    return oldestMemTableInfo;
+    return activeOrPinnedMemTables().stream()
+        .filter(memTableInfo -> !memTableInfo.isPinned())
+        .min(Comparator.comparingLong(MemTableInfo::getMemTableId))
+        .orElse(null);
   }
 
   /**
@@ -337,7 +331,7 @@ public class CheckpointManager implements AutoCloseable {
    * @return Return {@link Long#MIN_VALUE} if no file is valid
    */
   public long getFirstValidWALVersionId() {
-    List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
+    List<MemTableInfo> memTableInfos = activeOrPinnedMemTables();
     long firstValidVersionId = memTableInfos.isEmpty() ? Long.MIN_VALUE : 
Long.MAX_VALUE;
     for (MemTableInfo memTableInfo : memTableInfos) {
       firstValidVersionId = Math.min(firstValidVersionId, 
memTableInfo.getFirstFileVersionId());
@@ -345,20 +339,28 @@ public class CheckpointManager implements AutoCloseable {
     return firstValidVersionId;
   }
 
+  /** Update wal disk cost of active memTables. */
+  public void updateCostOfActiveMemTables(Map<Long, Long> 
memTableId2WalDiskUsage) {
+    for (Map.Entry<Long, Long> memTableWalUsage : 
memTableId2WalDiskUsage.entrySet()) {
+      memTableId2Info.computeIfPresent(
+          memTableWalUsage.getKey(),
+          (k, v) -> {
+            v.addWalDiskUsage(memTableWalUsage.getValue());
+            return v;
+          });
+    }
+  }
+
   /** Get total cost of active memTables. */
   public long getTotalCostOfActiveMemTables() {
-    List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
+    List<MemTableInfo> memTableInfos = activeOrPinnedMemTables();
     long totalCost = 0;
     for (MemTableInfo memTableInfo : memTableInfos) {
       // flushed memTables are not active
       if (memTableInfo.isFlushed()) {
         continue;
       }
-      if (config.isEnableMemControl()) {
-        totalCost += memTableInfo.getMemTable().getTVListsRamCost();
-      } else {
-        totalCost++;
-      }
+      totalCost += memTableInfo.getWalDiskUsage();
     }
 
     return totalCost;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
index 109652d8786..b9744d444ed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
@@ -37,6 +37,12 @@ import java.util.Objects;
 public class MemTableInfo implements WALEntryValue {
   // memTable id 8 bytes, first version id 8 bytes
   private static final int FIXED_SERIALIZED_SIZE = Long.BYTES * 2;
+  // memTable id
+  private long memTableId;
+  // path of the tsFile which this memTable will be flushed to
+  private String tsFilePath;
+  // version id of the file where this memTable's first WALEntry is located
+  private volatile long firstFileVersionId;
 
   // memTable
   private IMemTable memTable;
@@ -44,14 +50,10 @@ public class MemTableInfo implements WALEntryValue {
   private int pinCount;
   // memTable is flushed or not
   private boolean flushed;
-  // memTable id
-  private long memTableId;
   // data region id
   private int dataRegionId;
-  // path of the tsFile which this memTable will be flushed to
-  private String tsFilePath;
-  // version id of the file where this memTable's first WALEntry is located
-  private volatile long firstFileVersionId;
+  // total wal disk usage of this memTable (sum of its serialized wal entry sizes)
+  private long walDiskUsage;
 
   private MemTableInfo() {}
 
@@ -158,4 +160,12 @@ public class MemTableInfo implements WALEntryValue {
   public void setFirstFileVersionId(long firstFileVersionId) {
     this.firstFileVersionId = firstFileVersionId;
   }
+
+  public long getWalDiskUsage() {
+    return walDiskUsage;
+  }
+
+  public void addWalDiskUsage(long size) {
+    walDiskUsage += size;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
index a57d7faf750..f101eaf3647 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
@@ -46,23 +46,8 @@ public class WALByteBufReader implements Closeable {
   public WALByteBufReader(File logFile, FileChannel channel) throws 
IOException {
     this.logFile = logFile;
     this.channel = channel;
-    if (channel.size() < WALWriter.MAGIC_STRING_BYTES
-        || !readTailMagic().equals(WALWriter.MAGIC_STRING)) {
-      throw new IOException(String.format("Broken wal file %s", logFile));
-    }
-    // load metadata size
-    ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
-    long position = channel.size() - WALWriter.MAGIC_STRING_BYTES - 
Integer.BYTES;
-    channel.read(metadataSizeBuf, position);
-    metadataSizeBuf.flip();
-    // load metadata
-    int metadataSize = metadataSizeBuf.getInt();
-    ByteBuffer metadataBuf = ByteBuffer.allocate(metadataSize);
-    channel.read(metadataBuf, position - metadataSize);
-    metadataBuf.flip();
-    metaData = WALMetaData.deserialize(metadataBuf);
-    // init iterator
-    sizeIterator = metaData.getBuffersSize().iterator();
+    this.metaData = WALMetaData.readFromWALFile(logFile, channel);
+    this.sizeIterator = metaData.getBuffersSize().iterator();
     channel.position(0);
   }
 
@@ -84,11 +69,8 @@ public class WALByteBufReader implements Closeable {
     return buffer;
   }
 
-  private String readTailMagic() throws IOException {
-    ByteBuffer magicStringBytes = 
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
-    channel.read(magicStringBytes, channel.size() - 
WALWriter.MAGIC_STRING_BYTES);
-    magicStringBytes.flip();
-    return new String(magicStringBytes.array());
+  public WALMetaData getMetaData() {
+    return metaData;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
index f9b97ece3e5..b8cb4dfef13 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
@@ -22,9 +22,14 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
 import org.apache.iotdb.db.utils.SerializedSize;
 
+import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Metadata exists at the end of each wal file, including each entry's size, 
search index of first
@@ -38,21 +43,25 @@ public class WALMetaData implements SerializedSize {
   private long firstSearchIndex;
   // each entry's size
   private final List<Integer> buffersSize;
+  // memTable ids of this wal file
+  private final Set<Long> memTablesId;
 
   public WALMetaData() {
-    this(ConsensusReqReader.DEFAULT_SEARCH_INDEX, new ArrayList<>());
+    this(ConsensusReqReader.DEFAULT_SEARCH_INDEX, new ArrayList<>(), new 
HashSet<>());
   }
 
-  public WALMetaData(long firstSearchIndex, List<Integer> buffersSize) {
+  public WALMetaData(long firstSearchIndex, List<Integer> buffersSize, 
Set<Long> memTablesId) {
     this.firstSearchIndex = firstSearchIndex;
     this.buffersSize = buffersSize;
+    this.memTablesId = memTablesId;
   }
 
-  public void add(int size, long searchIndex) {
+  public void add(int size, long searchIndex, long memTableId) {
     if (buffersSize.isEmpty()) {
       firstSearchIndex = searchIndex;
     }
     buffersSize.add(size);
+    memTablesId.add(memTableId);
   }
 
   public void addAll(WALMetaData metaData) {
@@ -60,11 +69,14 @@ public class WALMetaData implements SerializedSize {
       firstSearchIndex = metaData.getFirstSearchIndex();
     }
     buffersSize.addAll(metaData.getBuffersSize());
+    memTablesId.addAll(metaData.getMemTablesId());
   }
 
   @Override
   public int serializedSize() {
-    return FIXED_SERIALIZED_SIZE + buffersSize.size() * Integer.BYTES;
+    return FIXED_SERIALIZED_SIZE
+        + buffersSize.size() * Integer.BYTES
+        + (memTablesId.isEmpty() ? 0 : Integer.BYTES + memTablesId.size() * 
Long.BYTES);
   }
 
   public void serialize(ByteBuffer buffer) {
@@ -73,6 +85,12 @@ public class WALMetaData implements SerializedSize {
     for (int size : buffersSize) {
       buffer.putInt(size);
     }
+    if (!memTablesId.isEmpty()) {
+      buffer.putInt(memTablesId.size());
+      for (long memTableId : memTablesId) {
+        buffer.putLong(memTableId);
+      }
+    }
   }
 
   public static WALMetaData deserialize(ByteBuffer buffer) {
@@ -82,14 +100,62 @@ public class WALMetaData implements SerializedSize {
     for (int i = 0; i < entriesNum; ++i) {
       buffersSize.add(buffer.getInt());
     }
-    return new WALMetaData(firstSearchIndex, buffersSize);
+    Set<Long> memTablesId = new HashSet<>();
+    if (buffer.hasRemaining()) {
+      int memTablesIdNum = buffer.getInt();
+      for (int i = 0; i < memTablesIdNum; ++i) {
+        memTablesId.add(buffer.getLong());
+      }
+    }
+    return new WALMetaData(firstSearchIndex, buffersSize, memTablesId);
   }
 
   public List<Integer> getBuffersSize() {
     return buffersSize;
   }
 
+  public Set<Long> getMemTablesId() {
+    return memTablesId;
+  }
+
   public long getFirstSearchIndex() {
     return firstSearchIndex;
   }
+
+  public static WALMetaData readFromWALFile(File logFile, FileChannel channel) 
throws IOException {
+    if (channel.size() < WALWriter.MAGIC_STRING_BYTES
+        || !readTailMagic(channel).equals(WALWriter.MAGIC_STRING)) {
+      throw new IOException(String.format("Broken wal file %s", logFile));
+    }
+    // load metadata size
+    ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
+    long position = channel.size() - WALWriter.MAGIC_STRING_BYTES - 
Integer.BYTES;
+    channel.read(metadataSizeBuf, position);
+    metadataSizeBuf.flip();
+    // load metadata
+    int metadataSize = metadataSizeBuf.getInt();
+    ByteBuffer metadataBuf = ByteBuffer.allocate(metadataSize);
+    channel.read(metadataBuf, position - metadataSize);
+    metadataBuf.flip();
+    WALMetaData metaData = WALMetaData.deserialize(metadataBuf);
+    if (metaData.memTablesId.isEmpty()) {
+      int offset = Byte.BYTES;
+      for (int size : metaData.buffersSize) {
+        channel.position(offset);
+        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        channel.read(buffer);
+        buffer.clear();
+        metaData.memTablesId.add(buffer.getLong());
+        offset += size;
+      }
+    }
+    return metaData;
+  }
+
+  private static String readTailMagic(FileChannel channel) throws IOException {
+    ByteBuffer magicStringBytes = 
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+    channel.read(magicStringBytes, channel.size() - 
WALWriter.MAGIC_STRING_BYTES);
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 9d117c6dccf..f59bdf8cc2b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.node;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.utils.TestOnly;
@@ -66,18 +65,18 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}. 
If search is enabled,
@@ -104,11 +103,6 @@ public class WALNode implements IWALNode {
   // memTable id -> memTable snapshot count
   // used to avoid write amplification caused by frequent snapshot
   private final Map<Long, Integer> memTableSnapshotCount = new 
ConcurrentHashMap<>();
-  // total cost of flushedMemTables
-  // when memControl enabled, cost is memTable ram cost, otherwise cost is 
memTable count
-  private final AtomicLong totalCostOfFlushedMemTables = new AtomicLong();
-  // version id -> cost sum of memTables flushed at this file version
-  private final Map<Long, Long> walFileVersionId2MemTablesTotalCost = new 
ConcurrentHashMap<>();
   // insert nodes whose search index are before this value can be deleted 
safely
   private volatile long safelyDeletedSearchIndex = 
DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
 
@@ -150,6 +144,7 @@ public class WALNode implements IWALNode {
   }
 
   private WALFlushListener log(WALEntry walEntry) {
+
     buffer.write(walEntry);
     // set handler for pipe
     walEntry.getWalFlushListener().getWalEntryHandler().setWalNode(this, 
walEntry.getMemTableId());
@@ -174,12 +169,6 @@ public class WALNode implements IWALNode {
 
     // remove snapshot info
     memTableSnapshotCount.remove(memTable.getMemTableId());
-    // update cost info
-    long cost = config.isEnableMemControl() ? memTable.getTVListsRamCost() : 1;
-    long currentWALFileVersion = buffer.getCurrentWALFileVersion();
-    walFileVersionId2MemTablesTotalCost.compute(
-        currentWALFileVersion, (k, v) -> v == null ? cost : v + cost);
-    totalCostOfFlushedMemTables.addAndGet(cost);
   }
 
   @Override
@@ -231,16 +220,17 @@ public class WALNode implements IWALNode {
   }
 
   private class DeleteOutdatedFileTask implements Runnable {
+    private File[] sortedWalFilesExcludingLast;
+
+    private List<MemTableInfo> activeOrPinnedMemTables;
+
     private static final int MAX_RECURSION_TIME = 5;
-    // .wal files whose version ids are less than first valid version id 
should be deleted
-    private long firstValidVersionId;
+
     // the effective information ratio
     private double effectiveInfoRatio = 0d;
 
     private List<Long> pinnedMemTableIds;
 
-    private File[] filesShouldDelete;
-
     private int fileIndexAfterFilterSafelyDeleteIndex = Integer.MAX_VALUE;
     private List<Long> successfullyDeleted;
     private long deleteFileSize;
@@ -251,21 +241,44 @@ public class WALNode implements IWALNode {
       // Do nothing
     }
 
-    private void init() {
-      this.firstValidVersionId = initFirstValidWALVersionId();
-      this.filesShouldDelete = 
logDirectory.listFiles(this::filterFilesToDelete);
-      if (filesShouldDelete == null) {
-        filesShouldDelete = new File[0];
+    private boolean initAndCheckIfNeedContinue() {
+      rollWalFileIfHaveNoActiveMemTable();
+      File[] allWalFilesOfOneNode = WALFileUtils.listAllWALFiles(logDirectory);
+      if (allWalFilesOfOneNode == null || allWalFilesOfOneNode.length <= 1) {
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "wal node-{}:no wal file or wal file number less than or equal 
to one was found",
+              identifier);
+        }
+        return false;
       }
+      WALFileUtils.ascSortByVersionId(allWalFilesOfOneNode);
+      this.sortedWalFilesExcludingLast =
+          Arrays.copyOfRange(allWalFilesOfOneNode, 0, 
allWalFilesOfOneNode.length - 1);
+      this.activeOrPinnedMemTables = 
checkpointManager.activeOrPinnedMemTables();
       this.pinnedMemTableIds = initPinnedMemTableIds();
-      WALFileUtils.ascSortByVersionId(filesShouldDelete);
       this.fileIndexAfterFilterSafelyDeleteIndex = 
initFileIndexAfterFilterSafelyDeleteIndex();
       this.successfullyDeleted = new ArrayList<>();
       this.deleteFileSize = 0;
+      return true;
+    }
+
+    /**
+     * If no valid wal version id remains, the relevant memTables in the file have been flushed,
+     * so roll over to a new wal file so that the current file can be deleted.
+     */
+    public void rollWalFileIfHaveNoActiveMemTable() {
+      long firstVersionId = checkpointManager.getFirstValidWALVersionId();
+      if (firstVersionId == Long.MIN_VALUE) {
+        // roll wal log writer to delete current wal file
+        if (buffer.getCurrentWALFileSize() > 0) {
+          rollWALFile();
+        }
+      }
     }
 
     private List<Long> initPinnedMemTableIds() {
-      List<MemTableInfo> memTableInfos = 
checkpointManager.snapshotMemTableInfos();
+      List<MemTableInfo> memTableInfos = 
checkpointManager.activeOrPinnedMemTables();
       if (memTableInfos.isEmpty()) {
         return new ArrayList<>();
       }
@@ -283,8 +296,11 @@ public class WALNode implements IWALNode {
       // The intent of the loop execution here is to try to get as many 
memTable flush or snapshot
       // as possible when the valid information ratio is less than the 
configured value.
       while (recursionTime < MAX_RECURSION_TIME) {
-        // init delete outdated file task fields
-        init();
+        // init delete outdated file task fields; if there is at most one wal file, the
+        // subsequent logic is not executed
+        if (!initAndCheckIfNeedContinue()) {
+          break;
+        }
 
         // delete outdated WAL files and record which delete successfully and 
which delete failed.
         deleteOutdatedFilesAndUpdateMetric();
@@ -308,42 +324,36 @@ public class WALNode implements IWALNode {
     private void updateEffectiveInfoRationAndUpdateMetric() {
       // calculate effective information ratio
       long costOfActiveMemTables = 
checkpointManager.getTotalCostOfActiveMemTables();
-      long costOfFlushedMemTables = totalCostOfFlushedMemTables.get();
-      long totalCost = costOfActiveMemTables + costOfFlushedMemTables;
+      long totalCost =
+          (getCurrentWALFileVersion()
+                  - 
checkpointManager.getOldestUnpinnedMemTableInfo().getFirstFileVersionId()
+                  + 1)
+              * config.getWalFileSizeThresholdInByte();
       if (totalCost == 0) {
         return;
       }
       effectiveInfoRatio = (double) costOfActiveMemTables / totalCost;
       logger.debug(
-          "Effective information ratio is {}, active memTables cost is {}, 
flushed memTables cost is {}",
+          "Effective information ratio is {}, active memTables cost is {}, 
total cost is {}",
           effectiveInfoRatio,
           costOfActiveMemTables,
-          costOfFlushedMemTables);
+          totalCost);
       WRITING_METRICS.recordWALNodeEffectiveInfoRatio(identifier, 
effectiveInfoRatio);
     }
 
     private void summarizeExecuteResult() {
-      if (filesShouldDelete.length == 0) {
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "wal node-{}:no wal file was found that should be deleted, 
current first valid version id is {}",
-              identifier,
-              firstValidVersionId);
-        }
-        return;
-      }
-
       if (!pinnedMemTableIds.isEmpty()
-          || fileIndexAfterFilterSafelyDeleteIndex < filesShouldDelete.length) 
{
+          || fileIndexAfterFilterSafelyDeleteIndex < 
sortedWalFilesExcludingLast.length) {
         if (logger.isDebugEnabled()) {
           StringBuilder summary =
               new StringBuilder(
                   String.format(
-                      "wal node-%s delete outdated files summary:the range 
that should be removed is: [%d,%d], delete successful is [%s], end file index 
is: [%s].The following reasons influenced the result: %s",
+                      "wal node-%s delete outdated files summary:the range is: 
[%d,%d], delete successful is [%s], safely delete file index is: [%s].The 
following reasons influenced the result: %s",
                       identifier,
-                      
WALFileUtils.parseVersionId(filesShouldDelete[0].getName()),
+                      
WALFileUtils.parseVersionId(sortedWalFilesExcludingLast[0].getName()),
                       WALFileUtils.parseVersionId(
-                          filesShouldDelete[filesShouldDelete.length - 
1].getName()),
+                          
sortedWalFilesExcludingLast[sortedWalFilesExcludingLast.length - 1]
+                              .getName()),
                       StringUtils.join(successfullyDeleted, ","),
                       fileIndexAfterFilterSafelyDeleteIndex,
                       System.getProperty("line.separator")));
@@ -355,7 +365,7 @@ public class WALNode implements IWALNode {
                 .append(".")
                 .append(System.getProperty("line.separator"));
           }
-          if (fileIndexAfterFilterSafelyDeleteIndex < 
filesShouldDelete.length) {
+          if (fileIndexAfterFilterSafelyDeleteIndex < 
sortedWalFilesExcludingLast.length) {
             summary.append(
                 String.format(
                     "- The data in the wal file was not consumed by the 
consensus group,current search index is %d, safely delete index is %d",
@@ -367,33 +377,28 @@ public class WALNode implements IWALNode {
 
       } else {
         logger.debug(
-            "Successfully delete {} outdated wal files for wal node-{},first 
valid version id is {}",
+            "Successfully delete {} outdated wal files for wal node-{}",
             successfullyDeleted.size(),
-            identifier,
-            firstValidVersionId);
+            identifier);
       }
     }
 
     /** Delete obsolete wal files while recording which succeeded or failed */
     private void deleteOutdatedFilesAndUpdateMetric() {
-      if (filesShouldDelete.length == 0) {
-        return;
-      }
-      for (int i = 0; i < fileIndexAfterFilterSafelyDeleteIndex; ++i) {
-        long fileSize = filesShouldDelete[i].length();
-        long versionId = 
WALFileUtils.parseVersionId(filesShouldDelete[i].getName());
-        if (filesShouldDelete[i].delete()) {
-          deleteFileSize += fileSize;
-          Long memTableRamCostSum = 
walFileVersionId2MemTablesTotalCost.remove(versionId);
-          if (memTableRamCostSum != null) {
-            totalCostOfFlushedMemTables.addAndGet(-memTableRamCostSum);
+      for (File currentWal : sortedWalFilesExcludingLast) {
+        long searchIndex = 
WALFileUtils.parseStartSearchIndex(currentWal.getName());
+        WALFileStatus walFileStatus = 
WALFileUtils.parseStatusCode(currentWal.getName());
+        long versionId = WALFileUtils.parseVersionId(currentWal.getName());
+        if (canDeleteFile(searchIndex, walFileStatus, versionId)) {
+          long fileSize = currentWal.length();
+          if (currentWal.delete()) {
+            deleteFileSize += fileSize;
+            buffer.removeMemTableIdsOfWal(versionId);
+            successfullyDeleted.add(versionId);
+          } else {
+            logger.info(
+                "Fail to delete outdated wal file {} of wal node-{}.", 
currentWal, identifier);
           }
-          successfullyDeleted.add(versionId);
-        } else {
-          logger.info(
-              "Fail to delete outdated wal file {} of wal node-{}.",
-              filesShouldDelete[i],
-              identifier);
         }
       }
       buffer.subtractDiskUsage(deleteFileSize);
@@ -403,15 +408,15 @@ public class WALNode implements IWALNode {
     private int initFileIndexAfterFilterSafelyDeleteIndex() {
       int endFileIndex =
           safelyDeletedSearchIndex == DEFAULT_SAFELY_DELETED_SEARCH_INDEX
-              ? filesShouldDelete.length
+              ? sortedWalFilesExcludingLast.length
               : WALFileUtils.binarySearchFileBySearchIndex(
-                  filesShouldDelete, safelyDeletedSearchIndex + 1);
+                  sortedWalFilesExcludingLast, safelyDeletedSearchIndex + 1);
       // delete files whose file status is CONTAINS_NONE_SEARCH_INDEX
       if (endFileIndex == -1) {
         endFileIndex = 0;
       }
-      while (endFileIndex < filesShouldDelete.length) {
-        if 
(WALFileUtils.parseStatusCode(filesShouldDelete[endFileIndex].getName())
+      while (endFileIndex < sortedWalFilesExcludingLast.length) {
+        if 
(WALFileUtils.parseStatusCode(sortedWalFilesExcludingLast[endFileIndex].getName())
             == WALFileStatus.CONTAINS_SEARCH_INDEX) {
           break;
         }
@@ -420,17 +425,6 @@ public class WALNode implements IWALNode {
       return endFileIndex;
     }
 
-    private boolean filterFilesToDelete(File dir, String name) {
-      Pattern pattern = WALFileUtils.WAL_FILE_NAME_PATTERN;
-      Matcher matcher = pattern.matcher(name);
-      boolean toDelete = false;
-      if (matcher.find()) {
-        long versionId = 
Long.parseLong(matcher.group(IoTDBConstant.WAL_VERSION_ID));
-        toDelete = versionId < firstValidVersionId;
-      }
-      return toDelete;
-    }
-
     /** Return true iff effective information ratio is too small or disk usage 
is too large. */
     private boolean shouldSnapshotOrFlush() {
       return effectiveInfoRatio < config.getWalMinEffectiveInfoRatio()
@@ -447,7 +441,7 @@ public class WALNode implements IWALNode {
         return false;
       }
       // find oldest memTable
-      MemTableInfo oldestMemTableInfo = 
checkpointManager.getOldestMemTableInfo();
+      MemTableInfo oldestMemTableInfo = 
checkpointManager.getOldestUnpinnedMemTableInfo();
       if (oldestMemTableInfo == null) {
         return false;
       }
@@ -584,22 +578,25 @@ public class WALNode implements IWALNode {
       }
     }
 
-    public long initFirstValidWALVersionId() {
-      long firstVersionId = checkpointManager.getFirstValidWALVersionId();
-      // This means that the relevant memTable in the file has been 
successfully flushed, so we
-      // should scroll through a new wal file so that the current file can be 
deleted
-      if (firstVersionId == Long.MIN_VALUE) {
-        // roll wal log writer to delete current wal file
-        if (buffer.getCurrentWALFileSize() > 0) {
-          rollWALFile();
-        }
-        // update firstValidVersionId
-        firstVersionId = checkpointManager.getFirstValidWALVersionId();
-        if (firstVersionId == Long.MIN_VALUE) {
-          firstVersionId = buffer.getCurrentWALFileVersion();
-        }
+    public boolean isContainsActiveOrPinnedMemTable(Long versionId) {
+      Set<Long> memTableIdsOfCurrentWal = buffer.getMemTableIds(versionId);
+      // If this set is null or empty, the WALEntry may have been logged but not yet persisted,
+      // because WALEntries are persisted asynchronously. In that case the file cannot safely be
+      // deleted, so it is considered active
+      if (memTableIdsOfCurrentWal == null || 
memTableIdsOfCurrentWal.isEmpty()) {
+        return true;
       }
-      return firstVersionId;
+      return !Collections.disjoint(
+          activeOrPinnedMemTables.stream()
+              .map(MemTableInfo::getMemTableId)
+              .collect(Collectors.toSet()),
+          memTableIdsOfCurrentWal);
+    }
+
+    private boolean canDeleteFile(long searchIndex, WALFileStatus 
walFileStatus, long versionId) {
+      return (searchIndex < safelyDeletedSearchIndex
+              || walFileStatus == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX)
+          && !isContainsActiveOrPinnedMemTable(versionId);
     }
   }
 
@@ -991,4 +988,9 @@ public class WALNode implements IWALNode {
   public void setBufferSize(int size) {
     buffer.setBufferSize(size);
   }
+
+  @TestOnly
+  public WALBuffer getWALBuffer() {
+    return buffer;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
index 3d9fd3c051a..94fab3181e4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.MemTableInfo;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALReader;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.UnsealedTsFileRecoverPerformer;
@@ -40,11 +41,16 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
@@ -74,6 +80,12 @@ public class WALNodeRecoverTask implements Runnable {
   @Override
   public void run() {
     logger.info("Start recovering WAL node in the directory {}", logDirectory);
+
+    // recover version id and search index
+    long[] indexInfo = recoverLastFile();
+    long lastVersionId = indexInfo[0];
+    long lastSearchIndex = indexInfo[1];
+
     try {
       recoverInfoFromCheckpoints();
       recoverTsFiles();
@@ -110,10 +122,6 @@ public class WALNodeRecoverTask implements Runnable {
             logger.error("error when delete checkpoint file. {}", 
checkpointFile, e);
           }
         }
-        // recover version id and search index
-        long[] indexInfo = recoverLastFile();
-        long lastVersionId = indexInfo[0];
-        long lastSearchIndex = indexInfo[1];
         // register wal node
         WALManager.getInstance()
             .registerWALNode(
@@ -140,7 +148,7 @@ public class WALNodeRecoverTask implements Runnable {
     File lastWALFile = walFiles[walFiles.length - 1];
     long lastVersionId = WALFileUtils.parseVersionId(lastWALFile.getName());
     long lastSearchIndex = 
WALFileUtils.parseStartSearchIndex(lastWALFile.getName());
-    WALMetaData metaData = new WALMetaData(lastSearchIndex, new ArrayList<>());
+    WALMetaData metaData = new WALMetaData(lastSearchIndex, new ArrayList<>(), 
new HashSet<>());
     WALFileStatus fileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
     try (WALReader walReader = new WALReader(lastWALFile, true)) {
       while (walReader.hasNext()) {
@@ -155,7 +163,7 @@ public class WALNodeRecoverTask implements Runnable {
             fileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
           }
         }
-        metaData.add(walEntry.serializedSize(), searchIndex);
+        metaData.add(walEntry.serializedSize(), searchIndex, 
walEntry.getMemTableId());
       }
     } catch (Exception e) {
       logger.warn("Fail to read wal logs from {}, skip them", lastWALFile, e);
@@ -235,20 +243,27 @@ public class WALNodeRecoverTask implements Runnable {
     // read .wal files and redo logs
     for (int i = 0; i < walFiles.length; ++i) {
       File walFile = walFiles[i];
-      // last wal file may corrupt
-      try (WALReader walReader = new WALReader(walFile, i == walFiles.length - 
1)) {
-        while (walReader.hasNext()) {
-          WALEntry walEntry = walReader.next();
-          if (!memTableId2Info.containsKey(walEntry.getMemTableId())) {
+      try (WALByteBufReader reader = new WALByteBufReader(walFile)) {
+        if (Collections.disjoint(memTableId2Info.keySet(), 
reader.getMetaData().getMemTablesId())) {
+          continue;
+        }
+        while (reader.hasNext()) {
+          ByteBuffer buffer = reader.next();
+          // skip the entry type byte before the memTable id, see WALInfoEntry#serialize
+          buffer.position(Byte.BYTES);
+          long memTableId = buffer.getLong();
+          if (!memTableId2Info.containsKey(memTableId)) {
             continue;
           }
-
+          buffer.clear();
+          WALEntry walEntry =
+              WALEntry.deserialize(new DataInputStream(new 
ByteArrayInputStream(buffer.array())));
           UnsealedTsFileRecoverPerformer recoverPerformer =
               memTableId2RecoverPerformer.get(walEntry.getMemTableId());
           if (recoverPerformer != null) {
             recoverPerformer.redoLog(walEntry);
           } else {
-            logger.warn(
+            logger.debug(
                 "Fail to find TsFile recover performer for wal entry in TsFile 
{}", walFile);
           }
         }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 46707569e56..ff348415cba 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -1099,6 +1099,8 @@ public class DataRegionTest {
     FlushManager flushManager = FlushManager.getInstance();
 
     // flush the sequence memtable
+    tsFileProcessor.getWorkMemTable().getUpdateTime();
+    Thread.sleep(500);
     dataRegion.timedFlushSeqMemTable();
 
     // wait until memtable flush task is done
@@ -1154,6 +1156,8 @@ public class DataRegionTest {
     FlushManager flushManager = FlushManager.getInstance();
 
     // flush the unsequence memtable
+    tsFileProcessor.getWorkMemTable().getUpdateTime();
+    Thread.sleep(500);
     dataRegion.timedFlushUnseqMemTable();
 
     // wait until memtable flush task is done
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
index 08feee58228..58e9aefec32 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
@@ -124,9 +124,12 @@ public class WALEntryHandlerTest {
     // pin memTable
     WALEntryHandler handler = flushListener.getWalEntryHandler();
     handler.pinMemTable();
-    walNode1.onMemTableFlushed(memTable);
     // roll wal file
     walNode1.rollWALFile();
+    InsertRowNode node2 = getInsertRowNode(devicePath, 
System.currentTimeMillis());
+    node2.setSearchIndex(2);
+    walNode1.log(memTable.getMemTableId(), node2);
+    walNode1.onMemTableFlushed(memTable);
     walNode1.rollWALFile();
     // find node1
     ConsensusReqReader.ReqIterator itr = walNode1.getReqIterator(1);
@@ -173,13 +176,11 @@ public class WALEntryHandlerTest {
     // unpin 1
     CheckpointManager checkpointManager = walNode1.getCheckpointManager();
     handler.unpinMemTable();
-    MemTableInfo oldestMemTableInfo = 
checkpointManager.getOldestMemTableInfo();
-    assertEquals(memTable.getMemTableId(), oldestMemTableInfo.getMemTableId());
-    assertNull(oldestMemTableInfo.getMemTable());
-    assertTrue(oldestMemTableInfo.isPinned());
+    MemTableInfo oldestMemTableInfo = 
checkpointManager.getOldestUnpinnedMemTableInfo();
+    assertNull(oldestMemTableInfo);
     // unpin 2
     handler.unpinMemTable();
-    assertNull(checkpointManager.getOldestMemTableInfo());
+    assertNull(checkpointManager.getOldestUnpinnedMemTableInfo());
   }
 
   @Test
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
new file mode 100644
index 00000000000..c528965b58f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.wal.node;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALInsertNodeCache;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+public class WalDeleteOutdatedNewTest {
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final String identifier1 = String.valueOf(Integer.MAX_VALUE);
+  private static final String logDirectory1 = 
TestConstant.BASE_OUTPUT_PATH.concat("1/2910/");
+  private static final String databasePath = "root.test_sg";
+  private static final String devicePath = databasePath + ".test_d";
+  private static final String dataRegionId = "1";
+  private WALMode prevMode;
+  private boolean prevIsClusterMode;
+  private WALNode walNode1;
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.cleanDir(logDirectory1);
+    prevMode = config.getWalMode();
+    prevIsClusterMode = config.isClusterMode();
+    config.setWalMode(WALMode.SYNC);
+    config.setClusterMode(true);
+    walNode1 = new WALNode(identifier1, logDirectory1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    walNode1.close();
+    config.setWalMode(prevMode);
+    config.setClusterMode(prevIsClusterMode);
+    EnvironmentUtils.cleanDir(logDirectory1);
+
+    WALInsertNodeCache.getInstance(1).clear();
+  }
+
+  /**
+   * Simulates writing to the last wal file. Serialization and disk flushing are asynchronous,
+   * so memTableIdsOfWal is only accurate after all WALEntries have been consumed; reading it
+   * earlier gives an inaccurate result. For this reason, the last wal file is not read when
+   * expired wal files are actually deleted.
+   */
+  @Test
+  public void test01() throws IllegalPathException {
+    IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+
+    IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 4));
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 5));
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 6));
+
+    Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+    Map<Long, Set<Long>> memTableIdsOfWal = 
walNode1.getWALBuffer().getMemTableIdsOfWal();
+    Assert.assertEquals(1, memTableIdsOfWal.size());
+    Assert.assertEquals(2, memTableIdsOfWal.get(0L).size());
+    Assert.assertEquals(1, WALFileUtils.listAllWALFiles(new 
File(logDirectory1)).length);
+    walNode1.close();
+  }
+
+  /**
+   * Ensure that the memtableIds maintained by each wal file are accurate:<br>
+   * 1. _0-1-1.wal: memTable0, memTable1 <br>
+   * 2. roll wal file <br>
+   * 3. _1-6-1.wal: memTable1 <br>
+   * 4. wait until all walEntry consumed
+   */
+  @Test
+  public void test02() throws IllegalPathException {
+    IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+
+    IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 4));
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 5));
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 6));
+
+    walNode1.rollWALFile();
+    walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 7));
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 8));
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 9));
+    Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+    Map<Long, Set<Long>> memTableIdsOfWal = 
walNode1.getWALBuffer().getMemTableIdsOfWal();
+    Assert.assertEquals(2, memTableIdsOfWal.size());
+    Assert.assertEquals(1, memTableIdsOfWal.get(1L).size());
+    Assert.assertEquals(2, WALFileUtils.listAllWALFiles(new 
File(logDirectory1)).length);
+  }
+
+  /**
+   * Ensure that files that can be cleaned can be deleted: <br>
+   * 1. _0-0-1.wal: memTable0, memTable1 <br>
+   * 2. roll wal file <br>
+   * 3. _1-1-1.wal: memTable1 <br>
+   * 4. wait until all walEntry consumed <br>
+   * 5. memTable0 flush, memTable1 flush <br>
+   * 6. delete outdated wal files
+   */
+  @Test
+  public void test03() throws IllegalPathException {
+
+    IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable0.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+
+    IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+    walNode1.rollWALFile();
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+
+    Map<Long, Set<Long>> memTableIdsOfWal = 
walNode1.getWALBuffer().getMemTableIdsOfWal();
+    walNode1.onMemTableFlushed(memTable0);
+    walNode1.onMemTableFlushed(memTable1);
+    Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+    // before deleted
+    Assert.assertEquals(2, memTableIdsOfWal.size());
+    Assert.assertEquals(2, memTableIdsOfWal.get(0L).size());
+    File[] files = WALFileUtils.listAllWALFiles(new File(logDirectory1));
+    Assert.assertEquals(2, files.length);
+
+    walNode1.deleteOutdatedFiles();
+    Map<Long, Set<Long>> memTableIdsOfWalAfter = 
walNode1.getWALBuffer().getMemTableIdsOfWal();
+
+    // after deleted
+    Assert.assertEquals(0, memTableIdsOfWalAfter.size());
+    File[] filesAfter = WALFileUtils.listAllWALFiles(new File(logDirectory1));
+    Assert.assertEquals(1, filesAfter.length);
+  }
+
+  /**
+   * Ensure that files that can be cleaned can be deleted: <br>
+   * 1. _0-0-1.wal: memTable0 <br>
+   * 2. roll wal file <br>
+   * 3. _1-1-0.wal: memTable1 <br>
+   * 4. roll wal file <br>
+   * 5. _2-1-1.wal: memTable2 <br>
+   * 6. wait until all walEntry consumed <br>
+   * 7. memTable0 flush, memTable1 flush, memTable2 flush <br>
+   * 8. delete outdated wal files
+   */
+  @Test
+  public void test04() throws IllegalPathException {
+    IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable0.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+    walNode1.rollWALFile();
+
+    IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+    walNode1.rollWALFile();
+
+    IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+    walNode1.onMemTableFlushed(memTable2);
+    walNode1.onMemTableFlushed(memTable0);
+    walNode1.onMemTableFlushed(memTable1);
+    Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+
+    Map<Long, Set<Long>> memTableIdsOfWal = 
walNode1.getWALBuffer().getMemTableIdsOfWal();
+    Assert.assertEquals(3, memTableIdsOfWal.size());
+    Assert.assertEquals(3, WALFileUtils.listAllWALFiles(new 
File(logDirectory1)).length);
+
+    walNode1.deleteOutdatedFiles();
+    Map<Long, Set<Long>> memTableIdsOfWalAfter = 
walNode1.getWALBuffer().getMemTableIdsOfWal();
+    Assert.assertEquals(0, memTableIdsOfWalAfter.size());
+    Assert.assertEquals(1, WALFileUtils.listAllWALFiles(new 
File(logDirectory1)).length);
+  }
+
+  /**
+   * Ensure that wal pinned to memtable cannot be deleted: <br>
+   * 1. _0-0-1.wal: memTable0 <br>
+   * 2. pin memTable0 <br>
+   * 3. memTable0 flush <br>
+   * 4. roll wal file <br>
+   * 5. _1-1-1.wal: memTable0, memTable1 <br>
+   * 6. roll wal file <br>
+   * 7. _2-1-1.wal: memTable1 <br>
+   * 8. roll wal file <br>
+   * 9. _3-1-1.wal: memTable1 <br>
+   * 10. wait until all walEntry consumed <br>
+   * 11. memTable0 flush, memTable1 flush <br>
+   * 12. delete outdated wal files
+   */
+  @Test
+  public void test05() throws IllegalPathException, MemTablePinException {
+    IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+    WALFlushListener listener =
+        walNode1.log(
+            memTable0.getMemTableId(),
+            generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+    walNode1.rollWALFile();
+
+    // pin memTable
+    WALEntryHandler handler = listener.getWalEntryHandler();
+    handler.pinMemTable();
+    walNode1.log(
+        memTable0.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+    IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+    walNode1.rollWALFile();
+
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 4));
+    walNode1.rollWALFile();
+
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 5));
+    walNode1.onMemTableFlushed(memTable0);
+    walNode1.onMemTableFlushed(memTable1);
+    Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+
+    Map<Long, Set<Long>> memTableIdsOfWal = 
walNode1.getWALBuffer().getMemTableIdsOfWal();
+    Assert.assertEquals(4, memTableIdsOfWal.size());
+    Assert.assertEquals(4, WALFileUtils.listAllWALFiles(new 
File(logDirectory1)).length);
+
+    walNode1.deleteOutdatedFiles();
+    Map<Long, Set<Long>> memTableIdsOfWalAfter = 
walNode1.getWALBuffer().getMemTableIdsOfWal();
+    Assert.assertEquals(3, memTableIdsOfWalAfter.size());
+    Assert.assertEquals(3, WALFileUtils.listAllWALFiles(new 
File(logDirectory1)).length);
+  }
+
+  /**
+   * Ensure that wal files related to a memtable that has not been flushed cannot be deleted: <br>
+   * 1. _0-0-1.wal: memTable0 <br>
+   * 2. roll wal file <br>
+   * 3. _1-1-1.wal: memTable0 <br>
+   * 4. roll wal file <br>
+   * 5. _2-1-1.wal: memTable0 <br>
+   * 6. roll wal file <br>
+   * 7. _3-1-1.wal: memTable0 <br>
+   * 8. wait until all walEntry consumed <br>
+   * 9. delete outdated wal files
+   */
+  @Test
+  public void test06() throws IllegalPathException {
+    IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable0.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+    walNode1.rollWALFile();
+    walNode1.log(
+        memTable0.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+    walNode1.rollWALFile();
+    walNode1.log(
+        memTable0.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+    walNode1.rollWALFile();
+    walNode1.log(
+        memTable0.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 4));
+    Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+
+    Map<Long, Set<Long>> memTableIdsOfWal = 
walNode1.getWALBuffer().getMemTableIdsOfWal();
+    Assert.assertEquals(4, memTableIdsOfWal.size());
+    Assert.assertEquals(4, WALFileUtils.listAllWALFiles(new 
File(logDirectory1)).length);
+
+    walNode1.deleteOutdatedFiles();
+    Map<Long, Set<Long>> memTableIdsOfWalAfter = 
walNode1.getWALBuffer().getMemTableIdsOfWal();
+    Assert.assertEquals(4, memTableIdsOfWalAfter.size());
+    Assert.assertEquals(4, WALFileUtils.listAllWALFiles(new 
File(logDirectory1)).length);
+  }
+
+  /**
+   * Ensure that files that can be cleaned can be deleted: <br>
+   * 1. _0-0-1.wal: memTable0 <br>
+   * 2. roll wal file <br>
+   * 3. _1-1-0.wal: memTable1, memTable2 <br>
+   * 4. roll wal file <br>
+   * 5. _2-1-0.wal: memTable2 <br>
+   * 6. roll wal file <br>
+   * 7. _3-1-0.wal: memTable3 <br>
+   * 8. roll wal file <br>
+   * 9. _4-1-0.wal: memTable3 <br>
+   * 10. wait until all walEntry consumed <br>
+   * 11. memTable1 flush, memTable2 flush, memTable3 flush <br>
+   * 12. delete outdated wal files
+   */
+  @Test
+  public void test07() throws IllegalPathException {
+    IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable0.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+    walNode1.rollWALFile();
+
+    IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+
+    IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+    walNode1.rollWALFile();
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+    walNode1.rollWALFile();
+    IMemTable memTable3 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable3, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable3.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+    walNode1.rollWALFile();
+    walNode1.log(
+        memTable3.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+    walNode1.onMemTableFlushed(memTable1);
+    walNode1.onMemTableFlushed(memTable2);
+    walNode1.onMemTableFlushed(memTable3);
+    Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+
+    Map<Long, Set<Long>> memTableIdsOfWal = 
walNode1.getWALBuffer().getMemTableIdsOfWal();
+    Assert.assertEquals(5, memTableIdsOfWal.size());
+    Assert.assertEquals(5, WALFileUtils.listAllWALFiles(new 
File(logDirectory1)).length);
+
+    walNode1.deleteOutdatedFiles();
+    Map<Long, Set<Long>> memTableIdsOfWalAfter = 
walNode1.getWALBuffer().getMemTableIdsOfWal();
+    Assert.assertEquals(2, memTableIdsOfWalAfter.size());
+    Assert.assertEquals(2, WALFileUtils.listAllWALFiles(new 
File(logDirectory1)).length);
+
+    walNode1.onMemTableFlushed(memTable0);
+    Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+    walNode1.deleteOutdatedFiles();
+    Map<Long, Set<Long>> memTableIdsOfWalAfterAfter = 
walNode1.getWALBuffer().getMemTableIdsOfWal();
+    Assert.assertEquals(0, memTableIdsOfWalAfterAfter.size());
+    Assert.assertEquals(1, WALFileUtils.listAllWALFiles(new 
File(logDirectory1)).length);
+  }
+
+  /**
+   * Ensure that files that can be cleaned can be deleted: <br>
+   * 1. _0-0-1.wal: memTable0 <br>
+   * 2. roll wal file <br>
+   * 3. _1-1-0.wal: memTable1 <br>
+   * 4. memTable1 flush <br>
+   * 5. roll wal file <br>
+   * 6. _2-1-0.wal: memTable2 <br>
+   * 7. wait until all walEntry consumed <br>
+   * 8. delete outdated wal files
+   */
+  @Test
+  public void test08() throws IllegalPathException {
+    IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable0.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+    walNode1.rollWALFile();
+
+    ConsensusReqReader.ReqIterator itr1 = walNode1.getReqIterator(1);
+    Assert.assertFalse(itr1.hasNext());
+
+    IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable1.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+    walNode1.onMemTableFlushed(memTable1);
+    walNode1.rollWALFile();
+
+    ConsensusReqReader.ReqIterator itr2 = walNode1.getReqIterator(1);
+    Assert.assertTrue(itr2.hasNext());
+
+    IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+    Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+
+    ConsensusReqReader.ReqIterator itr3 = walNode1.getReqIterator(1);
+    Assert.assertTrue(itr3.hasNext());
+    walNode1.deleteOutdatedFiles();
+
+    ConsensusReqReader.ReqIterator itr4 = walNode1.getReqIterator(1);
+    Assert.assertFalse(itr4.hasNext());
+    walNode1.rollWALFile();
+    Assert.assertTrue(itr4.hasNext());
+  }
+
+  /**
+   * Ensure that files that can be cleaned can be deleted: <br>
+   * 1. _0-0-1.wal: memTable0 <br>
+   * 2. roll wal file <br>
+   * 3. _1-1-0.wal: memTable2 <br>
+   * 4. wait until all walEntry consumed <br>
+   * 5. delete outdated wal files
+   */
+  @Test
+  public void test09() throws IllegalPathException {
+    IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable0.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+    walNode1.rollWALFile();
+
+    IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+    walNode1.log(
+        memTable2.getMemTableId(),
+        generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+    Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+
+    ConsensusReqReader.ReqIterator itr3 = walNode1.getReqIterator(1);
+    Assert.assertFalse(itr3.hasNext());
+  }
+
+  public static InsertRowNode generateInsertRowNode(String devicePath, long 
time, long searchIndex)
+      throws IllegalPathException {
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+
+    Object[] columns = new Object[6];
+    columns[0] = 1.0d;
+    columns[1] = 2f;
+    columns[2] = 10000L;
+    columns[3] = 100;
+    columns[4] = false;
+    columns[5] = new Binary("hh" + 0, TSFileConfig.STRING_CHARSET);
+
+    InsertRowNode node =
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath(devicePath),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+            dataTypes,
+            time,
+            columns,
+            false);
+    MeasurementSchema[] schemas = new MeasurementSchema[6];
+    for (int i = 0; i < 6; i++) {
+      schemas[i] = new MeasurementSchema("s" + (i + 1), dataTypes[i]);
+    }
+    node.setMeasurementSchemas(schemas);
+    node.setSearchIndex(searchIndex);
+    return node;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
index 234dec46c26..368cc967ea2 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
@@ -48,6 +48,7 @@ import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
+import java.util.HashSet;
 
 public class WALRecoverWriterTest {
   private final File logFile =
@@ -65,7 +66,7 @@ public class WALRecoverWriterTest {
     // prepare file
     logFile.createNewFile();
     long firstSearchIndex = 
WALFileUtils.parseStartSearchIndex(logFile.getName());
-    WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new 
ArrayList<>());
+    WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new 
ArrayList<>(), new HashSet<>());
     // recover
     WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
     walRecoverWriter.recover(walMetaData);
@@ -87,7 +88,7 @@ public class WALRecoverWriterTest {
       stream.write(1);
     }
     long firstSearchIndex = 
WALFileUtils.parseStartSearchIndex(logFile.getName());
-    WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new 
ArrayList<>());
+    WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new 
ArrayList<>(), new HashSet<>());
     // recover
     WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
     walRecoverWriter.recover(walMetaData);
@@ -109,7 +110,7 @@ public class WALRecoverWriterTest {
     int size = walEntry.serializedSize();
     WALByteBufferForTest buffer = new 
WALByteBufferForTest(ByteBuffer.allocate(size));
     walEntry.serialize(buffer);
-    walMetaData.add(size, 1);
+    walMetaData.add(size, 1, walEntry.getMemTableId());
     try (WALWriter walWriter = new WALWriter(logFile)) {
       walWriter.write(buffer.getBuffer(), walMetaData);
     }
@@ -133,7 +134,7 @@ public class WALRecoverWriterTest {
     int size = walEntry.serializedSize();
     WALByteBufferForTest buffer = new 
WALByteBufferForTest(ByteBuffer.allocate(size));
     walEntry.serialize(buffer);
-    walMetaData.add(size, 1);
+    walMetaData.add(size, 1, walEntry.getMemTableId());
     try (WALWriter walWriter = new WALWriter(logFile)) {
       walWriter.write(buffer.getBuffer(), walMetaData);
     }
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 162af1852a0..43d431a7a2d 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -488,31 +488,31 @@ data_replication_factor=1
 # Datatype: boolean
 # enable_timed_flush_seq_memtable=true
 
-# If a memTable's created time is older than current time minus this, the 
memtable will be flushed to disk.
+# If a memTable's last update time is older than current time minus this, the 
memtable will be flushed to disk.
 # Only check sequence tsfiles' memtables.
-# The default flush interval is 3 * 60 * 60 * 1000. (unit: ms)
+# The default flush interval is 10 * 60 * 1000. (unit: ms)
 # Datatype: long
-# seq_memtable_flush_interval_in_ms=10800000
+# seq_memtable_flush_interval_in_ms=600000
 
 # The interval to check whether sequence memtables need flushing.
-# The default flush check interval is 10 * 60 * 1000. (unit: ms)
+# The default flush check interval is 30 * 1000. (unit: ms)
 # Datatype: long
-# seq_memtable_flush_check_interval_in_ms=600000
+# seq_memtable_flush_check_interval_in_ms=30000
 
 # Whether to timed flush unsequence tsfiles' memtables.
 # Datatype: boolean
 # enable_timed_flush_unseq_memtable=true
 
-# If a memTable's created time is older than current time minus this, the 
memtable will be flushed to disk.
+# If a memTable's last update time is older than current time minus this, the 
memtable will be flushed to disk.
 # Only check unsequence tsfiles' memtables.
-# The default flush interval is 3 * 60 * 60 * 1000. (unit: ms)
+# The default flush interval is 10 * 60 * 1000. (unit: ms)
 # Datatype: long
-# unseq_memtable_flush_interval_in_ms=10800000
+# unseq_memtable_flush_interval_in_ms=600000
 
 # The interval to check whether unsequence memtables need flushing.
-# The default flush check interval is 10 * 60 * 1000. (unit: ms)
+# The default flush check interval is 30 * 1000. (unit: ms)
 # Datatype: long
-# unseq_memtable_flush_check_interval_in_ms=600000
+# unseq_memtable_flush_check_interval_in_ms=30000
 
 # The sort algorithms used in the memtable's TVList
 # TIM: default tim sort,

Reply via email to