This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 48752dd4848acc22129e4fb1b31d17114e1c10be
Author: Potato <[email protected]>
AuthorDate: Thu Aug 1 10:37:35 2024 +0800

    Fixed concurrency bug in IoTConsensus Region migration #13070
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
    (cherry picked from commit 4c0a073a4365a312c5c745ac3a6ce9279fb1122f)
---
 .../consensus/iot/logdispatcher/SyncStatus.java    | 60 ++++++++++------------
 1 file changed, 27 insertions(+), 33 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index 17c232a294b..e11b6302114 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -43,14 +43,12 @@ public class SyncStatus {
    *
    * @throws InterruptedException
    */
-  public void addNextBatch(Batch batch) throws InterruptedException {
-    synchronized (this) {
-      while (pendingBatches.size() >= 
config.getReplication().getMaxPendingBatchesNum()
-          || !iotConsensusMemoryManager.reserve(batch.getSerializedSize(), 
false)) {
-        wait();
-      }
-      pendingBatches.add(batch);
+  public synchronized void addNextBatch(Batch batch) throws 
InterruptedException {
+    while (pendingBatches.size() >= 
config.getReplication().getMaxPendingBatchesNum()
+        || !iotConsensusMemoryManager.reserve(batch.getSerializedSize(), 
false)) {
+      wait();
     }
+    pendingBatches.add(batch);
   }
 
   /**
@@ -58,29 +56,27 @@ public class SyncStatus {
    * batch in the parameter is actually in pendingBatches, rather than a 
reference to a different
    * object with equal data, so we do not inherit method equals for Batch
    */
-  public void removeBatch(Batch batch) {
-    synchronized (this) {
-      batch.setSynced(true);
-      if (!pendingBatches.isEmpty() && pendingBatches.get(0).equals(batch)) {
-        Iterator<Batch> iterator = pendingBatches.iterator();
-        Batch current = iterator.next();
-        while (current.isSynced()) {
-          controller.update(current.getEndIndex(), false);
-          iterator.remove();
-          iotConsensusMemoryManager.free(current.getSerializedSize(), false);
-          if (iterator.hasNext()) {
-            current = iterator.next();
-          } else {
-            break;
-          }
+  public synchronized void removeBatch(Batch batch) {
+    batch.setSynced(true);
+    if (!pendingBatches.isEmpty() && pendingBatches.get(0).equals(batch)) {
+      Iterator<Batch> iterator = pendingBatches.iterator();
+      Batch current = iterator.next();
+      while (current.isSynced()) {
+        controller.update(current.getEndIndex(), false);
+        iterator.remove();
+        iotConsensusMemoryManager.free(current.getSerializedSize(), false);
+        if (iterator.hasNext()) {
+          current = iterator.next();
+        } else {
+          break;
         }
-        // wake up logDispatcherThread that might be blocked
-        notifyAll();
       }
+      // wake up logDispatcherThread that might be blocked
+      notifyAll();
     }
   }
 
-  public void free() {
+  public synchronized void free() {
     long size = 0;
     for (Batch pendingBatch : pendingBatches) {
       size += pendingBatch.getSerializedSize();
@@ -91,17 +87,15 @@ public class SyncStatus {
   }
 
   /** Gets the first index that is not currently synchronized. */
-  public long getNextSendingIndex() {
+  public synchronized long getNextSendingIndex() {
     // we do not use ReentrantReadWriteLock because there will be only one 
thread reading this field
-    synchronized (this) {
-      return 1
-          + (pendingBatches.isEmpty()
-              ? controller.getCurrentIndex()
-              : pendingBatches.getLast().getEndIndex());
-    }
+    return 1
+        + (pendingBatches.isEmpty()
+            ? controller.getCurrentIndex()
+            : pendingBatches.getLast().getEndIndex());
   }
 
-  public List<Batch> getPendingBatches() {
+  public synchronized List<Batch> getPendingBatches() {
     return pendingBatches;
   }
 }

Reply via email to