This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch ml_optimize_reader in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 05539bcdfab2cd082372d18e5744498a1d8fcc7e Author: ZhangHongYin <[email protected]> AuthorDate: Tue Sep 13 10:47:26 2022 +0800 [IOTDB-4372] adapt PlanNodeIterator. (#7275) --- .../multileader/logdispatcher/LogDispatcher.java | 26 ++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index e3539752be..abd491dc5d 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -361,8 +361,30 @@ public class LogDispatcher { logger.warn("wait for next WAL entry is interrupted"); } IndexedConsensusRequest data = walEntryiterator.next(); - currentIndex = data.getSearchIndex(); - iteratorIndex = currentIndex; + if (currentIndex > data.getSearchIndex()) { + // if the index of request is smaller than currentIndex, then continue + logger.warn( + "search for one Entry which index is {}, but find a smaller one, index : {}", + currentIndex, + data.getSearchIndex()); + continue; + } else if (currentIndex < data.getSearchIndex()) { + logger.warn( + "search for one Entry which index is {}, but find a larger one, index : {}", + currentIndex, + data.getSearchIndex()); + if (data.getSearchIndex() >= maxIndex) { + // if the index of request is larger than maxIndex, then finish + walEntryiterator.skipTo(currentIndex); + break; + } + // if the index of request is larger than currentIndex, and smaller than maxIndex, then + // skip to index + currentIndex = data.getSearchIndex(); + walEntryiterator.skipTo(currentIndex); + iteratorIndex = currentIndex; + } + // construct request from wal for (IConsensusRequest innerRequest : data.getRequests()) { logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), currentIndex, true)); }
