This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6d009c29705b84427b53e6518151035ee2b8a371 Author: Li Yu Heng <[email protected]> AuthorDate: Thu Jul 25 10:34:37 2024 +0800 Optimize WALNode.PlanNodeIterator.hasNext (#13013) * is it work? * no it's not * all wal UT pass, maybe it's work * feel's like leetcode medium (cherry picked from commit d5e409d8d6fe2deb696d9e9e0af930c0773f4b30) --- .../storageengine/dataregion/wal/node/WALNode.java | 179 ++++++++------------- 1 file changed, 67 insertions(+), 112 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java index 33245ebf9e5..607a99a5673 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java @@ -79,6 +79,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** @@ -703,126 +705,79 @@ public class WALNode implements IWALNode { } } - // find all nodes of current wal file - List<IConsensusRequest> tmpNodes = new ArrayList<>(); - long targetIndex = nextSearchIndex; - try (WALByteBufReader walByteBufReader = - new WALByteBufReader(filesToSearch[currentFileIndex])) { - while (walByteBufReader.hasNext()) { - ByteBuffer buffer = walByteBufReader.next(); - WALEntryType type = WALEntryType.valueOf(buffer.get()); - if (type.needSearch()) { - // see WALInfoEntry#serialize, entry type + memtable id + plan node type - buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + PlanNodeType.BYTES); - long currentIndex = buffer.getLong(); - buffer.clear(); - if (currentIndex == targetIndex) { - tmpNodes.add(new IoTConsensusRequest(buffer)); - } else { - // different search index, all slices found - if (!tmpNodes.isEmpty()) { - insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes)); - tmpNodes = new ArrayList<>(); - } - // remember to add current plan node - if (currentIndex > targetIndex) { - tmpNodes.add(new IoTConsensusRequest(buffer)); - targetIndex = currentIndex; + /* ------ find all nodes from all wal file ------ */ + + AtomicReference<List<IConsensusRequest>> tmpNodes = new AtomicReference<>(new ArrayList<>()); + AtomicBoolean notFirstFile = new AtomicBoolean(false); + AtomicBoolean hasCollectedSufficientData = new AtomicBoolean(false); + + // try to collect current tmpNodes to insertNodes, return true if successfully collect an + // insert node + Runnable tryToCollectInsertNodeAndBumpIndex = + () -> { + if (!tmpNodes.get().isEmpty()) { + insertNodes.add(new IndexedConsensusRequest(nextSearchIndex, tmpNodes.get())); + tmpNodes.set(new ArrayList<>()); + nextSearchIndex++; + if (notFirstFile.get()) { + hasCollectedSufficientData.set(true); } } - } else if (!tmpNodes.isEmpty()) { - // next entry doesn't need to be searched, all slices found - insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes)); - targetIndex++; - tmpNodes = new ArrayList<>(); - } - } - } catch (FileNotFoundException e) { - logger.debug( - "WAL file {} has been deleted, try to find next {} again.", - identifier, - nextSearchIndex); - reset(); - return hasNext(); - } catch (Exception e) { - brokenFileId = WALFileUtils.parseVersionId(filesToSearch[currentFileIndex].getName()); - logger.error( - "Fail to read wal from wal file {}, skip this file.", - filesToSearch[currentFileIndex], - e); - // skip this file when it's broken from the beginning - if (insertNodes.isEmpty() && tmpNodes.isEmpty()) { - currentFileIndex++; - return hasNext(); + }; + + COLLECT_FILE_LOOP: + for (; currentFileIndex < filesToSearch.length - 1; currentFileIndex++) { + // cannot find any in this file, so all slices of last plan node are found + if (WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName()) + == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) { + tryToCollectInsertNodeAndBumpIndex.run(); + continue; } - } - - // find remaining slices of last plan node of targetIndex - if (tmpNodes.isEmpty()) { // all plan nodes scanned - currentFileIndex++; - } else { - int fileIndex = currentFileIndex + 1; - while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length - 1) { - // cannot find any in this file, so all slices of last plan node are found - if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName()) - == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) { - insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes)); - tmpNodes = Collections.emptyList(); - break; - } - // read until find all plan nodes whose search index equals target index - try (WALByteBufReader walByteBufReader = new WALByteBufReader(filesToSearch[fileIndex])) { - // first search index are different, so all slices of last plan node are found - if (walByteBufReader.getFirstSearchIndex() != targetIndex) { - insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes)); - tmpNodes = Collections.emptyList(); - break; - } else { - // read until one node has different search index - while (walByteBufReader.hasNext()) { - ByteBuffer buffer = walByteBufReader.next(); - WALEntryType type = WALEntryType.valueOf(buffer.get()); - if (type.needSearch()) { - // see WALInfoEntry#serialize, entry type + memtable id + plan node type - buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + PlanNodeType.BYTES); - long currentIndex = buffer.getLong(); - buffer.clear(); - if (currentIndex == targetIndex) { - tmpNodes.add(new IoTConsensusRequest(buffer)); - } else { // find all slices of plan node - insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes)); - tmpNodes = Collections.emptyList(); - break; - } - } else { // find all slices of plan node - insertNodes.add(new IndexedConsensusRequest(targetIndex, tmpNodes)); - tmpNodes = Collections.emptyList(); - break; + try (WALByteBufReader walByteBufReader = + new WALByteBufReader(filesToSearch[currentFileIndex])) { + while (walByteBufReader.hasNext()) { + ByteBuffer buffer = walByteBufReader.next(); + WALEntryType type = WALEntryType.valueOf(buffer.get()); + if (type.needSearch()) { + // see WALInfoEntry#serialize, entry type + memtable id + plan node type + buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + PlanNodeType.BYTES); + final long currentWalEntryIndex = buffer.getLong(); + buffer.clear(); + if (currentWalEntryIndex == -1) { + // WAL entry of targetIndex has been fully collected, so put them into insertNodes + tryToCollectInsertNodeAndBumpIndex.run(); + } else if (currentWalEntryIndex < nextSearchIndex) { + // WAL entry is outdated, do nothing, continue to see next WAL entry + } else if (currentWalEntryIndex == nextSearchIndex) { + tmpNodes.get().add(new IoTConsensusRequest(buffer)); + } else { + // currentWalEntryIndex > targetIndex + // WAL entry of targetIndex has been fully collected, put them into insertNodes + tryToCollectInsertNodeAndBumpIndex.run(); + if (currentWalEntryIndex != nextSearchIndex) { + logger.warn( + "The search index of next WAL entry should be {}, but actually it's {}", + nextSearchIndex, + currentWalEntryIndex); + nextSearchIndex = currentWalEntryIndex; } + tmpNodes.get().add(new IoTConsensusRequest(buffer)); } + } else { + tryToCollectInsertNodeAndBumpIndex.run(); + } + if (hasCollectedSufficientData.get()) { + break COLLECT_FILE_LOOP; } - } catch (FileNotFoundException e) { - logger.debug( - "WAL file {} has been deleted, try to find next {} again.", - identifier, - nextSearchIndex); - reset(); - return hasNext(); - } catch (Exception e) { - brokenFileId = WALFileUtils.parseVersionId(filesToSearch[fileIndex].getName()); - logger.error( - "Fail to read wal from wal file {}, skip this file.", filesToSearch[fileIndex], e); - } - if (!tmpNodes.isEmpty()) { - fileIndex++; } + } catch (Exception e) { + brokenFileId = WALFileUtils.parseVersionId(filesToSearch[currentFileIndex].getName()); + logger.error( + "Fail to read wal from wal file {}, skip this file.", + filesToSearch[currentFileIndex], + e); } - - if (tmpNodes.isEmpty()) { // all insert plans scanned - currentFileIndex = fileIndex; - } else { - needUpdatingFilesToSearch = true; - } + notFirstFile.set(true); } // update file index and version id
