This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_optimize_reader
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_optimize_reader by this 
push:
     new 52cce30a05 fix some issues when constructing batch
52cce30a05 is described below

commit 52cce30a058c00bdd119c773eecb2b094a13a0ea
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Tue Sep 13 18:01:50 2022 +0800

    fix some issues when constructing batch
---
 .../multileader/logdispatcher/LogDispatcher.java   | 34 +++++++++-------------
 1 file changed, 13 insertions(+), 21 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index abd491dc5d..2973e4e3ce 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -346,14 +346,13 @@ public class LogDispatcher {
               currentIndex,
               maxIndex,
               iteratorIndex));
+      long targetIndex = currentIndex;
       // Even if there is no WAL files, these code won't produce error.
-      if (iteratorIndex != currentIndex) {
-        walEntryiterator.skipTo(currentIndex);
-        iteratorIndex = currentIndex;
-      }
-      while (currentIndex < maxIndex
+      walEntryiterator.skipTo(targetIndex);
+
+      while (targetIndex < maxIndex
           && logBatches.size() < 
config.getReplication().getMaxRequestPerBatch()) {
-        logger.debug("construct from WAL for one Entry, index : {}", 
currentIndex);
+        logger.debug("construct from WAL for one Entry, index : {}", 
targetIndex);
         try {
           walEntryiterator.waitForNextReady();
         } catch (InterruptedException e) {
@@ -361,38 +360,31 @@ public class LogDispatcher {
           logger.warn("wait for next WAL entry is interrupted");
         }
         IndexedConsensusRequest data = walEntryiterator.next();
-        if (currentIndex > data.getSearchIndex()) {
+        if (targetIndex > data.getSearchIndex()) {
           // if the index of request is smaller than currentIndex, then 
continue
           logger.warn(
               "search for one Entry which index is {}, but find a smaller one, 
index : {}",
-              currentIndex,
+              targetIndex,
               data.getSearchIndex());
           continue;
-        } else if (currentIndex < data.getSearchIndex()) {
+        } else if (targetIndex < data.getSearchIndex()) {
           logger.warn(
               "search for one Entry which index is {}, but find a larger one, 
index : {}",
-              currentIndex,
+              targetIndex,
               data.getSearchIndex());
           if (data.getSearchIndex() >= maxIndex) {
             // if the index of request is larger than maxIndex, then finish
-            walEntryiterator.skipTo(currentIndex);
             break;
           }
-          // if the index of request is larger than currentIndex, and smaller 
than maxIndex, then
-          // skip to index
-          currentIndex = data.getSearchIndex();
-          walEntryiterator.skipTo(currentIndex);
-          iteratorIndex = currentIndex;
         }
+        targetIndex = data.getSearchIndex() + 1;
         // construct request from wal
         for (IConsensusRequest innerRequest : data.getRequests()) {
-          logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), 
currentIndex, true));
-        }
-        if (currentIndex == maxIndex - 1) {
-          break;
+          logBatches.add(
+              new TLogBatch(innerRequest.serializeToByteBuffer(), 
data.getSearchIndex(), true));
         }
       }
-      return currentIndex;
+      return logBatches.size() > 0 ? logBatches.get(0).searchIndex : 
currentIndex;
     }
 
     private void constructBatchIndexedFromConsensusRequest(

Reply via email to