This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d5e409d8d6f Optimize WALNode.PlanNodeIterator.hasNext (#13013)
d5e409d8d6f is described below
commit d5e409d8d6fe2deb696d9e9e0af930c0773f4b30
Author: Li Yu Heng <[email protected]>
AuthorDate: Thu Jul 25 10:34:37 2024 +0800
Optimize WALNode.PlanNodeIterator.hasNext (#13013)
* is it work?
* no it's not
* all wal UT pass, maybe it's work
* feel's like leetcode medium
---
.../storageengine/dataregion/wal/node/WALNode.java | 179 ++++++++-------------
1 file changed, 67 insertions(+), 112 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 33245ebf9e5..607a99a5673 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -79,6 +79,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
@@ -703,126 +705,79 @@ public class WALNode implements IWALNode {
}
}
- // find all nodes of current wal file
- List<IConsensusRequest> tmpNodes = new ArrayList<>();
- long targetIndex = nextSearchIndex;
- try (WALByteBufReader walByteBufReader =
- new WALByteBufReader(filesToSearch[currentFileIndex])) {
- while (walByteBufReader.hasNext()) {
- ByteBuffer buffer = walByteBufReader.next();
- WALEntryType type = WALEntryType.valueOf(buffer.get());
- if (type.needSearch()) {
- // see WALInfoEntry#serialize, entry type + memtable id + plan
node type
- buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE +
PlanNodeType.BYTES);
- long currentIndex = buffer.getLong();
- buffer.clear();
- if (currentIndex == targetIndex) {
- tmpNodes.add(new IoTConsensusRequest(buffer));
- } else {
- // different search index, all slices found
- if (!tmpNodes.isEmpty()) {
- insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
- tmpNodes = new ArrayList<>();
- }
- // remember to add current plan node
- if (currentIndex > targetIndex) {
- tmpNodes.add(new IoTConsensusRequest(buffer));
- targetIndex = currentIndex;
+ /* ------ find all nodes from all wal file ------ */
+
+ AtomicReference<List<IConsensusRequest>> tmpNodes = new
AtomicReference<>(new ArrayList<>());
+ AtomicBoolean notFirstFile = new AtomicBoolean(false);
+ AtomicBoolean hasCollectedSufficientData = new AtomicBoolean(false);
+
+ // try to collect current tmpNodes to insertNodes, return true if
successfully collect an
+ // insert node
+ Runnable tryToCollectInsertNodeAndBumpIndex =
+ () -> {
+ if (!tmpNodes.get().isEmpty()) {
+ insertNodes.add(new IndexedConsensusRequest(nextSearchIndex,
tmpNodes.get()));
+ tmpNodes.set(new ArrayList<>());
+ nextSearchIndex++;
+ if (notFirstFile.get()) {
+ hasCollectedSufficientData.set(true);
}
}
- } else if (!tmpNodes.isEmpty()) {
- // next entry doesn't need to be searched, all slices found
- insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
- targetIndex++;
- tmpNodes = new ArrayList<>();
- }
- }
- } catch (FileNotFoundException e) {
- logger.debug(
- "WAL file {} has been deleted, try to find next {} again.",
- identifier,
- nextSearchIndex);
- reset();
- return hasNext();
- } catch (Exception e) {
- brokenFileId =
WALFileUtils.parseVersionId(filesToSearch[currentFileIndex].getName());
- logger.error(
- "Fail to read wal from wal file {}, skip this file.",
- filesToSearch[currentFileIndex],
- e);
- // skip this file when it's broken from the beginning
- if (insertNodes.isEmpty() && tmpNodes.isEmpty()) {
- currentFileIndex++;
- return hasNext();
+ };
+
+ COLLECT_FILE_LOOP:
+ for (; currentFileIndex < filesToSearch.length - 1; currentFileIndex++) {
+ // cannot find any in this file, so all slices of last plan node are
found
+ if
(WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
+ == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+ tryToCollectInsertNodeAndBumpIndex.run();
+ continue;
}
- }
-
- // find remaining slices of last plan node of targetIndex
- if (tmpNodes.isEmpty()) { // all plan nodes scanned
- currentFileIndex++;
- } else {
- int fileIndex = currentFileIndex + 1;
- while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length - 1) {
- // cannot find any in this file, so all slices of last plan node are
found
- if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName())
- == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
- insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
- tmpNodes = Collections.emptyList();
- break;
- }
- // read until find all plan nodes whose search index equals target
index
- try (WALByteBufReader walByteBufReader = new
WALByteBufReader(filesToSearch[fileIndex])) {
- // first search index are different, so all slices of last plan
node are found
- if (walByteBufReader.getFirstSearchIndex() != targetIndex) {
- insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
- tmpNodes = Collections.emptyList();
- break;
- } else {
- // read until one node has different search index
- while (walByteBufReader.hasNext()) {
- ByteBuffer buffer = walByteBufReader.next();
- WALEntryType type = WALEntryType.valueOf(buffer.get());
- if (type.needSearch()) {
- // see WALInfoEntry#serialize, entry type + memtable id +
plan node type
- buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE +
PlanNodeType.BYTES);
- long currentIndex = buffer.getLong();
- buffer.clear();
- if (currentIndex == targetIndex) {
- tmpNodes.add(new IoTConsensusRequest(buffer));
- } else { // find all slices of plan node
- insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
- tmpNodes = Collections.emptyList();
- break;
- }
- } else { // find all slices of plan node
- insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
- tmpNodes = Collections.emptyList();
- break;
+ try (WALByteBufReader walByteBufReader =
+ new WALByteBufReader(filesToSearch[currentFileIndex])) {
+ while (walByteBufReader.hasNext()) {
+ ByteBuffer buffer = walByteBufReader.next();
+ WALEntryType type = WALEntryType.valueOf(buffer.get());
+ if (type.needSearch()) {
+ // see WALInfoEntry#serialize, entry type + memtable id + plan
node type
+ buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE +
PlanNodeType.BYTES);
+ final long currentWalEntryIndex = buffer.getLong();
+ buffer.clear();
+ if (currentWalEntryIndex == -1) {
+ // WAL entry of targetIndex has been fully collected, so put
them into insertNodes
+ tryToCollectInsertNodeAndBumpIndex.run();
+ } else if (currentWalEntryIndex < nextSearchIndex) {
+ // WAL entry is outdated, do nothing, continue to see next WAL
entry
+ } else if (currentWalEntryIndex == nextSearchIndex) {
+ tmpNodes.get().add(new IoTConsensusRequest(buffer));
+ } else {
+ // currentWalEntryIndex > targetIndex
+ // WAL entry of targetIndex has been fully collected, put them
into insertNodes
+ tryToCollectInsertNodeAndBumpIndex.run();
+ if (currentWalEntryIndex != nextSearchIndex) {
+ logger.warn(
+ "The search index of next WAL entry should be {}, but
actually it's {}",
+ nextSearchIndex,
+ currentWalEntryIndex);
+ nextSearchIndex = currentWalEntryIndex;
}
+ tmpNodes.get().add(new IoTConsensusRequest(buffer));
}
+ } else {
+ tryToCollectInsertNodeAndBumpIndex.run();
+ }
+ if (hasCollectedSufficientData.get()) {
+ break COLLECT_FILE_LOOP;
}
- } catch (FileNotFoundException e) {
- logger.debug(
- "WAL file {} has been deleted, try to find next {} again.",
- identifier,
- nextSearchIndex);
- reset();
- return hasNext();
- } catch (Exception e) {
- brokenFileId =
WALFileUtils.parseVersionId(filesToSearch[fileIndex].getName());
- logger.error(
- "Fail to read wal from wal file {}, skip this file.",
filesToSearch[fileIndex], e);
- }
- if (!tmpNodes.isEmpty()) {
- fileIndex++;
}
+ } catch (Exception e) {
+ brokenFileId =
WALFileUtils.parseVersionId(filesToSearch[currentFileIndex].getName());
+ logger.error(
+ "Fail to read wal from wal file {}, skip this file.",
+ filesToSearch[currentFileIndex],
+ e);
}
-
- if (tmpNodes.isEmpty()) { // all insert plans scanned
- currentFileIndex = fileIndex;
- } else {
- needUpdatingFilesToSearch = true;
- }
+ notFirstFile.set(true);
}
// update file index and version id