This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 2f46f2a8da8 Procedure: Fix the concurrency error during
StateMachineProcedure snapshot (#15417) (#15423)
2f46f2a8da8 is described below
commit 2f46f2a8da8f17b949483ffa513de60fe5f63ad5
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Apr 28 16:10:11 2025 +0800
Procedure: Fix the concurrency error during StateMachineProcedure snapshot
(#15417) (#15423)
This PR fixes a concurrency error during StateMachineProcedure snapshot
serialization by replacing the non-thread-safe LinkedList with a thread-safe
ConcurrentLinkedDeque and adding a defensive copy mechanism during
serialization to ensure consistency.
- Replaces LinkedList with ConcurrentLinkedDeque for states
- Uses a defensive copy of states during serialization to prevent
inconsistencies
(cherry picked from commit 17da13e45d72f9b8bb0e97ce51aaa21e0fc6f8ef)
---
.../confignode/procedure/impl/StateMachineProcedure.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
index f0d629a9819..778c552bd56 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
@@ -32,8 +32,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
/**
* Procedure described by a series of steps.
@@ -51,7 +51,7 @@ public abstract class StateMachineProcedure<Env, TState> extends Procedure<Env>
private static final int EOF_STATE = Integer.MIN_VALUE;
private Flow stateFlow = Flow.HAS_MORE_STATE;
- private final LinkedList<Integer> states = new LinkedList<>();
+ private final ConcurrentLinkedDeque<Integer> states = new ConcurrentLinkedDeque<>();
private final List<Procedure<?>> subProcList = new ArrayList<>();
@@ -271,8 +271,11 @@ public abstract class StateMachineProcedure<Env, TState> extends Procedure<Env>
@Override
public void serialize(DataOutputStream stream) throws IOException {
super.serialize(stream);
- stream.writeInt(states.size());
- for (int state : states) {
+
+ // Ensure that the Size does not differ from the actual length during the reading process
+ final ArrayList<Integer> copyStates = new ArrayList<>(states);
+ stream.writeInt(copyStates.size());
+ for (int state : copyStates) {
stream.writeInt(state);
}
}