This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch multileader_restart_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/multileader_restart_test by
this push:
new 4c2a142b28 add more metrics
4c2a142b28 is described below
commit 4c2a142b28d2201d1aa386486fb5a2c901cf197f
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Fri Jul 8 16:54:15 2022 +0800
add more metrics
---
.../java/org/apache/iotdb/db/wal/io/WALReader.java | 59 +++---
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 219 ++++++++++-----------
2 files changed, 141 insertions(+), 137 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java
index 197e231728..567b33ba9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALReader.java
@@ -68,38 +68,43 @@ public class WALReader implements Closeable {
/** Like {@link Iterator#hasNext()} */
public boolean hasNext() {
- if (itr != null && itr.hasNext()) {
- return true;
- }
- // read WALEntries from log stream
+ long hasNextStartTime = System.nanoTime();
try {
- if (fileCorrupted) {
- return false;
+ if (itr != null && itr.hasNext()) {
+ return true;
}
- walEntries.clear();
- while (walEntries.size() < BATCH_LIMIT) {
- long startTime = System.nanoTime();
- WALEntry walEntry = WALEntry.deserialize(logStream);
- StepTracker.trace("walEntryDeserialize", 1000, startTime, System.nanoTime());
- walEntries.add(walEntry);
+ // read WALEntries from log stream
+ try {
+ if (fileCorrupted) {
+ return false;
+ }
+ walEntries.clear();
+ while (walEntries.size() < BATCH_LIMIT) {
+ long startTime = System.nanoTime();
+ WALEntry walEntry = WALEntry.deserialize(logStream);
+ StepTracker.trace("walEntryDeserialize", 1000, startTime, System.nanoTime());
+ walEntries.add(walEntry);
+ }
+ } catch (EOFException e) {
+ // reach end of wal file
+ fileCorrupted = true;
+ } catch (IllegalPathException e) {
+ fileCorrupted = true;
+ logger.warn(
+ "WALEntry of wal file {} contains illegal path, skip illegal WALEntries.", logFile, e);
+ } catch (Exception e) {
+ fileCorrupted = true;
+ logger.warn("Fail to read WALEntry from wal file {}, skip broken WALEntries.", logFile, e);
}
- } catch (EOFException e) {
- // reach end of wal file
- fileCorrupted = true;
- } catch (IllegalPathException e) {
- fileCorrupted = true;
- logger.warn(
- "WALEntry of wal file {} contains illegal path, skip illegal WALEntries.", logFile, e);
- } catch (Exception e) {
- fileCorrupted = true;
- logger.warn("Fail to read WALEntry from wal file {}, skip broken WALEntries.", logFile, e);
- }
- if (walEntries.size() != 0) {
- itr = walEntries.iterator();
- return true;
+ if (walEntries.size() != 0) {
+ itr = walEntries.iterator();
+ return true;
+ }
+ return false;
+ } finally {
+ StepTracker.trace("WALReader.hasNext()", 400, hasNextStartTime, System.nanoTime());
}
- return false;
}
/** Like {@link Iterator#next()} */
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index cecdb43fd9..920baff953 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.wal.node;
+import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -203,6 +204,7 @@ public class WALNode implements IWALNode {
}
// region Task to delete outdated .wal files
+
/** Delete outdated .wal files */
public void deleteOutdatedFiles() {
try {
@@ -680,146 +682,143 @@ public class WALNode implements IWALNode {
@Override
public boolean hasNext() {
- if (itr != null && itr.hasNext()) {
- return true;
- }
-
- // clear outdated iterator
- insertNodes.clear();
- itr = null;
-
- // update files to search
- if (needUpdatingFilesToSearch || filesToSearch == null) {
- updateFilesToSearch();
- if (needUpdatingFilesToSearch) {
- logger.warn("update file to search index = {} failed.", nextSearchIndex);
- return false;
+ long hasNextStartTime = System.nanoTime();
+ try {
+ if (itr != null && itr.hasNext()) {
+ return true;
}
- }
- // find file contains search index
- while (WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
- == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
- currentFileIndex++;
- if (currentFileIndex >= filesToSearch.length) {
- needUpdatingFilesToSearch = true;
- return false;
+ // clear outdated iterator
+ insertNodes.clear();
+ itr = null;
+
+ // update files to search
+ if (needUpdatingFilesToSearch || filesToSearch == null) {
+ updateFilesToSearch();
+ if (needUpdatingFilesToSearch) {
+ logger.warn("update file to search index = {} failed.", nextSearchIndex);
+ return false;
+ }
}
- }
- // find file contains search index
- while (WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
- == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
- currentFileIndex++;
- if (currentFileIndex >= filesToSearch.length) {
- needUpdatingFilesToSearch = true;
- return false;
+ // find file contains search index
+ while (WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
+ == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+ currentFileIndex++;
+ if (currentFileIndex >= filesToSearch.length) {
+ needUpdatingFilesToSearch = true;
+ return false;
+ }
}
- }
- // find all insert plan of current wal file
- List<InsertNode> tmpNodes = new ArrayList<>();
- long targetIndex = nextSearchIndex;
- try (WALReader walReader = new WALReader(filesToSearch[currentFileIndex])) {
- while (walReader.hasNext()) {
- WALEntry walEntry = walReader.next();
- if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
- || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
- InsertNode insertNode = (InsertNode) walEntry.getValue();
- if (insertNode.getSearchIndex() == targetIndex) {
- tmpNodes.add(insertNode);
+ // find all insert plan of current wal file
+ List<InsertNode> tmpNodes = new ArrayList<>();
+ long targetIndex = nextSearchIndex;
+ try (WALReader walReader = new WALReader(filesToSearch[currentFileIndex])) {
+ while (walReader.hasNext()) {
+ WALEntry walEntry = walReader.next();
+ if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
+ || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
+ InsertNode insertNode = (InsertNode) walEntry.getValue();
+ if (insertNode.getSearchIndex() == targetIndex) {
+ tmpNodes.add(insertNode);
+ } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+ insertNodes.add(mergeInsertNodes(tmpNodes));
+ targetIndex++;
+ tmpNodes = new ArrayList<>();
+ // remember to add current insert node
+ if (insertNode.getSearchIndex() == targetIndex) {
+ tmpNodes.add(insertNode);
+ }
+ }
} else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
insertNodes.add(mergeInsertNodes(tmpNodes));
targetIndex++;
tmpNodes = new ArrayList<>();
- // remember to add current insert node
- if (insertNode.getSearchIndex() == targetIndex) {
- tmpNodes.add(insertNode);
- }
}
- } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
- insertNodes.add(mergeInsertNodes(tmpNodes));
- targetIndex++;
- tmpNodes = new ArrayList<>();
}
+ } catch (FileNotFoundException e) {
+ logger.debug(
+ "WAL file {} has been deleted, try to find next {} again.",
+ identifier,
+ nextSearchIndex);
+ reset();
+ hasNext();
+ } catch (Exception e) {
+ logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e);
}
- } catch (FileNotFoundException e) {
- logger.debug(
- "WAL file {} has been deleted, try to find next {} again.",
- identifier,
- nextSearchIndex);
- reset();
- hasNext();
- } catch (Exception e) {
- logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e);
- }
- // find remaining slices of last insert plan of targetIndex
- if (tmpNodes.isEmpty()) { // all insert plans scanned
- currentFileIndex++;
- } else {
- int fileIndex = currentFileIndex + 1;
- while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length) {
- // cannot find any in this file, find all slices of last insert plan
- if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName())
- == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
- insertNodes.add(mergeInsertNodes(tmpNodes));
- tmpNodes = Collections.emptyList();
- break;
- }
+ // find remaining slices of last insert plan of targetIndex
+ if (tmpNodes.isEmpty()) { // all insert plans scanned
+ currentFileIndex++;
+ } else {
+ long nextFileStartTime = System.nanoTime();
+ int fileIndex = currentFileIndex + 1;
+ while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length) {
+ // cannot find any in this file, find all slices of last insert plan
+ if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName())
+ == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+ insertNodes.add(mergeInsertNodes(tmpNodes));
+ tmpNodes = Collections.emptyList();
+ break;
+ }
- try (WALReader walReader = new WALReader(filesToSearch[fileIndex])) {
- while (walReader.hasNext()) {
- WALEntry walEntry = walReader.next();
- if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
- || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
- InsertNode insertNode = (InsertNode) walEntry.getValue();
- if (insertNode.getSearchIndex() == targetIndex) {
- tmpNodes.add(insertNode);
+ try (WALReader walReader = new WALReader(filesToSearch[fileIndex])) {
+ while (walReader.hasNext()) {
+ WALEntry walEntry = walReader.next();
+ if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
+ || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
+ InsertNode insertNode = (InsertNode) walEntry.getValue();
+ if (insertNode.getSearchIndex() == targetIndex) {
+ tmpNodes.add(insertNode);
+ } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+ insertNodes.add(mergeInsertNodes(tmpNodes));
+ tmpNodes = Collections.emptyList();
+ break;
+ }
} else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
insertNodes.add(mergeInsertNodes(tmpNodes));
tmpNodes = Collections.emptyList();
break;
}
- } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
- insertNodes.add(mergeInsertNodes(tmpNodes));
- tmpNodes = Collections.emptyList();
- break;
}
+ } catch (FileNotFoundException e) {
+ logger.debug(
+ "WAL file {} has been deleted, try to find next {} again.",
+ identifier,
+ nextSearchIndex);
+ reset();
+ hasNext();
+ } catch (Exception e) {
+ logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e);
+ }
+ if (!tmpNodes.isEmpty()) {
+ fileIndex++;
}
- } catch (FileNotFoundException e) {
- logger.debug(
- "WAL file {} has been deleted, try to find next {} again.",
- identifier,
- nextSearchIndex);
- reset();
- hasNext();
- } catch (Exception e) {
- logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e);
}
- if (!tmpNodes.isEmpty()) {
- fileIndex++;
+
+ if (tmpNodes.isEmpty()) { // all insert plans scanned
+ currentFileIndex = fileIndex;
+ } else {
+ needUpdatingFilesToSearch = true;
}
+ StepTracker.trace("nextFileStartTime", 10, nextFileStartTime, System.nanoTime());
}
- if (tmpNodes.isEmpty()) { // all insert plans scanned
- currentFileIndex = fileIndex;
- } else {
+ // update file index and version id
+ if (currentFileIndex >= filesToSearch.length) {
needUpdatingFilesToSearch = true;
}
- }
-
- // update file index and version id
- if (currentFileIndex >= filesToSearch.length) {
- needUpdatingFilesToSearch = true;
- }
- // update iterator
- if (insertNodes.size() != 0) {
- itr = insertNodes.iterator();
- return true;
+ // update iterator
+ if (insertNodes.size() != 0) {
+ itr = insertNodes.iterator();
+ return true;
+ }
+ return false;
+ } finally {
+ StepTracker.trace("WALNode.hasNext()", 400, hasNextStartTime, System.nanoTime());
}
- return false;
}
@Override