This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 2f46f2a8da8 Procedure: Fix the concurrency error during StateMachineProcedure snapshot (#15417) (#15423)
2f46f2a8da8 is described below

commit 2f46f2a8da8f17b949483ffa513de60fe5f63ad5
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Apr 28 16:10:11 2025 +0800

    Procedure: Fix the concurrency error during StateMachineProcedure snapshot (#15417) (#15423)
    
    This PR fixes a concurrency error during StateMachineProcedure snapshot serialization by replacing the non-thread-safe LinkedList with a thread-safe ConcurrentLinkedDeque and adding a defensive copy mechanism during serialization to ensure consistency.
    
    - Replaces LinkedList with ConcurrentLinkedDeque for states
    - Uses a defensive copy of states during serialization to prevent inconsistencies
    
    (cherry picked from commit 17da13e45d72f9b8bb0e97ce51aaa21e0fc6f8ef)
---
 .../confignode/procedure/impl/StateMachineProcedure.java      | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
index f0d629a9819..778c552bd56 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
@@ -32,8 +32,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
 
 /**
  * Procedure described by a series of steps.
@@ -51,7 +51,7 @@ public abstract class StateMachineProcedure<Env, TState> extends Procedure<Env>
   private static final int EOF_STATE = Integer.MIN_VALUE;
 
   private Flow stateFlow = Flow.HAS_MORE_STATE;
-  private final LinkedList<Integer> states = new LinkedList<>();
+  private final ConcurrentLinkedDeque<Integer> states = new ConcurrentLinkedDeque<>();
 
   private final List<Procedure<?>> subProcList = new ArrayList<>();
 
@@ -271,8 +271,11 @@ public abstract class StateMachineProcedure<Env, TState> extends Procedure<Env>
   @Override
   public void serialize(DataOutputStream stream) throws IOException {
     super.serialize(stream);
-    stream.writeInt(states.size());
-    for (int state : states) {
+
+    // Ensure that the Size does not differ from the actual length during the reading process
+    final ArrayList<Integer> copyStates = new ArrayList<>(states);
+    stream.writeInt(copyStates.size());
+    for (int state : copyStates) {
       stream.writeInt(state);
     }
   }

Reply via email to