This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5a89744ccde IGNITE-26809 Properly check RAFT state (#6836)
5a89744ccde is described below
commit 5a89744ccdebdff3fd7c927794a205ef79a54286
Author: Cyrill <[email protected]>
AuthorDate: Fri Oct 24 11:44:50 2025 +0300
IGNITE-26809 Properly check RAFT state (#6836)
Co-authored-by: Kirill Sizov <[email protected]>
---
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 102 +++++++++++++++++++++
.../apache/ignite/raft/jraft/core/NodeImpl.java | 7 ++
2 files changed, 109 insertions(+)
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 420ab3aff95..c22dffee075 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -21,6 +21,7 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.synchronizedList;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
import static
org.apache.ignite.internal.util.IgniteUtils.byteBufferToByteArray;
import static
org.apache.ignite.raft.jraft.core.TestCluster.ELECTION_TIMEOUT_MILLIS;
import static org.apache.ignite.raft.jraft.test.TestUtils.sender;
@@ -58,6 +59,8 @@ import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import java.io.File;
import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.lang.reflect.Proxy;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -148,6 +151,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcServer;
import org.apache.ignite.raft.jraft.rpc.TestIgniteRpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import org.apache.ignite.raft.jraft.storage.LogManager;
import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
@@ -681,6 +685,104 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
cluster.ensureSame();
}
+    /**
+     * Tests that the election step-down race condition is handled correctly.
+     *
+     * <p>The race is between {@code NodeImpl.electSelf}, which first releases and then reacquires the same lock,
+     * and the VoteTimer, which manages to fire while the lock is not held.
+     *
+     * <p>First we start a single node but add more than one peer to the configuration to
+     * prevent calling electSelf right from init.
+     * Then we inject our custom LogManager to be able to pause its calls.
+     * Then we call tryElectSelf() to trigger the race condition.
+     *
+     * <p>We pause the execution for 10 seconds, but that's totally fine since vote timeout is 1.2 sec
+     * and the thread will be interrupted anyway by vote timeout.
+     * NOTE(review): confirm that the paused thread is really interrupted; the interceptor below only restores
+     * the interrupt flag and logs when interrupted, otherwise the sleep simply runs to completion.
+     * We pause only the first call since only the first call triggers the issue and
+     * we expect the other calls to pass normally.
+     */
+    @Test
+    public void testElectionStepDownRaceOnInit() throws Exception {
+        TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
+        // Create another peer to prevent NodeImpl.init from calling electSelf() at the end.
+        TestPeer peer2 = new TestPeer(testInfo, TestUtils.INIT_PORT + 1);
+
+        NodeOptions nodeOptions = createNodeOptions(0);
+        MockStateMachine fsm = new MockStateMachine(peer.getPeerId());
+        nodeOptions.setFsm(fsm);
+        nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
+        nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
+        // Two peers are configured, but only the first one is actually started.
+        nodeOptions.setInitialConf(new Configuration(asList(peer.getPeerId(), peer2.getPeerId())));
+
+        RaftGroupService service = createService("unittest", peer, nodeOptions, List.of());
+
+        Node node = service.start();
+
+        NodeImpl nodeImpl = (NodeImpl) node;
+
+        // Make sure we block only the first call to getLastLogId - which we are interested in.
+        // compareAndSet lets exactly one caller observe the false -> true transition and sleep.
+        AtomicBoolean block = new AtomicBoolean();
+        interceptLogManager(nodeImpl, () -> {
+            if (block.compareAndSet(false, true)) {
+                try {
+                    Thread.sleep(10_000);
+                } catch (InterruptedException e) {
+                    // Preserve the interrupt status for the code that continues after the intercepted call.
+                    Thread.currentThread().interrupt();
+                    log.info("Thread interrupted");
+                }
+            }
+        });
+
+        // Now revert to the original correct configuration with a single node to be able to proceed with election.
+        // nodeOptions.getInitialConf() is used by reference, so we can change the internal state this way.
+        nodeOptions.getInitialConf().setPeers(asList(peer.getPeerId()));
+
+        // Start the election. electSelf is expected to call logManager.getLastLogId() while the lock is
+        // released; the first such call is delayed by the interceptor installed above.
+        nodeImpl.tryElectSelf();
+
+        // The single-node configuration must be the one in effect.
+        assertEquals(1, node.listPeers().size());
+        assertTrue(node.listPeers().contains(peer.getPeerId()));
+
+        // Despite the delayed first attempt, the node must eventually become the leader.
+        assertTrue(waitForCondition(node::isLeader, 15_000));
+
+        // The leader must be able to replicate: the FSM has to apply exactly "hello0".."hello9", in order.
+        sendTestTaskAndWait(node);
+        assertEquals(10, fsm.getLogs().size());
+        int i = 0;
+        for (ByteBuffer data : fsm.getLogs()) {
+            assertEquals("hello" + i++, stringFromBytes(data.array()));
+        }
+    }
+
+    /**
+     * Replaces the {@code logManager} field of the given node with a dynamic proxy.
+     *
+     * <p>The proxy runs {@code onInterceptedAction} before every single-argument {@code getLastLogId} call
+     * and delegates every call (including the intercepted ones) to the original {@link LogManager}.
+     * Exceptions thrown by the original log manager are propagated unchanged, so callers observe
+     * the same exception types as without the proxy.
+     *
+     * @param nodeImpl Node whose log manager is replaced.
+     * @param onInterceptedAction Action executed right before each intercepted {@code getLastLogId} call.
+     * @throws RuntimeException If the {@code logManager} field cannot be found or accessed via reflection.
+     */
+    private static void interceptLogManager(NodeImpl nodeImpl, Runnable onInterceptedAction) {
+        // Intercept getLastLogId call to simulate the race condition.
+        try {
+            Field logManagerField = NodeImpl.class.getDeclaredField("logManager");
+
+            logManagerField.setAccessible(true);
+            LogManager originalLogManager = (LogManager) logManagerField.get(nodeImpl);
+
+            LogManager interceptedLogManager = (LogManager) Proxy.newProxyInstance(
+                    originalLogManager.getClass().getClassLoader(),
+                    new Class<?>[]{LogManager.class},
+                    (proxy, method, args) -> {
+                        if ("getLastLogId".equals(method.getName()) && args != null && args.length == 1) {
+                            // The election thread is about to read the last log id; the lock is expected
+                            // to be released at this point, which is the window the race needs.
+                            log.info("Intercepted the call to logManager.getLastLogId()");
+                            onInterceptedAction.run();
+                        }
+
+                        try {
+                            return method.invoke(originalLogManager, args);
+                        } catch (java.lang.reflect.InvocationTargetException e) {
+                            // Rethrow the delegate's own exception. Otherwise the proxy would wrap the checked
+                            // InvocationTargetException into an UndeclaredThrowableException.
+                            throw e.getCause();
+                        }
+                    });
+
+            logManagerField.set(nodeImpl, interceptedLogManager);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
class UserReplicatorStateListener implements
Replicator.ReplicatorStateListener {
/** {@inheritDoc} */
@Override
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 5824a2bc0e5..007fdbe3d7b 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -1443,6 +1443,13 @@ public class NodeImpl implements Node, RaftServerService {
LOG.warn("Node {} raise term {} when getLastLogId.",
getNodeId(), this.currTerm);
return;
}
+
+ // Check if state changed from CANDIDATE during lock release.
+ if (this.state != State.STATE_CANDIDATE) {
+ LOG.warn("Node {} state changed from CANDIDATE to {} during
election.", getNodeId(), this.state);
+ return;
+ }
+
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue;