This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 48752dd4848acc22129e4fb1b31d17114e1c10be Author: Potato <[email protected]> AuthorDate: Thu Aug 1 10:37:35 2024 +0800 Fixed concurrency bug in IoTConsensus Region migration #13070 Signed-off-by: OneSizeFitQuorum <[email protected]> (cherry picked from commit 4c0a073a4365a312c5c745ac3a6ce9279fb1122f) --- .../consensus/iot/logdispatcher/SyncStatus.java | 60 ++++++++++------------ 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java index 17c232a294b..e11b6302114 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java @@ -43,14 +43,12 @@ public class SyncStatus { * * @throws InterruptedException */ - public void addNextBatch(Batch batch) throws InterruptedException { - synchronized (this) { - while (pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum() - || !iotConsensusMemoryManager.reserve(batch.getSerializedSize(), false)) { - wait(); - } - pendingBatches.add(batch); + public synchronized void addNextBatch(Batch batch) throws InterruptedException { + while (pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum() + || !iotConsensusMemoryManager.reserve(batch.getSerializedSize(), false)) { + wait(); } + pendingBatches.add(batch); } /** @@ -58,29 +56,27 @@ public class SyncStatus { * batch in the parameter is actually in pendingBatches, rather than a reference to a different * object with equal data, so we do not inherit method equals for Batch */ - public void removeBatch(Batch batch) { - synchronized (this) { - batch.setSynced(true); - if (!pendingBatches.isEmpty() && pendingBatches.get(0).equals(batch)) { - Iterator<Batch> iterator = pendingBatches.iterator(); - Batch current = iterator.next(); - while (current.isSynced()) { - controller.update(current.getEndIndex(), false); - iterator.remove(); - iotConsensusMemoryManager.free(current.getSerializedSize(), false); - if (iterator.hasNext()) { - current = iterator.next(); - } else { - break; - } + public synchronized void removeBatch(Batch batch) { + batch.setSynced(true); + if (!pendingBatches.isEmpty() && pendingBatches.get(0).equals(batch)) { + Iterator<Batch> iterator = pendingBatches.iterator(); + Batch current = iterator.next(); + while (current.isSynced()) { + controller.update(current.getEndIndex(), false); + iterator.remove(); + iotConsensusMemoryManager.free(current.getSerializedSize(), false); + if (iterator.hasNext()) { + current = iterator.next(); + } else { + break; } - // wake up logDispatcherThread that might be blocked - notifyAll(); } + // wake up logDispatcherThread that might be blocked + notifyAll(); } } - public void free() { + public synchronized void free() { long size = 0; for (Batch pendingBatch : pendingBatches) { size += pendingBatch.getSerializedSize(); @@ -91,17 +87,15 @@ public class SyncStatus { } /** Gets the first index that is not currently synchronized. */ - public long getNextSendingIndex() { + public synchronized long getNextSendingIndex() { // we do not use ReentrantReadWriteLock because there will be only one thread reading this field - synchronized (this) { - return 1 - + (pendingBatches.isEmpty() - ? controller.getCurrentIndex() - : pendingBatches.getLast().getEndIndex()); - } + return 1 + + (pendingBatches.isEmpty() + ? controller.getCurrentIndex() + : pendingBatches.getLast().getEndIndex()); } - public List<Batch> getPendingBatches() { + public synchronized List<Batch> getPendingBatches() { return pendingBatches; } }
