This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d18f6b91c3d Fix IoTConsensus sync stuck problem  (#12856)
d18f6b91c3d is described below

commit d18f6b91c3d564e63c06b6ffd0d80379b36804d8
Author: Li Yu Heng <[email protected]>
AuthorDate: Wed Jul 10 01:06:21 2024 +0800

    Fix IoTConsensus sync stuck problem  (#12856)
---
 .../dataregion/DataRegionStateMachine.java         |  2 +-
 .../planner/plan/node/write/DeleteDataNode.java    | 18 +------
 .../plan/planner/plan/node/write/InsertNode.java   | 22 +-------
 .../plan/planner/plan/node/write/SearchNode.java   | 51 ++++++++++++++++++
 .../db/storageengine/dataregion/DataRegion.java    | 60 ++++++++++++----------
 .../dataregion/memtable/TsFileProcessor.java       |  2 +
 .../storageengine/dataregion/wal/node/WALNode.java | 16 ++++++
 .../dataregion/wal/recover/WALNodeRecoverTask.java | 45 +++++++---------
 ...{WALRecoverWriter.java => WALRepairWriter.java} |  8 +--
 .../compaction/AbstractCompactionTest.java         |  3 ++
 ...eCrossSpaceCompactionWithFastPerformerTest.java |  5 ++
 ...verWriterTest.java => WALRepairWriterTest.java} | 27 ++++------
 12 files changed, 147 insertions(+), 112 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 0f949fd4904..f60ae0bf9a4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -289,7 +289,7 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
   @Override
   public DataSet read(IConsensusRequest request) {
     if (request instanceof GetConsensusReqReaderPlan) {
-      return region.getWALNode();
+      return 
region.getWALNode().orElseThrow(UnsupportedOperationException::new);
     } else {
       FragmentInstance fragmentInstance;
       try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 93eb2a873ab..54f20728de2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -53,9 +53,8 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode.NO_CONSENSUS_INDEX;
 
-public class DeleteDataNode extends WritePlanNode implements WALEntryValue {
+public class DeleteDataNode extends SearchNode implements WALEntryValue {
   /** short: type, integer: pathList.size(), long: deleteStartTime, 
deleteEndTime, searchIndex */
   private static final int FIXED_SERIALIZED_SIZE = Short.BYTES + Integer.BYTES 
+ Long.BYTES * 3;
 
@@ -63,12 +62,6 @@ public class DeleteDataNode extends WritePlanNode implements 
WALEntryValue {
   private final long deleteStartTime;
   private final long deleteEndTime;
 
-  /**
-   * this index is used by wal search, its order should be protected by the 
upper layer, and the
-   * value should start from 1
-   */
-  protected long searchIndex = NO_CONSENSUS_INDEX;
-
   private TRegionReplicaSet regionReplicaSet;
 
   public DeleteDataNode(
@@ -104,15 +97,6 @@ public class DeleteDataNode extends WritePlanNode 
implements WALEntryValue {
     return deleteEndTime;
   }
 
-  public long getSearchIndex() {
-    return searchIndex;
-  }
-
-  /** Search index should start from 1 */
-  public void setSearchIndex(long searchIndex) {
-    this.searchIndex = searchIndex;
-  }
-
   @Override
   public List<PlanNode> getChildren() {
     return new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index a572f689e8c..161b6ac9d2e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -25,11 +25,9 @@ import 
org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
@@ -46,10 +44,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
 
-public abstract class InsertNode extends WritePlanNode implements 
ComparableConsensusRequest {
-
-  /** this insert node doesn't need to participate in iot consensus */
-  public static final long NO_CONSENSUS_INDEX = 
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
+public abstract class InsertNode extends SearchNode implements 
ComparableConsensusRequest {
 
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
 
@@ -72,12 +67,6 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
    */
   protected IDeviceID deviceID;
 
-  /**
-   * this index is used by wal search, its order should be protected by the 
upper layer, and the
-   * value should start from 1
-   */
-  protected long searchIndex = NO_CONSENSUS_INDEX;
-
   protected boolean isGeneratedByRemoteConsensusLeader = false;
 
   /** Physical address of data region after splitting */
@@ -167,15 +156,6 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     this.deviceID = deviceID;
   }
 
-  public long getSearchIndex() {
-    return searchIndex;
-  }
-
-  /** Search index should start from 1 */
-  public void setSearchIndex(long searchIndex) {
-    this.searchIndex = searchIndex;
-  }
-
   public boolean isGeneratedByRemoteConsensusLeader() {
     switch (config.getDataRegionConsensusProtocolClass()) {
       case ConsensusFactory.IOT_CONSENSUS:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
new file mode 100644
index 00000000000..c5172534298
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+
+import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode.NO_CONSENSUS_INDEX;
+
+public abstract class SearchNode extends WritePlanNode {
+
+  /** search index value meaning this node doesn't need to participate in iot consensus */
+  public static final long NO_CONSENSUS_INDEX = 
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
+
+  /**
+   * this index is used by wal search, its order should be protected by the 
upper layer, and the
+   * value should start from 1
+   */
+  protected long searchIndex = NO_CONSENSUS_INDEX;
+
+  protected SearchNode(PlanNodeId id) {
+    super(id);
+  }
+
+  public long getSearchIndex() {
+    return searchIndex;
+  }
+
+  /** Search index should start from 1 */
+  public void setSearchIndex(long searchIndex) {
+    this.searchIndex = searchIndex;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 7f49bf24818..ed008bb47d9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -944,6 +944,8 @@ public class DataRegion implements IDataRegionForQuery {
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - 
startTime);
     try {
       if (deleted) {
+        logger.info(
+            "Won't insert tablet {}, because region is deleted", 
insertTabletNode.getSearchIndex());
         return;
       }
       TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
@@ -1079,6 +1081,12 @@ public class DataRegion implements IDataRegionForQuery {
       long timePartitionId) {
     // return when start >= end or all measurement failed
     if (start >= end || insertTabletNode.allMeasurementFailed()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Won't insert tablet {}, because {}",
+            insertTabletNode.getSearchIndex(),
+            start >= end ? "start >= end" : "insertTabletNode 
allMeasurementFailed");
+      }
       return true;
     }
 
@@ -2018,31 +2026,17 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   /** Separate tsfiles in TsFileManager into sealedList and unsealedList. */
-  private void separateTsFile(
+  private void getTwoKindsOfTsFiles(
       List<TsFileResource> sealedResource,
       List<TsFileResource> unsealedResource,
       long startTime,
       long endTime) {
-    tsFileManager
-        .getTsFileList(true, startTime, endTime)
-        .forEach(
-            tsFileResource -> {
-              if (tsFileResource.isClosed()) {
-                sealedResource.add(tsFileResource);
-              } else {
-                unsealedResource.add(tsFileResource);
-              }
-            });
-    tsFileManager
-        .getTsFileList(false, startTime, endTime)
-        .forEach(
-            tsFileResource -> {
-              if (tsFileResource.isClosed()) {
-                sealedResource.add(tsFileResource);
-              } else {
-                unsealedResource.add(tsFileResource);
-              }
-            });
+    List<TsFileResource> tsFileResources = tsFileManager.getTsFileList(true, 
startTime, endTime);
+    tsFileResources.addAll(tsFileManager.getTsFileList(false, startTime, 
endTime));
+    
tsFileResources.stream().filter(TsFileResource::isClosed).forEach(sealedResource::add);
+    tsFileResources.stream()
+        .filter(resource -> !resource.isClosed())
+        .forEach(unsealedResource::add);
   }
 
   /**
@@ -2083,7 +2077,7 @@ public class DataRegion implements IDataRegionForQuery {
 
       List<TsFileResource> sealedTsFileResource = new ArrayList<>();
       List<TsFileResource> unsealedTsFileResource = new ArrayList<>();
-      separateTsFile(sealedTsFileResource, unsealedTsFileResource, startTime, 
endTime);
+      getTwoKindsOfTsFiles(sealedTsFileResource, unsealedTsFileResource, 
startTime, endTime);
       // deviceMatchInfo is used to filter the matched deviceId in 
TsFileResource
       // if deviceMatchInfo contains the DeviceId, this device matched the 
pattern
       Set<String> deviceMatchInfo = new HashSet<>();
@@ -2126,7 +2120,7 @@ public class DataRegion implements IDataRegionForQuery {
       }
       List<TsFileResource> sealedTsFileResource = new ArrayList<>();
       List<TsFileResource> unsealedTsFileResource = new ArrayList<>();
-      separateTsFile(sealedTsFileResource, unsealedTsFileResource, startTime, 
endTime);
+      getTwoKindsOfTsFiles(sealedTsFileResource, unsealedTsFileResource, 
startTime, endTime);
       deleteDataDirectlyInFile(unsealedTsFileResource, pathToDelete, 
startTime, endTime);
       writeUnlock();
       releasedLock = true;
@@ -2161,6 +2155,17 @@ public class DataRegion implements IDataRegionForQuery {
         walFlushListeners.add(walFlushListener);
       }
     }
+    // Sometimes the deletion operation doesn't have any related tsfile 
processor or memtable,
+    // but it's still necessary to write to the WAL, so that IoTConsensus can 
synchronize the delete
+    // operation to other nodes.
+    if (walFlushListeners.isEmpty()) {
+      // TODO: IoTConsensusV2 deletion support
+      getWALNode()
+          .ifPresent(
+              walNode ->
+                  walFlushListeners.add(
+                      walNode.log(TsFileProcessor.MEMTABLE_NOT_EXIST, 
deleteDataNode)));
+    }
     return walFlushListeners;
   }
 
@@ -3586,13 +3591,14 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   /** This method could only be used in iot consensus */
-  public IWALNode getWALNode() {
+  public Optional<IWALNode> getWALNode() {
     if 
(!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
 {
-      throw new UnsupportedOperationException();
+      return Optional.empty();
     }
     // identifier should be same with getTsFileProcessor method
-    return WALManager.getInstance()
-        .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId);
+    return Optional.of(
+        WALManager.getInstance()
+            .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + 
dataRegionId));
   }
 
   /** Wait for this data region successfully deleted */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 02fdf59563c..78eb2309cf6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -201,6 +201,8 @@ public class TsFileProcessor {
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
       PerformanceOverviewMetrics.getInstance();
 
+  public static final int MEMTABLE_NOT_EXIST = -1;
+
   @SuppressWarnings("squid:S107")
   public TsFileProcessor(
       String storageGroupName,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index ac938a33f72..79d5d26673b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -128,12 +128,20 @@ public class WALNode implements IWALNode {
 
   @Override
   public WALFlushListener log(long memTableId, InsertRowNode insertRowNode) {
+    logger.debug(
+        "WAL node-{} logs insertRowNode, the search index is {}.",
+        identifier,
+        insertRowNode.getSearchIndex());
     WALEntry walEntry = new WALInfoEntry(memTableId, insertRowNode);
     return log(walEntry);
   }
 
   @Override
   public WALFlushListener log(long memTableId, InsertRowsNode insertRowsNode) {
+    logger.debug(
+        "WAL node-{} logs insertRowsNode, the search index is {}.",
+        identifier,
+        insertRowsNode.getSearchIndex());
     WALEntry walEntry = new WALInfoEntry(memTableId, insertRowsNode);
     return log(walEntry);
   }
@@ -141,12 +149,20 @@ public class WALNode implements IWALNode {
   @Override
   public WALFlushListener log(
       long memTableId, InsertTabletNode insertTabletNode, int start, int end) {
+    logger.debug(
+        "WAL node-{} logs insertTabletNode, the search index is {}.",
+        identifier,
+        insertTabletNode.getSearchIndex());
     WALEntry walEntry = new WALInfoEntry(memTableId, insertTabletNode, start, 
end);
     return log(walEntry);
   }
 
   @Override
   public WALFlushListener log(long memTableId, DeleteDataNode deleteDataNode) {
+    logger.debug(
+        "WAL node-{} logs deleteDataNode, the search index is {}.",
+        identifier,
+        deleteDataNode.getSearchIndex());
     WALEntry walEntry = new WALInfoEntry(memTableId, deleteDataNode);
     return log(walEntry);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
index 1419f9c6136..43140cb56cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
@@ -24,12 +24,10 @@ import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
-import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.MemTableInfo;
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
@@ -83,7 +81,7 @@ public class WALNodeRecoverTask implements Runnable {
     logger.info("Start recovering WAL node in the directory {}", logDirectory);
 
     // recover version id and search index
-    long[] indexInfo = recoverLastFile();
+    long[] indexInfo = readLastFileInfoAndRepairIt();
     long lastVersionId = indexInfo[0];
     long lastSearchIndex = indexInfo[1];
 
@@ -156,7 +154,7 @@ public class WALNodeRecoverTask implements Runnable {
     }
   }
 
-  private long[] recoverLastFile() {
+  private long[] readLastFileInfoAndRepairIt() {
     File[] walFiles = WALFileUtils.listAllWALFiles(logDirectory);
     if (walFiles == null || walFiles.length == 0) {
       return new long[] {0L, 0L};
@@ -173,20 +171,11 @@ public class WALNodeRecoverTask implements Runnable {
         WALEntry walEntry = walReader.next();
         long searchIndex = DEFAULT_SEARCH_INDEX;
         if (walEntry.getType().needSearch()) {
-          if (walEntry.getType() != WALEntryType.DELETE_DATA_NODE) {
-            InsertNode insertNode = (InsertNode) walEntry.getValue();
-            if (insertNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
-              searchIndex = insertNode.getSearchIndex();
-              lastSearchIndex = Math.max(lastSearchIndex, 
insertNode.getSearchIndex());
-              fileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
-            }
-          } else {
-            DeleteDataNode deleteNode = (DeleteDataNode) walEntry.getValue();
-            if (deleteNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
-              searchIndex = deleteNode.getSearchIndex();
-              lastSearchIndex = Math.max(lastSearchIndex, 
deleteNode.getSearchIndex());
-              fileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
-            }
+          SearchNode searchNode = (SearchNode) walEntry.getValue();
+          if (searchNode.getSearchIndex() != SearchNode.NO_CONSENSUS_INDEX) {
+            searchIndex = searchNode.getSearchIndex();
+            lastSearchIndex = Math.max(lastSearchIndex, 
searchNode.getSearchIndex());
+            fileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
           }
         }
         metaData.setTruncateOffSet(walReader.getWALCurrentReadOffset());
@@ -196,12 +185,7 @@ public class WALNodeRecoverTask implements Runnable {
       logger.warn("Fail to read wal logs from {}, skip them", lastWALFile, e);
     }
     // make sure last wal file is correct
-    WALRecoverWriter walRecoverWriter = new WALRecoverWriter(lastWALFile);
-    try {
-      walRecoverWriter.recover(metaData);
-    } catch (IOException e) {
-      logger.error("Fail to recover metadata of wal file {}", lastWALFile);
-    }
+    repairWalFileIfBroken(lastWALFile, metaData);
     // rename last wal file when file status are inconsistent
     if (WALFileUtils.parseStatusCode(lastWALFile.getName()) != fileStatus) {
       String targetName =
@@ -216,12 +200,20 @@ public class WALNodeRecoverTask implements Runnable {
     return new long[] {lastVersionId, lastSearchIndex};
   }
 
+  private static void repairWalFileIfBroken(File walFile, WALMetaData 
metaData) {
+    WALRepairWriter walRepairWriter = new WALRepairWriter(walFile);
+    try {
+      walRepairWriter.repair(metaData);
+    } catch (IOException e) {
+      logger.error("Fail to recover metadata of wal file {}", walFile, e);
+    }
+  }
+
   private void recoverInfoFromCheckpoints() {
     // parse memTables information
     CheckpointRecoverUtils.CheckpointInfo info =
         CheckpointRecoverUtils.recoverMemTableInfo(logDirectory);
     memTableId2Info = info.getMemTableId2Info();
-    memTableId2RecoverPerformer = new HashMap<>();
     // update init memTable id
     long maxMemTableId = info.getMaxMemTableId();
     AtomicLong memTableIdCounter = AbstractMemTable.memTableIdCounter;
@@ -232,6 +224,7 @@ public class WALNodeRecoverTask implements Runnable {
       }
     }
     // update firstValidVersionId and get recover performer from 
WALRecoverManager
+    memTableId2RecoverPerformer = new HashMap<>();
     for (MemTableInfo memTableInfo : memTableId2Info.values()) {
       firstValidVersionId = Math.min(firstValidVersionId, 
memTableInfo.getFirstFileVersionId());
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
similarity index 93%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
index 900a706006b..c60c4d1a809 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
@@ -35,15 +35,15 @@ import static 
org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGI
 import static 
org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V2;
 import static 
org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V2_BYTES;
 
-/** Check whether the wal file is broken and recover it. */
-public class WALRecoverWriter {
+/** Check whether the wal file is broken and repair it. */
+public class WALRepairWriter {
   private final File logFile;
 
-  public WALRecoverWriter(File logFile) {
+  public WALRepairWriter(File logFile) {
     this.logFile = logFile;
   }
 
-  public void recover(WALMetaData metaData) throws IOException {
+  public void repair(WALMetaData metaData) throws IOException {
     // locate broken data
     long truncateSize;
     WALFileVersion version = WALFileVersion.getVersion(logFile);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
index 31fae84ad9a..8bd47bbd917 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManag
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.tools.validate.TsFileValidationTool;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.constant.TestConstant;
@@ -471,6 +472,8 @@ public class AbstractCompactionTest {
     ChunkCache.getInstance().clear();
     TimeSeriesMetadataCache.getInstance().clear();
     BloomFilterCache.getInstance().clear();
+    WALManager.getInstance().clear();
+
     EnvironmentUtils.cleanAllDir();
 
     if (SEQ_DIRS.exists()) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
index d9d92034889..c2920db6050 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
@@ -55,6 +55,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -70,6 +72,9 @@ import static org.junit.Assert.assertEquals;
 
 public class RewriteCrossSpaceCompactionWithFastPerformerTest extends 
AbstractCompactionTest {
 
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(RewriteCrossSpaceCompactionWithFastPerformerTest.class);
+
   private final String oldThreadName = Thread.currentThread().getName();
 
   @Before
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java
similarity index 94%
rename from 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
rename to 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java
index 7d49c448354..aca54fe2f19 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java
@@ -51,7 +51,7 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.HashSet;
 
-public class WALRecoverWriterTest {
+public class WALRepairWriterTest {
   private final File logFile =
       new File(
           TestConstant.BASE_OUTPUT_PATH.concat(
@@ -68,9 +68,8 @@ public class WALRecoverWriterTest {
     logFile.createNewFile();
     long firstSearchIndex = 
WALFileUtils.parseStartSearchIndex(logFile.getName());
     WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new 
ArrayList<>(), new HashSet<>());
-    // recover
-    WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
-    walRecoverWriter.recover(walMetaData);
+    // repair
+    new WALRepairWriter(logFile).repair(walMetaData);
     // verify file, marker + metadata(search index + size number) + metadata 
size + head magic
     // string + tail magic string
     Assert.assertEquals(
@@ -94,9 +93,8 @@ public class WALRecoverWriterTest {
     }
     long firstSearchIndex = 
WALFileUtils.parseStartSearchIndex(logFile.getName());
     WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new 
ArrayList<>(), new HashSet<>());
-    // recover
-    WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
-    walRecoverWriter.recover(walMetaData);
+    // repair
+    new WALRepairWriter(logFile).repair(walMetaData);
     // verify file, marker + metadata(search index + size number) + metadata 
size + magic string
     Assert.assertEquals(
         Byte.BYTES
@@ -122,9 +120,8 @@ public class WALRecoverWriterTest {
     try (WALWriter walWriter = new WALWriter(logFile)) {
       walWriter.write(buffer.getBuffer(), walMetaData);
     }
-    // recover
-    WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
-    walRecoverWriter.recover(walMetaData);
+    // repair
+    new WALRepairWriter(logFile).repair(walMetaData);
     // verify file
     try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
       Assert.assertTrue(reader.hasNext());
@@ -146,9 +143,8 @@ public class WALRecoverWriterTest {
     try (WALWriter walWriter = new WALWriter(logFile)) {
       walWriter.write(buffer.getBuffer(), walMetaData);
     }
-    // recover
-    WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
-    walRecoverWriter.recover(walMetaData);
+    // repair
+    new WALRepairWriter(logFile).repair(walMetaData);
     // verify file
     try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
       Assert.assertTrue(reader.hasNext());
@@ -175,9 +171,8 @@ public class WALRecoverWriterTest {
     try (FileChannel channel = FileChannel.open(logFile.toPath(), 
StandardOpenOption.APPEND)) {
       channel.truncate(len - 1);
     }
-    // recover
-    WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
-    walRecoverWriter.recover(walMetaData);
+    // repair
+    new WALRepairWriter(logFile).repair(walMetaData);
     // verify file
     try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
       Assert.assertTrue(reader.hasNext());

Reply via email to