This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 09816a883f IGNITE-22261 Fix deadlock on configuration application in
NodeImpl when disruptors are full (#4066)
09816a883f is described below
commit 09816a883f4320c1631fa24188ac4fcffb3e2268
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Mon Aug 5 13:31:17 2024 +0300
IGNITE-22261 Fix deadlock on configuration application in NodeImpl when
disruptors are full (#4066)
---
modules/raft/build.gradle | 1 +
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 102 ++++++++++++++++++++-
.../apache/ignite/raft/jraft/core/BallotBox.java | 10 +-
.../apache/ignite/raft/jraft/core/NodeImpl.java | 52 +++++++++--
.../ignite/raft/jraft/option/BallotBoxOptions.java | 9 ++
.../raft/jraft/storage/SnapshotExecutor.java | 6 ++
.../storage/snapshot/SnapshotExecutorImpl.java | 5 +-
.../ignite/raft/jraft/core/BallotBoxTest.java | 67 ++++++++------
8 files changed, 202 insertions(+), 50 deletions(-)
diff --git a/modules/raft/build.gradle b/modules/raft/build.gradle
index 1d99e218f3..8f3c2fcbad 100644
--- a/modules/raft/build.gradle
+++ b/modules/raft/build.gradle
@@ -98,4 +98,5 @@ dependencies {
integrationTestImplementation project(':ignite-failure-handler')
integrationTestImplementation libs.jetbrains.annotations
integrationTestImplementation libs.dropwizard.metrics
+ integrationTestImplementation libs.disruptor
}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 97902de1ec..1189b04328 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -45,6 +45,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.codahale.metrics.ConsoleReporter;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.RingBuffer;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.file.Files;
@@ -68,6 +70,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.stream.IntStream;
@@ -98,7 +101,10 @@ import
org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
import org.apache.ignite.raft.jraft.closure.TaskClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl.ApplyTask;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.Task;
import org.apache.ignite.raft.jraft.entity.UserLog;
@@ -257,7 +263,101 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-22261")
+ public void testSmallestBufferSize() throws Exception {
+ // Regression test for IGNITE-22261: with disruptor buffers of size 1 and the FSM caller stalled,
+ // applying tasks must not deadlock the node; every task callback has to be invoked.
+ TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
+
+ NodeOptions nodeOptions = createNodeOptions(0);
+ RaftOptions raftOptions = new RaftOptions();
+ raftOptions.setDisruptorBufferSize(1);
+ nodeOptions.setRaftOptions(raftOptions);
+ MockStateMachine fsm = new MockStateMachine(peer.getPeerId());
+ nodeOptions.setFsm(fsm);
+ nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta");
+ nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
+ nodeOptions.setInitialConf(new
Configuration(Collections.singletonList(peer.getPeerId())));
+
+ // One-shot switch: when set, the next event handled by the FSM caller disruptor parks on the latch
+ // (compareAndSet clears it, so only a single event is ever blocked).
+ AtomicBoolean block = new AtomicBoolean();
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // FSM caller disruptor whose handlers can be stalled through block/latch.
+ // NOTE(review): the two literal 1s are presumably stripe count and buffer size - confirm against the
+ // StripedDisruptor constructor.
+ nodeOptions.setfSMCallerExecutorDisruptor(new StripedDisruptor<>(
+ "unit-test",
+ "JRaft-FSMCaller-Disruptor",
+ 1,
+ () -> new ApplyTask(),
+ 1,
+ false,
+ false,
+ null
+ ) {
+ @Override
+ public RingBuffer<ApplyTask> subscribe(
+ NodeId group,
+ EventHandler<ApplyTask> handler,
+ BiConsumer<ApplyTask, Throwable> exceptionHandler
+ ) {
+ // Wrap the real handler so a single event can be held back until the latch is released.
+ return super.subscribe(group, (event, sequence, endOfBatch) ->
{
+ if (block.compareAndSet(true, false)) {
+ log.info("Raft task is blocked.");
+
+ latch.await();
+
+ log.info("Raft task is continue executing.");
+ }
+
+ handler.onEvent(event, sequence, endOfBatch);
+ }, exceptionHandler);
+ }
+ });
+
+ RaftGroupService service = createService("unittest", peer,
nodeOptions, List.of());
+
+ Node node = service.start();
+
+ assertEquals(1, node.listPeers().size());
+ assertTrue(node.listPeers().contains(peer.getPeerId()));
+ assertTrue(waitForCondition(node::isLeader, 10_000));
+
+ // Counts invoked task callbacks. Each closure accepts success or an EBUSY/EPERM rejection; the test
+ // only requires that both callbacks eventually run.
+ AtomicInteger c = new AtomicInteger();
+
+ Task task1 = new Task(ByteBuffer.wrap(("Test task").getBytes(UTF_8)),
new JoinableClosure(status -> {
+ log.info("First task is started.");
+
+ if (!status.isOk()) {
+ assertTrue(
+ status.getRaftError() == RaftError.EBUSY ||
status.getRaftError() == RaftError.EPERM);
+ }
+ c.incrementAndGet();
+ }));
+
+ Task task2 = new Task(ByteBuffer.wrap(("Test task").getBytes(UTF_8)),
new JoinableClosure(status -> {
+ log.info("Second task is started.");
+
+ if (!status.isOk()) {
+ assertTrue(
+ status.getRaftError() == RaftError.EBUSY ||
status.getRaftError() == RaftError.EPERM);
+ }
+ c.incrementAndGet();
+ }));
+
+ try {
+ // Arm the switch so the FSM caller stalls on the first event it handles.
+ block.set(true);
+
+ node.apply(task1);
+
+ // The handler clears the flag just before parking, so this confirms the FSM caller is blocked
+ // before the second task is submitted.
+ assertTrue(waitForCondition(() -> !block.get(), 10_000));
+
+ node.apply(task2);
+
+ // Release the FSM caller; both callbacks must then be invoked within the join timeout.
+ latch.countDown();
+
+ Task.joinAll(List.of(task1, task2), TimeUnit.SECONDS.toMillis(30));
+ assertEquals(2, c.get());
+ } finally {
+ // Always release the latch so a failed assertion cannot leave the disruptor thread parked forever.
+ latch.countDown();
+ }
+ }
+
+ @Test
public void testNodeTaskOverload() throws Exception {
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
index 0c5d809e00..ce4ce3a3ea 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/BallotBox.java
@@ -79,6 +79,7 @@ public class BallotBox implements
Lifecycle<BallotBoxOptions>, Describer {
}
this.waiter = opts.getWaiter();
this.closureQueue = opts.getClosureQueue();
+ this.lastCommittedIndex = opts.getLastCommittedIndex();
return true;
}
@@ -159,10 +160,9 @@ public class BallotBox implements
Lifecycle<BallotBoxOptions>, Describer {
* |newPendingIndex| should be |last_log_index| + 1.
*
* @param newPendingIndex pending index of new leader
- * @param quorum quorum size
* @return returns true if reset success
*/
- public boolean resetPendingIndex(final long newPendingIndex, final int
quorum) {
+ public boolean resetPendingIndex(final long newPendingIndex) {
final long stamp = this.stampedLock.writeLock();
try {
if (!(this.pendingIndex == 0 && this.pendingMetaQueue.isEmpty())) {
@@ -176,12 +176,6 @@ public class BallotBox implements
Lifecycle<BallotBoxOptions>, Describer {
return false;
}
this.pendingIndex = newPendingIndex;
- if (quorum == 1) {
- // It is safe to initiate lastCommittedIndex as last log one
because in case of single peer no one will discard
- // log records on leader election. It's not an optimisation,
but a matter of correctness because otherwise there will be
- // a race between readIndex evaluation and asynchronous log
records application on node restart.
- this.lastCommittedIndex = newPendingIndex - 1;
- }
this.closureQueue.resetFirstIndex(newPendingIndex);
return true;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index c4b498cee1..0fcf832981 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -1022,14 +1022,6 @@ public class NodeImpl implements Node, RaftServerService
{
LOG.error("Node {} initFSMCaller failed.", getNodeId());
return false;
}
- this.ballotBox = new BallotBox();
- final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
- ballotBoxOpts.setWaiter(this.fsmCaller);
- ballotBoxOpts.setClosureQueue(this.closureQueue);
- if (!this.ballotBox.init(ballotBoxOpts)) {
- LOG.error("Node {} init ballotBox failed.", getNodeId());
- return false;
- }
if (!initSnapshotStorage()) {
LOG.error("Node {} initSnapshotStorage failed.", getNodeId());
@@ -1053,6 +1045,12 @@ public class NodeImpl implements Node, RaftServerService
{
this.targetPriority =
getMaxPriorityOfNodes(this.conf.getConf().getPeers());
}
+ // It must be initialized after initializing conf and log storage.
+ if (!initBallotBox()) {
+ LOG.error("Node {} init ballotBox failed.", getNodeId());
+ return false;
+ }
+
if (!this.conf.isEmpty()) {
Requires.requireTrue(this.conf.isValid(), "Invalid conf: %s",
this.conf);
}
@@ -1127,6 +1125,30 @@ public class NodeImpl implements Node, RaftServerService
{
return true;
}
+ /**
+ * Creates and initializes {@code ballotBox}.
+ *
+ * <p>The initial last committed index is the last snapshot index (0 when there is no snapshot executor).
+ * For a single-peer group it is raised to the last log index, because with a single peer no one will discard
+ * log records on leader election. Since this reads both the quorum and the log manager, it must be called
+ * after the configuration and the log storage are initialized.
+ *
+ * @return the result of {@code BallotBox#init}: {@code true} if the ballot box was initialized successfully
+ */
+ private boolean initBallotBox() {
+ this.ballotBox = new BallotBox();
+ final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
+ ballotBoxOpts.setWaiter(this.fsmCaller);
+ ballotBoxOpts.setClosureQueue(this.closureQueue);
+ // TODO: uncomment when the related change is backported: https://issues.apache.org/jira/browse/IGNITE-22923
+ //ballotBoxOpts.setNodeId(getNodeId());
+ // Try to initialize the last committed index in BallotBox to be the last snapshot index.
+ long lastCommittedIndex = 0;
+ if (this.snapshotExecutor != null) {
+ lastCommittedIndex = this.snapshotExecutor.getLastSnapshotIndex();
+ }
+ if (this.getQuorum() == 1) {
+ // It is safe to initialize lastCommittedIndex to the last log index, because with a single peer
+ // no one will discard log records on leader election.
+ // Fix https://github.com/sofastack/sofa-jraft/issues/1049
+ lastCommittedIndex = Math.max(lastCommittedIndex,
this.logManager.getLastLogIndex());
+ }
+
+ ballotBoxOpts.setLastCommittedIndex(lastCommittedIndex);
+ LOG.info("Node {} init ballot box's lastCommittedIndex={}.",
getNodeId(), lastCommittedIndex);
+ return this.ballotBox.init(ballotBoxOpts);
+ }
+
/**
* Validates a required option if shared pools are enabled.
*
@@ -1451,7 +1473,7 @@ public class NodeImpl implements Node, RaftServerService {
}
// init commit manager
- this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() +
1, getQuorum());
+ this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() +
1);
// Register _conf_ctx to reject configuration changing before the
first log
// is committed.
if (this.confCtx.isBusy()) {
@@ -1561,6 +1583,18 @@ public class NodeImpl implements Node, RaftServerService
{
}
private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
+ if (!this.logManager.hasAvailableCapacityToAppendEntries(1)) {
+ // It's overload, fail-fast
+ final List<Closure> dones = tasks.stream().map(ele ->
ele.done).filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ Utils.runInThread(this.getOptions().getCommonExecutor(), () -> {
+ for (final Closure done : dones) {
+ done.run(new Status(RaftError.EBUSY, "Node %s
log manager is busy.", this.getNodeId()));
+ }
+ });
+ return;
+ }
+
this.writeLock.lock();
try {
final int size = tasks.size();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/BallotBoxOptions.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/BallotBoxOptions.java
index eea31def7d..02018fb74f 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/BallotBoxOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/BallotBoxOptions.java
@@ -26,6 +26,15 @@ public class BallotBoxOptions {
private FSMCaller waiter;
private ClosureQueue closureQueue;
+ /** Last committed index the ballot box starts from; copied into the ballot box by {@code BallotBox#init}. */
+ private long lastCommittedIndex;
+
+ /**
+ * Returns the last committed index the ballot box should start from.
+ *
+ * @return the initial last committed index (0 unless set)
+ */
+ public long getLastCommittedIndex() {
+ return lastCommittedIndex;
+ }
+
+ /**
+ * Sets the last committed index the ballot box should start from.
+ *
+ * @param lastCommittedIndex the initial last committed index
+ */
+ public void setLastCommittedIndex(long lastCommittedIndex) {
+ this.lastCommittedIndex = lastCommittedIndex;
+ }
public FSMCaller getWaiter() {
return this.waiter;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutor.java
index d7688083d6..2040e85864 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutor.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutor.java
@@ -42,6 +42,12 @@ public interface SnapshotExecutor extends
Lifecycle<SnapshotExecutorOptions>, De
*/
void doSnapshot(final Closure done);
+ /**
+ * Returns the last snapshot index.
+ *
+ * @return the index of the last snapshot
+ */
+ long getLastSnapshotIndex();
+
/**
* Install snapshot according to the very RPC from leader After the
installing succeeds (StateMachine is reset with
* the snapshot) or fails, done will be called to respond Errors: - Term
mismatches: which happens
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
index b62a425cd3..f0db519aac 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
@@ -100,10 +100,7 @@ public class SnapshotExecutorImpl implements
SnapshotExecutor {
return this.lastSnapshotTerm;
}
- /**
- * Only for test
- */
- @OnlyForTest
+ // Exposed through SnapshotExecutor (no longer test-only) so NodeImpl can seed the ballot box's
+ // lastCommittedIndex on init.
+ @Override
public long getLastSnapshotIndex() {
return this.lastSnapshotIndex;
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java
index f26f18181c..30dcfb76d5 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/BallotBoxTest.java
@@ -16,6 +16,12 @@
*/
package org.apache.ignite.raft.jraft.core;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
import java.util.concurrent.ExecutorService;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.raft.jraft.Closure;
@@ -32,18 +38,10 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
@ExtendWith(MockitoExtension.class)
public class BallotBoxTest extends BaseIgniteAbstractTest {
private BallotBox box;
@@ -61,6 +59,7 @@ public class BallotBoxTest extends BaseIgniteAbstractTest {
this.closureQueue = new ClosureQueueImpl(options);
opts.setClosureQueue(this.closureQueue);
opts.setWaiter(this.waiter);
+ opts.setLastCommittedIndex(0);
box = new BallotBox();
assertTrue(box.init(opts));
}
@@ -71,19 +70,34 @@ public class BallotBoxTest extends BaseIgniteAbstractTest {
ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
}
- @ParameterizedTest
- @ValueSource(shorts = {0, 1, 3})
- public void testResetPendingIndex(int quorum) {
+ @Test
+ public void initWithLastCommittedIndex() {
+ // A ballot box must start from the last committed index supplied in its options.
+ BallotBoxOptions opts = new BallotBoxOptions();
+ // Reuse the closure queue built in setUp instead of creating and assigning a second executor:
+ // overwriting the executor field would leak the one tearDown is expected to shut down.
+ opts.setClosureQueue(this.closureQueue);
+ opts.setWaiter(this.waiter);
+ opts.setLastCommittedIndex(9);
+ box = new BallotBox();
+ assertTrue(box.init(opts));
+
+ assertEquals(9, box.getLastCommittedIndex());
+ }
+
+ @Test
+ public void testResetPendingIndex() {
assertEquals(0, closureQueue.getFirstIndex());
assertEquals(0, box.getPendingIndex());
- assertTrue(box.resetPendingIndex(1, quorum));
+ assertTrue(box.resetPendingIndex(1));
+ // resetPendingIndex no longer adjusts lastCommittedIndex; it keeps the value given at init (0 in setUp).
+ assertEquals(0, box.getLastCommittedIndex());
assertEquals(1, closureQueue.getFirstIndex());
assertEquals(1, box.getPendingIndex());
}
- @ParameterizedTest
- @ValueSource(shorts = {0, 1, 3})
- public void testAppendPendingTask(int quorum) {
+ @Test
+ public void testAppendPendingTask() {
assertTrue(this.box.getPendingMetaQueue().isEmpty());
assertTrue(this.closureQueue.getQueue().isEmpty());
assertFalse(this.box.appendPendingTask(
@@ -95,7 +109,7 @@ public class BallotBoxTest extends BaseIgniteAbstractTest {
}
}));
- assertTrue(box.resetPendingIndex(1, quorum));
+ assertTrue(box.resetPendingIndex(1));
assertTrue(this.box.appendPendingTask(
JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083"),
JRaftUtils.getConfiguration("localhost:8081"), new Closure() {
@@ -110,21 +124,19 @@ public class BallotBoxTest extends BaseIgniteAbstractTest
{
assertEquals(1, this.closureQueue.getQueue().size());
}
- @ParameterizedTest
- @ValueSource(shorts = {0, 1, 3})
- public void testClearPendingTasks(int quorum) {
- testAppendPendingTask(quorum);
+ @Test
+ public void testClearPendingTasks() {
+ // Start from the state left by testAppendPendingTask (one pending closure queued).
+ testAppendPendingTask();
this.box.clearPendingTasks();
assertTrue(this.box.getPendingMetaQueue().isEmpty());
assertTrue(this.closureQueue.getQueue().isEmpty());
assertEquals(0, closureQueue.getFirstIndex());
}
- @ParameterizedTest
- @ValueSource(shorts = {0, 1, 3})
- public void testCommitAt(int quorum) {
+ @Test
+ public void testCommitAt() {
assertFalse(this.box.commitAt(1, 3, new PeerId("localhost", 8081)));
- assertTrue(box.resetPendingIndex(1, quorum));
+ assertTrue(box.resetPendingIndex(1));
assertTrue(this.box.appendPendingTask(
JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083"),
JRaftUtils.getConfiguration("localhost:8081"), new Closure() {
@@ -150,10 +162,9 @@ public class BallotBoxTest extends BaseIgniteAbstractTest {
Mockito.verify(this.waiter, Mockito.only()).onCommitted(1);
}
- @ParameterizedTest
- @ValueSource(shorts = {0, 1, 3})
- public void testSetLastCommittedIndexHasPending(int quorum) {
- assertTrue(box.resetPendingIndex(1, quorum));
+ @Test
+ public void testSetLastCommittedIndexHasPending() {
+ // Once a pending index has been set, setLastCommittedIndex must be rejected with IllegalArgumentException.
+ assertTrue(box.resetPendingIndex(1));
assertThrows(IllegalArgumentException.class, () ->
this.box.setLastCommittedIndex(1));
}