This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f855c5b47d2 IoTConsensusV2: Fix some remain concurrent risks for 
stronger robustness. #15172
f855c5b47d2 is described below

commit f855c5b47d24d44fd43fe8520a5a6aadc086d315
Author: Peng Junzhi <[email protected]>
AuthorDate: Sun Mar 23 15:42:16 2025 +0800

    IoTConsensusV2: Fix some remain concurrent risks for stronger robustness. 
#15172
---
 .../iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java     | 3 +--
 .../db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java   | 4 ++--
 .../pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java   | 4 ++--
 3 files changed, 5 insertions(+), 6 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
index ce12e841cd9..ca6153d804a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
@@ -36,7 +36,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,7 +44,7 @@ import java.util.stream.Collectors;
 
 public class ReplicateProgressDataNodeManager implements 
ReplicateProgressManager {
   private static final int DATA_NODE_ID = 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
-  private static final Map<String, AtomicLong> groupId2ReplicateIndex = new 
HashMap<>();
+  private static final Map<String, AtomicLong> groupId2ReplicateIndex = new 
ConcurrentHashMap<>();
   private final Map<ConsensusGroupId, ProgressIndex> groupId2MaxProgressIndex;
   private final Map<ConsensusPipeName, Long> 
consensusPipe2pinnedCommitIndexForMigration;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
index 6ff090475bc..d8dcfc83ab5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
@@ -39,10 +39,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -83,7 +83,7 @@ public class PageCacheDeletionBuffer implements 
DeletionBuffer {
   // Total size of this batch.
   private final AtomicInteger totalSize = new AtomicInteger(0);
   // All deletions that will be handled in a single persist task
-  private final List<DeletionResource> pendingDeletionsInOneTask = new 
ArrayList<>();
+  private final List<DeletionResource> pendingDeletionsInOneTask = new 
CopyOnWriteArrayList<>();
 
   // whether close method is called
   private volatile boolean isClosed = false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 9f683e11a1d..6d1a86eba34 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -1331,8 +1331,8 @@ public class PipeConsensusReceiver {
     private final PipeConsensusTsFileWriterPool tsFileWriterPool;
     private final AtomicInteger WALEventCount = new AtomicInteger(0);
     private final AtomicInteger tsFileEventCount = new AtomicInteger(0);
-    private long onSyncedReplicateIndex = 0;
-    private int connectorRebootTimes = 0;
+    private volatile long onSyncedReplicateIndex = 0;
+    private volatile int connectorRebootTimes = 0;
 
     public RequestExecutor(
         PipeConsensusReceiverMetrics metric, PipeConsensusTsFileWriterPool 
tsFileWriterPool) {

Reply via email to