This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_optimize_reader
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 796f2b913a187cbd1b9f44fd77cfd1cde05d36b2
Author: Alan Choo <[email protected]>
AuthorDate: Thu Sep 8 17:37:46 2022 +0800

    tolerate search index gap (#7272)
---
 .../apache/iotdb/db/wal/io/WALByteBufReader.java   |   2 +-
 .../java/org/apache/iotdb/db/wal/node/WALNode.java |  82 ++++----
 .../iotdb/db/wal/node/ConsensusReqReaderTest.java  | 208 ++++++++++++++++++++-
 3 files changed, 242 insertions(+), 50 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java 
b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
index a993fe7c57..d5c0243795 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
@@ -44,7 +44,7 @@ public class WALByteBufReader implements Closeable {
   public WALByteBufReader(File logFile) throws IOException {
     this.logFile = logFile;
     this.channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ);
-    if (!readTailMagic().equals(MAGIC_STRING)) {
+    if (channel.size() < MAGIC_STRING_BYTES || 
!readTailMagic().equals(MAGIC_STRING)) {
       throw new IOException(String.format("Broken wal file %s", logFile));
     }
     // load metadata size
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java 
b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 174ede96f2..cd71ca6db3 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -539,7 +539,7 @@ public class WALNode implements IWALNode {
         }
       }
 
-      // find all insert node of current wal file
+      // find all nodes of current wal file
       List<IConsensusRequest> tmpNodes = new ArrayList<>();
       long targetIndex = nextSearchIndex;
       try (WALByteBufReader walByteBufReader =
@@ -548,22 +548,25 @@ public class WALNode implements IWALNode {
           ByteBuffer buffer = walByteBufReader.next();
           WALEntryType type = WALEntryType.valueOf(buffer.get());
           if (type.needSearch()) {
-            // see WALInfoEntry#serialize, entry type + memtable id + insert 
node type
+            // see WALInfoEntry#serialize, entry type + memtable id + plan 
node type
             buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + 
PlanNodeType.BYTES);
-            long searchIndex = buffer.getLong();
+            long currentIndex = buffer.getLong();
             buffer.clear();
-            if (searchIndex == targetIndex) {
+            if (currentIndex == targetIndex) {
               tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
-            } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
-              insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
-              targetIndex++;
-              tmpNodes = new ArrayList<>();
-              // remember to add current insert node
-              if (searchIndex == targetIndex) {
+            } else { // different search index, all slices found
+              if (!tmpNodes.isEmpty()) {
+                insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
+                tmpNodes = new ArrayList<>();
+              }
+              // remember to add current plan node
+              if (currentIndex > targetIndex) {
                 tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
+                targetIndex = currentIndex;
               }
             }
-          } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+          } else if (!tmpNodes
+              .isEmpty()) { // next entry doesn't need to be searched, all 
slices found
             insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
             targetIndex++;
             tmpNodes = new ArrayList<>();
@@ -575,47 +578,57 @@ public class WALNode implements IWALNode {
             identifier,
             nextSearchIndex);
         reset();
-        hasNext();
+        return hasNext();
       } catch (Exception e) {
-        logger.error("Fail to read wal from wal file {}", 
filesToSearch[currentFileIndex], e);
+        logger.error(
+            "Fail to read wal from wal file {}, skip this file.",
+            filesToSearch[currentFileIndex],
+            e);
+        // skip this file when it's broken from the beginning
+        if (insertNodes.isEmpty() && tmpNodes.isEmpty()) {
+          currentFileIndex++;
+          return hasNext();
+        }
       }
 
-      // find remaining slices of last insert plan of targetIndex
-      if (tmpNodes.isEmpty()) { // all insert plans scanned
+      // find remaining slices of last plan node of targetIndex
+      if (tmpNodes.isEmpty()) { // all plan nodes scanned
         currentFileIndex++;
       } else {
         int fileIndex = currentFileIndex + 1;
         while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length - 1) {
-          // cannot find any in this file, find all slices of last insert plan
+          // cannot find any in this file, so all slices of last plan node are 
found
           if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName())
               == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
             insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
             tmpNodes = Collections.emptyList();
             break;
           }
-          // read until find all insert node whose search index equals target 
index
+          // read until all plan nodes whose search index equals the target 
index are found
           try (WALByteBufReader walByteBufReader = new 
WALByteBufReader(filesToSearch[fileIndex])) {
+            // first search index is different, so all slices of last plan 
node are found
             if (walByteBufReader.getFirstSearchIndex() != targetIndex) {
               insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
               tmpNodes = Collections.emptyList();
               break;
             } else {
+              // read until one node has different search index
               while (walByteBufReader.hasNext()) {
                 ByteBuffer buffer = walByteBufReader.next();
                 WALEntryType type = WALEntryType.valueOf(buffer.get());
                 if (type.needSearch()) {
-                  // see WALInfoEntry#serialize, entry type + memtable id + 
insert node type
+                  // see WALInfoEntry#serialize, entry type + memtable id + 
plan node type
                   buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + 
PlanNodeType.BYTES);
-                  long searchIndex = buffer.getLong();
+                  long currentIndex = buffer.getLong();
                   buffer.clear();
-                  if (searchIndex == targetIndex) {
+                  if (currentIndex == targetIndex) {
                     tmpNodes.add(new MultiLeaderConsensusRequest(buffer));
-                  } else if (!tmpNodes.isEmpty()) { // find all slices of 
insert plan
+                  } else { // find all slices of plan node
                     insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
                     tmpNodes = Collections.emptyList();
                     break;
                   }
-                } else if (!tmpNodes.isEmpty()) { // find all slices of insert 
plan
+                } else { // find all slices of plan node
                   insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
                   tmpNodes = Collections.emptyList();
                   break;
@@ -628,9 +641,10 @@ public class WALNode implements IWALNode {
                 identifier,
                 nextSearchIndex);
             reset();
-            hasNext();
+            return hasNext();
           } catch (Exception e) {
-            logger.error("Fail to read wal from wal file {}", 
filesToSearch[currentFileIndex], e);
+            logger.error(
+                "Fail to read wal from wal file {}, skip this file.", 
filesToSearch[fileIndex], e);
           }
           if (!tmpNodes.isEmpty()) {
             fileIndex++;
@@ -664,25 +678,7 @@ public class WALNode implements IWALNode {
       }
 
       IndexedConsensusRequest request = itr.next();
-      if (request.getSearchIndex() == nextSearchIndex) {
-        nextSearchIndex++;
-      } else if (request.getSearchIndex() > nextSearchIndex) {
-        logger.warn(
-            "Search index of wal node-{} are not continuously, skip from {} to 
{}.",
-            identifier,
-            nextSearchIndex,
-            request.getSearchIndex());
-        skipTo(request.getSearchIndex() + 1);
-      } else {
-        logger.error(
-            "Search index of wal node-{} are out of order, {} is before {}.",
-            identifier,
-            nextSearchIndex,
-            request.getSearchIndex());
-        throw new RuntimeException(
-            String.format("Search index of wal node-%s are out of order", 
identifier));
-      }
-
+      nextSearchIndex = request.getSearchIndex() + 1;
       return request;
     }
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java 
b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
index 9c59f58a26..c19d64ed67 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
@@ -44,6 +44,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -75,9 +77,16 @@ public class ConsensusReqReaderTest {
   }
 
   /**
-   * Generate wal files as below: _0-0-1.wal: 1,-1 _1-1-1.wal: 2,2,2 
_2-2-1.wal: 3,3 _3-3-1.wal: 3,4
-   * _4-4-1.wal: 4 _5-4-1.wal: 4,4,5 _6-5-1.wal: 6 1 - InsertRowNode, 2 - 
InsertRowsOfOneDeviceNode,
-   * 3 - InsertRowsNode, 4 - InsertMultiTabletsNode, 5 - InsertTabletNode, 6 - 
InsertRowNode
+   * Generate wal files as below: <br>
+   * _0-0-1.wal: 1,-1 <br>
+   * _1-1-1.wal: 2,2,2 <br>
+   * _2-2-1.wal: 3,3 <br>
+   * _3-3-1.wal: 3,4 <br>
+   * _4-4-1.wal: 4 <br>
+   * _5-4-1.wal: 4,4,5 <br>
+   * _6-5-1.wal: 6 <br>
+   * 1 - InsertRowNode, 2 - InsertRowsOfOneDeviceNode, 3 - InsertRowsNode, 4 -
+   * InsertMultiTabletsNode, 5 - InsertTabletNode, 6 - InsertRowNode
    */
   private void simulateFileScenario01() throws IllegalPathException {
     InsertTabletNode insertTabletNode;
@@ -382,8 +391,12 @@ public class ConsensusReqReaderTest {
   }
 
   /**
-   * Generate wal files as below: _0-0-0.wal: -1,-1 _1-0-0.wal: -1 _2-0-1.wal: 
-1,1 _3-1-0.wal: -1 1
-   * - DeleteDataNode
+   * Generate wal files as below: <br>
+   * _0-0-0.wal: -1,-1 <br>
+   * _1-0-0.wal: -1 <br>
+   * _2-0-1.wal: -1,1 <br>
+   * _3-1-0.wal: -1 <br>
+   * 1 - DeleteDataNode
    */
   private void simulateFileScenario02() throws IllegalPathException {
     InsertRowNode insertRowNode = getInsertRowNode(devicePath);
@@ -402,15 +415,113 @@ public class ConsensusReqReaderTest {
     walNode.rollWALFile();
     // _3-1-0.wal
     walNode.log(0, insertRowNode); // -1
-    walNode.rollWALFile();
   }
 
   @Test
   public void scenario02TestGetReqIterator01() throws Exception {
     simulateFileScenario02();
+    walNode.rollWALFile();
+    ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
+    IndexedConsensusRequest request;
+    PlanNode planNode;
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertEquals(1, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = 
WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof DeleteDataNode);
+      Assert.assertEquals(1, ((DeleteDataNode) planNode).getSearchIndex());
+    }
+  }
+
+  /**
+   * Generate wal files as below: <br>
+   * _0-0-1.wal: broken <br>
+   * _1-0-1.wal: 1,-1 <br>
+   * _2-1-1.wal: 2,2,2 <br>
+   * _3-2-1.wal: 3,3 <br>
+   * _4-3-1.wal: broken <br>
+   * _5-3-1.wal: 3,5 <br>
+   * _6-5-1.wal: broken <br>
+   * _7-5-1.wal: broken <br>
+   * _8-5-1.wal: broken <br>
+   * _9-8-1.wal: 8 <br>
+   * 1,2,3,5,8 - DeleteDataNode
+   */
+  private void simulateFileScenario03() throws IllegalPathException, 
IOException {
+    // _0-0-1.wal
+    walNode.rollWALFile();
+    new File(logDirectory, "_0-0-0.wal").delete();
+    new File(logDirectory, "_0-0-1.wal").createNewFile();
+    // _1-0-1.wal
+    DeleteDataNode deleteDataNode = getDeleteDataNode(devicePath);
+    deleteDataNode.setSearchIndex(1);
+    walNode.log(0, deleteDataNode); // 1
+    deleteDataNode = getDeleteDataNode(devicePath);
+    deleteDataNode.setSearchIndex(-1);
+    walNode.log(0, deleteDataNode); // -1
+    walNode.rollWALFile();
+    // _2-1-1.wal
+    deleteDataNode = getDeleteDataNode(devicePath);
+    deleteDataNode.setSearchIndex(2);
+    walNode.log(0, deleteDataNode); // 2
+    walNode.log(0, deleteDataNode); // 2
+    walNode.log(0, deleteDataNode); // 2
+    walNode.rollWALFile();
+    // _3-2-1.wal
+    deleteDataNode = getDeleteDataNode(devicePath);
+    deleteDataNode.setSearchIndex(3);
+    walNode.log(0, deleteDataNode); // 3
+    walNode.log(0, deleteDataNode); // 3
+    walNode.rollWALFile();
+    // _4-3-1.wal
+    walNode.rollWALFile();
+    new File(logDirectory, "_4-3-0.wal").delete();
+    new File(logDirectory, "_4-3-1.wal").createNewFile();
+    // _5-3-1.wal
+    deleteDataNode = getDeleteDataNode(devicePath);
+    deleteDataNode.setSearchIndex(3);
+    walNode.log(0, deleteDataNode); // 3
+    deleteDataNode = getDeleteDataNode(devicePath);
+    deleteDataNode.setSearchIndex(5);
+    walNode.log(0, deleteDataNode); // 5
+    walNode.rollWALFile();
+    // _6-5-1.wal
+    deleteDataNode = getDeleteDataNode(devicePath);
+    deleteDataNode.setSearchIndex(6);
+    walNode.log(0, deleteDataNode); // 6
+    deleteDataNode = getDeleteDataNode(devicePath);
+    deleteDataNode.setSearchIndex(7);
+    walNode.log(0, deleteDataNode); // 7
+    deleteDataNode = getDeleteDataNode(devicePath);
+    deleteDataNode.setSearchIndex(8);
+    walNode.log(0, deleteDataNode); // 8
+    walNode.rollWALFile();
+    new File(logDirectory, "_6-5-1.wal").delete();
+    new File(logDirectory, "_6-5-1.wal").createNewFile();
+    // _7-5-1.wal
+    walNode.rollWALFile();
+    new File(logDirectory, "_7-5-0.wal").delete();
+    new File(logDirectory, "_7-5-1.wal").createNewFile();
+    // _8-5-1.wal
+    walNode.rollWALFile();
+    new File(logDirectory, "_8-5-0.wal").delete();
+    new File(logDirectory, "_8-5-1.wal").createNewFile();
+    // _9-8-1.wal
+    deleteDataNode = getDeleteDataNode(devicePath);
+    deleteDataNode.setSearchIndex(8);
+    walNode.log(0, deleteDataNode); // 8
+  }
+
+  @Test
+  public void scenario03TestGetReqIterator01() throws Exception {
+    simulateFileScenario03();
+    walNode.rollWALFile();
+
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
     IndexedConsensusRequest request;
     PlanNode planNode;
+
     Assert.assertTrue(iterator.hasNext());
     request = iterator.next();
     Assert.assertEquals(1, request.getRequests().size());
@@ -419,6 +530,91 @@ public class ConsensusReqReaderTest {
       Assert.assertTrue(planNode instanceof DeleteDataNode);
       Assert.assertEquals(1, ((DeleteDataNode) planNode).getSearchIndex());
     }
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertEquals(3, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = 
WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof DeleteDataNode);
+      Assert.assertEquals(2, ((DeleteDataNode) planNode).getSearchIndex());
+    }
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertEquals(3, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = 
WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof DeleteDataNode);
+      Assert.assertEquals(3, ((DeleteDataNode) planNode).getSearchIndex());
+    }
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertEquals(1, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = 
WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof DeleteDataNode);
+      Assert.assertEquals(5, ((DeleteDataNode) planNode).getSearchIndex());
+    }
+    Assert.assertFalse(iterator.hasNext());
+    walNode.rollWALFile();
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertEquals(1, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = 
WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof DeleteDataNode);
+      Assert.assertEquals(8, ((DeleteDataNode) planNode).getSearchIndex());
+    }
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void scenario03TestGetReqIterator02() throws Exception {
+    simulateFileScenario03();
+    walNode.rollWALFile();
+
+    ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(6);
+    IndexedConsensusRequest request;
+    PlanNode planNode;
+
+    Assert.assertFalse(iterator.hasNext());
+    walNode.rollWALFile();
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertEquals(1, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = 
WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof DeleteDataNode);
+      Assert.assertEquals(8, ((DeleteDataNode) planNode).getSearchIndex());
+    }
+    Assert.assertFalse(iterator.hasNext());
+
+    iterator.skipTo(3);
+
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertEquals(3, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = 
WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof DeleteDataNode);
+      Assert.assertEquals(3, ((DeleteDataNode) planNode).getSearchIndex());
+    }
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertEquals(1, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = 
WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof DeleteDataNode);
+      Assert.assertEquals(5, ((DeleteDataNode) planNode).getSearchIndex());
+    }
+    Assert.assertTrue(iterator.hasNext());
+    request = iterator.next();
+    Assert.assertEquals(1, request.getRequests().size());
+    for (IConsensusRequest innerRequest : request.getRequests()) {
+      planNode = 
WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
+      Assert.assertTrue(planNode instanceof DeleteDataNode);
+      Assert.assertEquals(8, ((DeleteDataNode) planNode).getSearchIndex());
+    }
+    Assert.assertFalse(iterator.hasNext());
   }
 
   public static InsertRowNode getInsertRowNode(String devicePath) throws 
IllegalPathException {

Reply via email to