This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bdebd1de25 [IOTDB-3684] Fail to read wal from wal file caused by FileNotFoundException (#6494)
bdebd1de25 is described below

commit bdebd1de255a70f13b28d64c72a13ae8c2f4adb6
Author: Alan Choo <[email protected]>
AuthorDate: Wed Jun 29 09:26:54 2022 +0800

    [IOTDB-3684] Fail to read wal from wal file caused by FileNotFoundException (#6494)
---
 .../iotdb/db/wal/buffer/AbstractWALBuffer.java     | 14 ++--
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  |  7 +-
 .../java/org/apache/iotdb/db/wal/io/WALWriter.java | 13 +++
 .../java/org/apache/iotdb/db/wal/node/WALNode.java | 92 ++++++++++++++++++++--
 .../apache/iotdb/db/wal/utils/WALFileUtils.java    | 22 +++---
 .../iotdb/db/wal/node/ConsensusReqReaderTest.java  | 28 +++----
 .../org/apache/iotdb/db/wal/node/WALNodeTest.java  |  6 +-
 7 files changed, 139 insertions(+), 43 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
index 17df8b56e7..51d0ff33ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.wal.buffer;
 
 import org.apache.iotdb.commons.file.SystemFileFactory;
-import org.apache.iotdb.db.wal.io.ILogWriter;
 import org.apache.iotdb.db.wal.io.WALWriter;
 import org.apache.iotdb.db.wal.utils.WALFileStatus;
 import org.apache.iotdb.db.wal.utils.WALFileUtils;
@@ -43,10 +42,8 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
   protected final AtomicLong currentWALFileVersion = new AtomicLong();
   /** current search index */
   protected volatile long currentSearchIndex;
-  /** current search index */
-  protected volatile WALFileStatus currentFileStatus;
   /** current wal file log writer */
-  protected volatile ILogWriter currentWALFileWriter;
+  protected volatile WALWriter currentWALFileWriter;
 
   public AbstractWALBuffer(
       String identifier, String logDirectory, long startFileVersion, long startSearchIndex)
@@ -58,14 +55,15 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
       logger.info("Create folder {} for wal node-{}'s buffer.", logDirectory, identifier);
     }
     currentSearchIndex = startSearchIndex;
-    currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
     currentWALFileVersion.set(startFileVersion);
     currentWALFileWriter =
         new WALWriter(
             SystemFileFactory.INSTANCE.getFile(
                 logDirectory,
                 WALFileUtils.getLogFileName(
-                    currentWALFileVersion.get(), currentSearchIndex, currentFileStatus)));
+                    currentWALFileVersion.get(),
+                    currentSearchIndex,
+                    WALFileStatus.CONTAINS_SEARCH_INDEX)));
   }
 
   @Override
@@ -99,7 +97,9 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
         SystemFileFactory.INSTANCE.getFile(
             logDirectory,
             WALFileUtils.getLogFileName(
-                currentWALFileVersion.incrementAndGet(), searchIndex, currentFileStatus));
+                currentWALFileVersion.incrementAndGet(),
+                searchIndex,
+                WALFileStatus.CONTAINS_SEARCH_INDEX));
     currentWALFileWriter = new WALWriter(nextLogFile);
     logger.debug("Open new wal file {} for wal node-{}'s buffer.", nextLogFile, identifier);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index 6626ce7f91..e14ba4943d 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -73,6 +73,8 @@ public class WALBuffer extends AbstractWALBuffer {
   // buffer in syncing status, serializeThread makes sure no more writes to syncingBuffer
   private volatile ByteBuffer syncingBuffer;
   // endregion
+  /** file status of working buffer, updating file writer's status when syncing */
+  protected volatile WALFileStatus currentFileStatus;
   /** single thread to serialize WALEntry to workingBuffer */
   private final ExecutorService serializeThread;
   /** single thread to sync syncingBuffer to disk */
@@ -86,6 +88,7 @@ public class WALBuffer extends AbstractWALBuffer {
       String identifier, String logDirectory, long startFileVersion, long startSearchIndex)
       throws FileNotFoundException {
     super(identifier, logDirectory, startFileVersion, startSearchIndex);
+    currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
     allocateBuffers();
     serializeThread =
         IoTDBThreadPoolFactory.newSingleThreadExecutor(
@@ -402,6 +405,8 @@ public class WALBuffer extends AbstractWALBuffer {
 
     @Override
     public void run() {
+      currentWALFileWriter.updateFileStatus(fileStatus);
+
       // flush buffer to os
       try {
         currentWALFileWriter.write(syncingBuffer);
@@ -437,7 +442,7 @@ public class WALBuffer extends AbstractWALBuffer {
       if (rollWALFileWriterListener != null
           || (forceFlag && currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
         try {
-          rollLogWriter(searchIndex, fileStatus);
+          rollLogWriter(searchIndex, currentWALFileWriter.getWalFileStatus());
           if (rollWALFileWriterListener != null) {
             rollWALFileWriterListener.succeed();
           }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
index 9c9e6174b3..5ce2bcb251 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
@@ -19,13 +19,26 @@
 package org.apache.iotdb.db.wal.io;
 
 import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.utils.WALFileStatus;
 
 import java.io.File;
 import java.io.FileNotFoundException;
 
 /** WALWriter writes the binary {@link WALEntry} into .wal file. */
 public class WALWriter extends LogWriter {
+  private WALFileStatus walFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
+
   public WALWriter(File logFile) throws FileNotFoundException {
     super(logFile);
   }
+
+  public void updateFileStatus(WALFileStatus walFileStatus) {
+    if (walFileStatus == WALFileStatus.CONTAINS_SEARCH_INDEX) {
+      this.walFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
+    }
+  }
+
+  public WALFileStatus getWalFileStatus() {
+    return walFileStatus;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 5d85114446..6827a97de2 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -541,6 +541,15 @@ public class WALNode implements IWALNode {
           break;
         }
       }
+      // cannot find any in this file
+      if (WALFileUtils.parseStatusCode(currentFiles[i].getName())
+          == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+        if (!tmpNodes.isEmpty()) {
+          return mergeInsertNodes(tmpNodes);
+        } else {
+          continue;
+        }
+      }
 
       try (WALReader walReader = new WALReader(currentFiles[i])) {
         while (walReader.hasNext()) {
@@ -557,6 +566,10 @@ public class WALNode implements IWALNode {
             return mergeInsertNodes(tmpNodes);
           }
         }
+      } catch (FileNotFoundException e) {
+        logger.debug(
+            "WAL file {} has been deleted, try to call getReq({}) again.", currentFiles[i], index);
+        return getReq(index);
       } catch (Exception e) {
         logger.error("Fail to read wal from wal file {}", currentFiles[i], e);
       }
@@ -588,6 +601,15 @@ public class WALNode implements IWALNode {
           break;
         }
       }
+      // cannot find any in this file
+      if (WALFileUtils.parseStatusCode(currentFiles[i].getName())
+          == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+        if (!tmpNodes.isEmpty()) {
+          result.add(mergeInsertNodes(tmpNodes));
+        } else {
+          continue;
+        }
+      }
 
       try (WALReader walReader = new WALReader(currentFiles[i])) {
         while (walReader.hasNext()) {
@@ -618,6 +640,13 @@ public class WALNode implements IWALNode {
             tmpNodes = new ArrayList<>();
           }
         }
+      } catch (FileNotFoundException e) {
+        logger.debug(
+            "WAL file {} has been deleted, try to call getReqs({}, {}) again.",
+            currentFiles[i],
+            startIndex,
+            num);
+        return getReqs(startIndex, num);
       } catch (Exception e) {
         logger.error("Fail to read wal from wal file {}", currentFiles[i], e);
       }
@@ -661,9 +690,11 @@ public class WALNode implements IWALNode {
         return true;
       }
 
+      // clear outdated iterator
       insertNodes.clear();
       itr = null;
 
+      // update files to search
       if (needUpdatingFilesToSearch || filesToSearch == null) {
         updateFilesToSearch();
         if (needUpdatingFilesToSearch) {
@@ -671,6 +702,16 @@ public class WALNode implements IWALNode {
         }
       }
 
+      // find file contains search index
+      while (WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
+          == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+        currentFileIndex++;
+        if (currentFileIndex >= filesToSearch.length) {
+          needUpdatingFilesToSearch = true;
+          return false;
+        }
+      }
+
       // find all insert plan of current wal file
       List<InsertNode> tmpNodes = new ArrayList<>();
       long targetIndex = nextSearchIndex;
@@ -697,6 +738,13 @@ public class WALNode implements IWALNode {
             tmpNodes = new ArrayList<>();
           }
         }
+      } catch (FileNotFoundException e) {
+        logger.debug(
+            "WAL file {} has been deleted, try to find next {} again.",
+            identifier,
+            nextSearchIndex);
+        reset();
+        hasNext();
       } catch (Exception e) {
         logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e);
       }
@@ -707,6 +755,14 @@ public class WALNode implements IWALNode {
       } else {
         int fileIndex = currentFileIndex + 1;
         while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length) {
+          // cannot find any in this file, find all slices of last insert plan
+          if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName())
+              == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+            insertNodes.add(mergeInsertNodes(tmpNodes));
+            tmpNodes = Collections.emptyList();
+            break;
+          }
+
           try (WALReader walReader = new WALReader(filesToSearch[fileIndex])) {
             while (walReader.hasNext()) {
               WALEntry walEntry = walReader.next();
@@ -726,6 +782,13 @@ public class WALNode implements IWALNode {
                 break;
               }
             }
+          } catch (FileNotFoundException e) {
+            logger.debug(
+                "WAL file {} has been deleted, try to find next {} again.",
+                identifier,
+                nextSearchIndex);
+            reset();
+            hasNext();
           } catch (Exception e) {
             logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e);
           }
@@ -764,14 +827,24 @@ public class WALNode implements IWALNode {
       }
 
       InsertNode insertNode = itr.next();
-      if (insertNode.getSearchIndex() != nextSearchIndex) {
+      if (insertNode.getSearchIndex() == nextSearchIndex) {
+        nextSearchIndex++;
+      } else if (insertNode.getSearchIndex() > nextSearchIndex) {
         logger.warn(
             "Search index of wal node-{} are not continuously, skip from {} to {}.",
             identifier,
             nextSearchIndex,
             insertNode.getSearchIndex());
+        skipTo(insertNode.getSearchIndex() + 1);
+      } else {
+        logger.error(
+            "Search index of wal node-{} are out of order, {} is before {}.",
+            identifier,
+            nextSearchIndex,
+            insertNode.getSearchIndex());
+        throw new RuntimeException(
+            String.format("Search index of wal node-%s are out of order", identifier));
       }
-      nextSearchIndex = insertNode.getSearchIndex() + 1;
 
       return insertNode;
     }
@@ -802,13 +875,18 @@ public class WALNode implements IWALNode {
             nextSearchIndex,
             targetIndex,
             targetIndex);
-        searchedFilesVersionId = -1;
-        insertNodes.clear();
-        itr = null;
       }
+      reset();
       nextSearchIndex = targetIndex;
-      this.filesToSearch = null;
-      this.currentFileIndex = -1;
+    }
+
+    /** Reset all params except nextSearchIndex */
+    private void reset() {
+      searchedFilesVersionId = -1;
+      insertNodes.clear();
+      itr = null;
+      filesToSearch = null;
+      currentFileIndex = -1;
       needUpdatingFilesToSearch = true;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
index 84874d09cb..e5d945024b 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
@@ -36,11 +36,11 @@ public class WALFileUtils {
    * versionId is a self-incremented id number, helping to maintain the order of wal files.
    * startSearchIndex is the valid search index of last flushed wal entry. statusCode is the. For
    * example: <br>
-   * _0-0-0.wal: 1, 2, 3, -1, -1, 4, 5, -1 <br>
-   * _1-5-1.wal: -1, -1, -1, -1 <br>
-   * _2-5-0.wal: 6, 7, 8, 9, -1, -1, -1, 10, 11, -1, 12, 12 <br>
-   * _3-12-0.wal: 12, 12, 12, 12, 12 <br>
-   * _4-12-0.wal: 12, 13, 14, 15, 16, -1 <br>
+   * _0-0-1.wal: 1, 2, 3, -1, -1, 4, 5, -1 <br>
+   * _1-5-0.wal: -1, -1, -1, -1 <br>
+   * _2-5-1.wal: 6, 7, 8, 9, -1, -1, -1, 10, 11, -1, 12, 12 <br>
+   * _3-12-1.wal: 12, 12, 12, 12, 12 <br>
+   * _4-12-1.wal: 12, 13, 14, 15, 16, -1 <br>
    */
   public static final Pattern WAL_FILE_NAME_PATTERN =
       Pattern.compile(
@@ -105,13 +105,13 @@ public class WALFileUtils {
 
   /**
    * Find index of the file which probably contains target insert plan. <br>
-   * Given wal files [ _0-0-0.wal, _1-5-1.wal, _2-5-0.wal, _3-12-0.wal, _4-12-0.wal ], details as
+   * Given wal files [ _0-0-1.wal, _1-5-0.wal, _2-5-1.wal, _3-12-1.wal, _4-12-1.wal ], details as
    * below: <br>
-   * _0-0-0.wal: 1, 2, 3, -1, -1, 4, 5, -1 <br>
-   * _1-5-1.wal: -1, -1, -1, -1 <br>
-   * _2-5-0.wal: 6, 7, 8, 9, -1, -1, -1, 10, 11, -1, 12, 12 <br>
-   * _3-12-0.wal: 12, 12, 12, 12, 12 <br>
-   * _4-12-0.wal: 12, 13, 14, 15, 16, -1 <br>
+   * _0-0-1.wal: 1, 2, 3, -1, -1, 4, 5, -1 <br>
+   * _1-5-0.wal: -1, -1, -1, -1 <br>
+   * _2-5-1.wal: 6, 7, 8, 9, -1, -1, -1, 10, 11, -1, 12, 12 <br>
+   * _3-12-1.wal: 12, 12, 12, 12, 12 <br>
+   * _4-12-1.wal: 12, 13, 14, 15, 16, -1 <br>
    * searching [1, 5] will return 0, searching [6, 12] will return 1, search [13, infinity) will
    * return 3, others will return -1
    *
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
index ccfefe8a48..e4603b7607 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
@@ -74,57 +74,57 @@ public class ConsensusReqReaderTest {
 
   /**
    * Generate wal files as below: <br>
-   * _0-0-0.wal: 1,-1 <br>
-   * _1-1-0.wal: 2,2,2 <br>
-   * _2-2-0.wal: 3,3 <br>
-   * _3-3-0.wal: 3,4 <br>
-   * _4-4-0.wal: 4 <br>
-   * _5-4-0.wal: 4,4,5 <br>
-   * _6-5-0.wal: 6 <br>
+   * _0-0-1.wal: 1,-1 <br>
+   * _1-1-1.wal: 2,2,2 <br>
+   * _2-2-1.wal: 3,3 <br>
+   * _3-3-1.wal: 3,4 <br>
+   * _4-4-1.wal: 4 <br>
+   * _5-4-1.wal: 4,4,5 <br>
+   * _6-5-1.wal: 6 <br>
    * 1 - InsertRowNode, 2 - InsertRowsOfOneDeviceNode, 3 - InsertRowsNode, 4 -
    * InsertMultiTabletsNode, 5 - InsertTabletNode, 6 - InsertRowNode
    */
   private void simulateFileScenario01() throws IllegalPathException {
     InsertTabletNode insertTabletNode;
     InsertRowNode insertRowNode;
-    // _0-0-0.wal
+    // _0-0-1.wal
     insertRowNode = getInsertRowNode(devicePath);
     insertRowNode.setSearchIndex(1);
     walNode.log(0, insertRowNode); // 1
     insertTabletNode = getInsertTabletNode(devicePath, new long[] {2});
     walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // -1
     walNode.rollWALFile();
-    // _1-1-0.wal
+    // _1-1-1.wal
     insertRowNode = getInsertRowNode(devicePath);
     insertRowNode.setSearchIndex(2);
     walNode.log(0, insertRowNode); // 2
     walNode.log(0, insertRowNode); // 2
     walNode.log(0, insertRowNode); // 2
     walNode.rollWALFile();
-    // _2-2-0.wal
+    // _2-2-1.wal
     insertRowNode = getInsertRowNode(devicePath);
     insertRowNode.setSearchIndex(3);
     walNode.log(0, insertRowNode); // 3
     walNode.log(0, insertRowNode); // 3
     walNode.rollWALFile();
-    // _3-3-0.wal
+    // _3-3-1.wal
     insertRowNode.setDevicePath(new PartialPath(devicePath + "test"));
     walNode.log(0, insertRowNode); // 3
     insertTabletNode = getInsertTabletNode(devicePath, new long[] {4});
     insertTabletNode.setSearchIndex(4);
     walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
     walNode.rollWALFile();
-    // _4-4-0.wal
+    // _4-4-1.wal
     walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
     walNode.rollWALFile();
-    // _5-4-0.wal
+    // _5-4-1.wal
     walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
     walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
     insertTabletNode = getInsertTabletNode(devicePath, new long[] {5});
     insertTabletNode.setSearchIndex(5);
     walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 5
     walNode.rollWALFile();
-    // _6-5-0.wal
+    // _6-5-1.wal
     insertRowNode = getInsertRowNode(devicePath);
     insertRowNode.setSearchIndex(6);
     WALFlushListener walFlushListener = walNode.log(0, insertRowNode); // 6
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
index 62244f3fc9..25187c3039 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
@@ -259,7 +259,7 @@ public class WALNodeTest {
     }
     walNode.onMemTableFlushed(memTable);
     walNode.onMemTableCreated(new PrimitiveMemTable(), tsFilePath);
-    // check existence of _0-0-0.wal file
+    // check existence of _0-0-0.wal file and _1-0-1.wal file
     assertTrue(
         new File(
                 logDirectory
@@ -270,7 +270,7 @@ public class WALNodeTest {
         new File(
                 logDirectory
                     + File.separator
-                    + WALFileUtils.getLogFileName(1, 0, WALFileStatus.CONTAINS_NONE_SEARCH_INDEX))
+                    + WALFileUtils.getLogFileName(1, 0, WALFileStatus.CONTAINS_SEARCH_INDEX))
             .exists());
     walNode.deleteOutdatedFiles();
     assertFalse(
@@ -283,7 +283,7 @@ public class WALNodeTest {
         new File(
                 logDirectory
                     + File.separator
-                    + WALFileUtils.getLogFileName(1, 0, WALFileStatus.CONTAINS_NONE_SEARCH_INDEX))
+                    + WALFileUtils.getLogFileName(1, 0, WALFileStatus.CONTAINS_SEARCH_INDEX))
             .exists());
     // check flush listeners
     try {

Reply via email to