This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test_exp1_no_write
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_0729_test_exp1_no_write by 
this push:
     new c70de09193 fix a potential issue
c70de09193 is described below

commit c70de09193de326a2ea205b6cdb546ee13cf2046
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Fri Aug 5 18:47:58 2022 +0800

    fix a potential issue
---
 .../consensus/multileader/MultiLeaderServerImpl.java |  9 ++++++++-
 .../multileader/logdispatcher/LogDispatcher.java     | 20 +++++++++++++++-----
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 973a31272d..791be89917 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -130,7 +130,10 @@ public class MultiLeaderServerImpl {
         StepTracker.trace("stateMachineWrite", startTimeAfterLock, 
System.nanoTime());
         long offerStartTime = System.nanoTime();
         if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          logDispatcher.offer(indexedConsensusRequest);
+          synchronized (index) {
+            logDispatcher.offer(indexedConsensusRequest);
+            index.incrementAndGet();
+          }
         } else {
           logger.debug(
               "{}: write operation failed. searchIndex: {}. Code: {}",
@@ -230,4 +233,8 @@ public class MultiLeaderServerImpl {
   public MultiLeaderConfig getConfig() {
     return config;
   }
+
+  public AtomicLong getIndexObject() {
+    return index;
+  }
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 1ec4dbd490..ac941e6453 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -266,6 +266,7 @@ public class LogDispatcher {
         PendingBatch batch;
         List<TLogBatch> logBatches = new ArrayList<>();
         long startIndex = syncStatus.getNextSendingIndex();
+        long maxIndexWhenBufferedRequestEmpty = startIndex;
         logger.debug("[GetBatch] startIndex: {}", startIndex);
         long endIndex;
         if (bufferedRequest.size() <= 
config.getReplication().getMaxRequestPerBatch()) {
@@ -275,9 +276,12 @@ public class LogDispatcher {
               impl.getThisNode().getGroupId(),
               pendingRequest.size(),
               bufferedRequest.size());
-          pendingRequest.drainTo(
-              bufferedRequest,
-              config.getReplication().getMaxRequestPerBatch() - 
bufferedRequest.size());
+          synchronized (impl.getIndexObject()) {
+            pendingRequest.drainTo(
+                bufferedRequest,
+                config.getReplication().getMaxRequestPerBatch() - 
bufferedRequest.size());
+            maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
+          }
           // remove all request that searchIndex < startIndex
           Iterator<IndexedConsensusRequest> iterator = 
bufferedRequest.iterator();
           while (iterator.hasNext()) {
@@ -289,8 +293,13 @@ public class LogDispatcher {
             }
           }
         }
-        if (bufferedRequest.isEmpty()) { // only execute this after a restart
-          endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1, 
logBatches);
+        // This condition will be executed in several scenarios:
+        // 1. restart
+        // 2. The getBatch() is invoked immediately at the moment the 
PendingRequests are consumed
+        // up.
+        if (bufferedRequest.isEmpty()) {
+          endIndex =
+              constructBatchFromWAL(startIndex, 
maxIndexWhenBufferedRequestEmpty, logBatches);
           batch = new PendingBatch(startIndex, endIndex, logBatches);
           logger.debug(
               "{} : accumulated a {} from wal when empty", 
impl.getThisNode().getGroupId(), batch);
@@ -376,6 +385,7 @@ public class LogDispatcher {
               currentIndex,
               maxIndex,
               iteratorIndex));
+      // Even if there is no WAL files, these code won't produce error.
       if (iteratorIndex != currentIndex) {
         walEntryiterator.skipTo(currentIndex);
         iteratorIndex = currentIndex;

Reply via email to