This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6d009c29705b84427b53e6518151035ee2b8a371
Author: Li Yu Heng <[email protected]>
AuthorDate: Thu Jul 25 10:34:37 2024 +0800

    Optimize WALNode.PlanNodeIterator.hasNext (#13013)
    
    * is it work?
    
    * no it's not
    
    * all wal UT pass, maybe it's work
    
    * feel's like leetcode medium
    
    (cherry picked from commit d5e409d8d6fe2deb696d9e9e0af930c0773f4b30)
---
 .../storageengine/dataregion/wal/node/WALNode.java | 179 ++++++++-------------
 1 file changed, 67 insertions(+), 112 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 33245ebf9e5..607a99a5673 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -79,6 +79,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -703,126 +705,79 @@ public class WALNode implements IWALNode {
         }
       }
 
-      // find all nodes of current wal file
-      List<IConsensusRequest> tmpNodes = new ArrayList<>();
-      long targetIndex = nextSearchIndex;
-      try (WALByteBufReader walByteBufReader =
-          new WALByteBufReader(filesToSearch[currentFileIndex])) {
-        while (walByteBufReader.hasNext()) {
-          ByteBuffer buffer = walByteBufReader.next();
-          WALEntryType type = WALEntryType.valueOf(buffer.get());
-          if (type.needSearch()) {
-            // see WALInfoEntry#serialize, entry type + memtable id + plan 
node type
-            buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + 
PlanNodeType.BYTES);
-            long currentIndex = buffer.getLong();
-            buffer.clear();
-            if (currentIndex == targetIndex) {
-              tmpNodes.add(new IoTConsensusRequest(buffer));
-            } else {
-              // different search index, all slices found
-              if (!tmpNodes.isEmpty()) {
-                insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
-                tmpNodes = new ArrayList<>();
-              }
-              // remember to add current plan node
-              if (currentIndex > targetIndex) {
-                tmpNodes.add(new IoTConsensusRequest(buffer));
-                targetIndex = currentIndex;
+      /* ------ find all nodes from all wal file ------ */
+
+      AtomicReference<List<IConsensusRequest>> tmpNodes = new 
AtomicReference<>(new ArrayList<>());
+      AtomicBoolean notFirstFile = new AtomicBoolean(false);
+      AtomicBoolean hasCollectedSufficientData = new AtomicBoolean(false);
+
+      // try to collect current tmpNodes to insertNodes, return true if 
successfully collect an
+      // insert node
+      Runnable tryToCollectInsertNodeAndBumpIndex =
+          () -> {
+            if (!tmpNodes.get().isEmpty()) {
+              insertNodes.add(new IndexedConsensusRequest(nextSearchIndex, 
tmpNodes.get()));
+              tmpNodes.set(new ArrayList<>());
+              nextSearchIndex++;
+              if (notFirstFile.get()) {
+                hasCollectedSufficientData.set(true);
               }
             }
-          } else if (!tmpNodes.isEmpty()) {
-            // next entry doesn't need to be searched, all slices found
-            insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
-            targetIndex++;
-            tmpNodes = new ArrayList<>();
-          }
-        }
-      } catch (FileNotFoundException e) {
-        logger.debug(
-            "WAL file {} has been deleted, try to find next {} again.",
-            identifier,
-            nextSearchIndex);
-        reset();
-        return hasNext();
-      } catch (Exception e) {
-        brokenFileId = 
WALFileUtils.parseVersionId(filesToSearch[currentFileIndex].getName());
-        logger.error(
-            "Fail to read wal from wal file {}, skip this file.",
-            filesToSearch[currentFileIndex],
-            e);
-        // skip this file when it's broken from the beginning
-        if (insertNodes.isEmpty() && tmpNodes.isEmpty()) {
-          currentFileIndex++;
-          return hasNext();
+          };
+
+      COLLECT_FILE_LOOP:
+      for (; currentFileIndex < filesToSearch.length - 1; currentFileIndex++) {
+        // cannot find any in this file, so all slices of last plan node are 
found
+        if 
(WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
+            == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+          tryToCollectInsertNodeAndBumpIndex.run();
+          continue;
         }
-      }
-
-      // find remaining slices of last plan node of targetIndex
-      if (tmpNodes.isEmpty()) { // all plan nodes scanned
-        currentFileIndex++;
-      } else {
-        int fileIndex = currentFileIndex + 1;
-        while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length - 1) {
-          // cannot find any in this file, so all slices of last plan node are 
found
-          if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName())
-              == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
-            insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
-            tmpNodes = Collections.emptyList();
-            break;
-          }
-          // read until find all plan nodes whose search index equals target 
index
-          try (WALByteBufReader walByteBufReader = new 
WALByteBufReader(filesToSearch[fileIndex])) {
-            // first search index are different, so all slices of last plan 
node are found
-            if (walByteBufReader.getFirstSearchIndex() != targetIndex) {
-              insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
-              tmpNodes = Collections.emptyList();
-              break;
-            } else {
-              // read until one node has different search index
-              while (walByteBufReader.hasNext()) {
-                ByteBuffer buffer = walByteBufReader.next();
-                WALEntryType type = WALEntryType.valueOf(buffer.get());
-                if (type.needSearch()) {
-                  // see WALInfoEntry#serialize, entry type + memtable id + 
plan node type
-                  buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + 
PlanNodeType.BYTES);
-                  long currentIndex = buffer.getLong();
-                  buffer.clear();
-                  if (currentIndex == targetIndex) {
-                    tmpNodes.add(new IoTConsensusRequest(buffer));
-                  } else { // find all slices of plan node
-                    insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
-                    tmpNodes = Collections.emptyList();
-                    break;
-                  }
-                } else { // find all slices of plan node
-                  insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
-                  tmpNodes = Collections.emptyList();
-                  break;
+        try (WALByteBufReader walByteBufReader =
+            new WALByteBufReader(filesToSearch[currentFileIndex])) {
+          while (walByteBufReader.hasNext()) {
+            ByteBuffer buffer = walByteBufReader.next();
+            WALEntryType type = WALEntryType.valueOf(buffer.get());
+            if (type.needSearch()) {
+              // see WALInfoEntry#serialize, entry type + memtable id + plan 
node type
+              buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + 
PlanNodeType.BYTES);
+              final long currentWalEntryIndex = buffer.getLong();
+              buffer.clear();
+              if (currentWalEntryIndex == -1) {
+                // WAL entry of targetIndex has been fully collected, so put 
them into insertNodes
+                tryToCollectInsertNodeAndBumpIndex.run();
+              } else if (currentWalEntryIndex < nextSearchIndex) {
+                // WAL entry is outdated, do nothing, continue to see next WAL 
entry
+              } else if (currentWalEntryIndex == nextSearchIndex) {
+                tmpNodes.get().add(new IoTConsensusRequest(buffer));
+              } else {
+                // currentWalEntryIndex > targetIndex
+                // WAL entry of targetIndex has been fully collected, put them 
into insertNodes
+                tryToCollectInsertNodeAndBumpIndex.run();
+                if (currentWalEntryIndex != nextSearchIndex) {
+                  logger.warn(
+                      "The search index of next WAL entry should be {}, but 
actually it's {}",
+                      nextSearchIndex,
+                      currentWalEntryIndex);
+                  nextSearchIndex = currentWalEntryIndex;
                 }
+                tmpNodes.get().add(new IoTConsensusRequest(buffer));
               }
+            } else {
+              tryToCollectInsertNodeAndBumpIndex.run();
+            }
+            if (hasCollectedSufficientData.get()) {
+              break COLLECT_FILE_LOOP;
             }
-          } catch (FileNotFoundException e) {
-            logger.debug(
-                "WAL file {} has been deleted, try to find next {} again.",
-                identifier,
-                nextSearchIndex);
-            reset();
-            return hasNext();
-          } catch (Exception e) {
-            brokenFileId = 
WALFileUtils.parseVersionId(filesToSearch[fileIndex].getName());
-            logger.error(
-                "Fail to read wal from wal file {}, skip this file.", 
filesToSearch[fileIndex], e);
-          }
-          if (!tmpNodes.isEmpty()) {
-            fileIndex++;
           }
+        } catch (Exception e) {
+          brokenFileId = 
WALFileUtils.parseVersionId(filesToSearch[currentFileIndex].getName());
+          logger.error(
+              "Fail to read wal from wal file {}, skip this file.",
+              filesToSearch[currentFileIndex],
+              e);
         }
-
-        if (tmpNodes.isEmpty()) { // all insert plans scanned
-          currentFileIndex = fileIndex;
-        } else {
-          needUpdatingFilesToSearch = true;
-        }
+        notFirstFile.set(true);
       }
 
       // update file index and version id

Reply via email to