This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0e792a6190 IGNITE-24106 Propagate raft term and index to
onNewPeersConfigurationApplied (#4978)
0e792a6190 is described below
commit 0e792a6190baf09f2ae99a09b63b30a7ceed9f65
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon Dec 30 18:27:44 2024 +0400
IGNITE-24106 Propagate raft term and index to
onNewPeersConfigurationApplied (#4978)
---
.../RebalanceRaftGroupEventsListener.java | 2 +-
.../ZoneRebalanceRaftGroupEventsListener.java | 2 +-
.../internal/raft/RaftGroupEventsListener.java | 4 +-
.../internal/raft/ItRaftGroupServiceTest.java | 3 +-
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 123 ++++++++++++++++++++-
.../apache/ignite/raft/jraft/core/TestCluster.java | 2 +-
.../internal/raft/JraftGroupEventsListener.java | 4 +-
.../impl/RaftGroupEventsListenerAdapter.java | 4 +-
.../apache/ignite/raft/jraft/core/NodeImpl.java | 8 +-
9 files changed, 137 insertions(+), 15 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index e68c0b89f3..991cd5e2a3 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -164,7 +164,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
/** {@inheritDoc} */
@Override
- public void onNewPeersConfigurationApplied(PeersAndLearners configuration)
{
+ public void onNewPeersConfigurationApplied(PeersAndLearners configuration,
long term, long index) {
if (!busyLock.enterBusy()) {
return;
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
index dc844e77c4..4e11970984 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
@@ -214,7 +214,7 @@ public class ZoneRebalanceRaftGroupEventsListener
implements RaftGroupEventsList
/** {@inheritDoc} */
@Override
- public void onNewPeersConfigurationApplied(PeersAndLearners configuration)
{
+ public void onNewPeersConfigurationApplied(PeersAndLearners configuration,
long term, long index) {
if (!busyLock.enterBusy()) {
return;
}
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
index fe9be4c453..6bc505cac3 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
@@ -32,8 +32,10 @@ public interface RaftGroupEventsListener {
* Invoked on the leader, when new peers' configuration applied to raft
group.
*
* @param configuration New Raft group configuration.
+ * @param term Term on which the new configuration was applied.
+ * @param index Index on which the new configuration was applied.
*/
- default void onNewPeersConfigurationApplied(PeersAndLearners
configuration) {}
+ default void onNewPeersConfigurationApplied(PeersAndLearners
configuration, long term, long index) {}
/**
* Invoked on the leader if membership reconfiguration failed, because of
{@link Status}.
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index e17571e88a..d9d2f9c9c5 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -30,6 +30,7 @@ import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -177,7 +178,7 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
configurationComplete.countDown();
return null;
- }).when(eventsListener).onNewPeersConfigurationApplied(any());
+ }).when(eventsListener).onNewPeersConfigurationApplied(any(),
anyLong(), anyLong());
RaftGroupService service0 = nodes.get(0).raftGroupService;
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index e0e0cee115..1453413be8 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -45,6 +45,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
@@ -82,6 +83,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
@@ -118,6 +120,7 @@ import
org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
import org.apache.ignite.raft.jraft.closure.TaskClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.core.FSMCallerImpl.ApplyTask;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
@@ -3353,7 +3356,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
@Test
public void testOnReconfigurationErrorListener() throws Exception {
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
- cluster = new TestCluster("testChangePeers", dataPath,
Collections.singletonList(peer0), testInfo);
+ cluster = new TestCluster("testOnReconfigurationErrorListener",
dataPath, Collections.singletonList(peer0), testInfo);
var raftGrpEvtsLsnr = mock(JraftGroupEventsListener.class);
@@ -3363,7 +3366,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
Node leader = cluster.waitAndGetLeader();
sendTestTaskAndWait(leader);
- verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any());
+ verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong());
PeerId newPeer = new TestPeer(testInfo, TestUtils.INIT_PORT +
1).getPeerId();
@@ -3383,7 +3386,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
@Test
public void testNewPeersConfigurationAppliedListener() throws Exception {
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
- cluster = new TestCluster("testChangePeers", dataPath,
Collections.singletonList(peer0), testInfo);
+ cluster = new TestCluster("testNewPeersConfigurationAppliedListener",
dataPath, Collections.singletonList(peer0), testInfo);
var raftGrpEvtsLsnr = mock(JraftGroupEventsListener.class);
@@ -3409,7 +3412,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
learners.add(learner);
}
- verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any());
+ verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong());
// Wait until every node sees every other node, otherwise
// changePeersAndLearnersAsync can fail.
@@ -3433,10 +3436,120 @@ public class ItNodeTest extends BaseIgniteAbstractTest
{
return false;
}, 10_000));
- verify(raftGrpEvtsLsnr,
times(1)).onNewPeersConfigurationApplied(List.of(newPeer), List.of(newLearner));
+ verify(raftGrpEvtsLsnr,
times(1)).onNewPeersConfigurationApplied(eq(List.of(newPeer)),
eq(List.of(newLearner)), anyLong(), anyLong());
}
}
+ @Test
+ public void
testIndexAndTermArePropagatedToOnNewPeersConfigurationApplied() throws
Exception {
+ TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
+ var raftGrpEvtsLsnr = mock(JraftGroupEventsListener.class);
+
+ AtomicLong term = new AtomicLong(-1);
+ AtomicLong index = new AtomicLong(-1);
+
+ cluster = new TestCluster(
+
"testIndexAndTermArePropagatedToOnNewPeersConfigurationApplied",
+ dataPath,
+ Collections.singletonList(peer0),
+ new LinkedHashSet<>(),
+ ELECTION_TIMEOUT_MILLIS,
+ (peerId, opts) -> {
+ opts.setRaftGrpEvtsLsnr(raftGrpEvtsLsnr);
+ opts.setFsm(new MockStateMachine(peerId) {
+ @Override
+ public void
onRawConfigurationCommitted(ConfigurationEntry conf) {
+ term.set(conf.getId().getTerm());
+ index.set(conf.getId().getIndex());
+ super.onRawConfigurationCommitted(conf);
+ }
+ });
+ },
+ testInfo
+ );
+
+ assertTrue(cluster.start(peer0));
+
+ Node leader = cluster.waitAndGetLeader();
+
+ assertEquals(1, term.get());
+ assertEquals(1, index.get());
+
+ TestPeer newPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 1);
+ assertTrue(cluster.start(newPeer, false, 300));
+
+ verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong());
+
+ // Wait until new node node sees every other node, otherwise
+ // changePeersAndLearnersAsync can fail.
+ waitForTopologyOnEveryNode(1, cluster);
+
+ SynchronizedClosure done = new SynchronizedClosure();
+ leader.changePeersAndLearnersAsync(
+ new Configuration(List.of(peer0.getPeerId(),
newPeer.getPeerId()), List.of()), leader.getCurrentTerm(),
+ done
+ );
+
+ assertEquals(done.await(), Status.OK());
+
+ assertTrue(waitForCondition(() ->
cluster.getLeader().listAlivePeers().contains(newPeer.getPeerId()), 10_000));
+
+ // Leader hasn't been changed, term must stay the same
+ assertEquals(1, term.get());
+ // idx_2 == joint consensus, idx_3 is expected final cfg
+ assertEquals(3, index.get());
+
+ verify(
+ raftGrpEvtsLsnr,
+
times(1)).onNewPeersConfigurationApplied(List.of(peer0.getPeerId(),
newPeer.getPeerId()), List.of(), term.get(), index.get()
+ );
+ }
+
+ @Test
+ public void testOnNewPeersConfigurationAppliedIsNotCalledAfterResetPeers()
throws Exception {
+ TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
+ var raftGrpEvtsLsnr = mock(JraftGroupEventsListener.class);
+
+ cluster = new TestCluster(
+ "testOnNewPeersConfigurationAppliedIsNotCalledAfterResetPeers",
+ dataPath,
+ Collections.singletonList(peer0),
+ new LinkedHashSet<>(),
+ ELECTION_TIMEOUT_MILLIS,
+ (peerId, opts) -> {
+ opts.setRaftGrpEvtsLsnr(raftGrpEvtsLsnr);
+ },
+ testInfo
+ );
+
+ assertTrue(cluster.start(peer0));
+
+ Node leader = cluster.waitAndGetLeader();
+
+ verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong());
+
+ assertEquals(1, leader.getCurrentTerm());
+
+ TestPeer fakePeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 1);
+
+ leader.resetPeers(new Configuration(List.of(fakePeer.getPeerId())));
+
+ leader.resetPeers(new Configuration(List.of(peer0.getPeerId())));
+
+ // Term was changed twice because of two reset peers
+ assertTrue(waitForCondition(() -> leader.getCurrentTerm() == 3,
10_000));
+
+ assertTrue(waitForCondition(() -> {
+ if (cluster.getLeader() != null) {
+ return
peer0.getPeerId().equals(cluster.getLeader().getLeaderId());
+ }
+ return false;
+
+ }, 10_000));
+
+ verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong());
+ }
+
@Test
public void testChangePeersAndLearnersOnLeaderElected() throws Exception {
List<TestPeer> peers = IntStream.range(0, 6)
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 8f42635714..1e88427040 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -299,7 +299,7 @@ public class TestCluster {
Node node = server.start();
- this.fsms.put(peer.getPeerId(), fsm);
+ this.fsms.put(peer.getPeerId(), (MockStateMachine)
nodeOptions.getFsm());
this.nodes.add((NodeImpl) node);
return true;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
index 7f5cf736f3..dd12e1583d 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
@@ -37,8 +37,10 @@ public interface JraftGroupEventsListener {
*
* @param peers Collection of peers, which was applied by raft group
membership configuration.
* @param learners Collection of learners, which was applied by raft group
membership configuration.
+ * @param term Term on which the new configuration was applied.
+ * @param index Index on which the new configuration was applied.
*/
- void onNewPeersConfigurationApplied(Collection<PeerId> peers,
Collection<PeerId> learners);
+ void onNewPeersConfigurationApplied(Collection<PeerId> peers,
Collection<PeerId> learners, long term, long index);
/**
* Invoked on the leader if membership reconfiguration failed, because of
{@link Status}.
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
index 3861c9365a..b3d706045f 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
@@ -69,8 +69,8 @@ class RaftGroupEventsListenerAdapter implements
JraftGroupEventsListener {
}
@Override
- public void onNewPeersConfigurationApplied(Collection<PeerId> peerIds,
Collection<PeerId> learnerIds) {
- delegate.onNewPeersConfigurationApplied(configuration(peerIds,
learnerIds));
+ public void onNewPeersConfigurationApplied(Collection<PeerId> peerIds,
Collection<PeerId> learnerIds, long term, long index) {
+ delegate.onNewPeersConfigurationApplied(configuration(peerIds,
learnerIds), term, index);
}
@Override
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 411336efe9..ba59260b71 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -493,7 +493,9 @@ public class NodeImpl implements Node, RaftServerService {
if (listener != null) {
if (status.isOk()) {
-
listener.onNewPeersConfigurationApplied(resultPeerIds, resultLearnerIds);
+ LogId id = node.conf.getId();
+
+
listener.onNewPeersConfigurationApplied(resultPeerIds, resultLearnerIds,
id.getTerm(), id.getIndex());
} else {
listener.onReconfigurationError(status,
resultPeerIds, resultLearnerIds, node.getCurrentTerm());
}
@@ -2766,7 +2768,9 @@ public class NodeImpl implements Node, RaftServerService {
JraftGroupEventsListener listener =
this.getOptions().getRaftGrpEvtsLsnr();
if (listener != null) {
-
listener.onNewPeersConfigurationApplied(newConf.getPeers(),
newConf.getLearners());
+ LogId id = this.conf.getId();
+
+
listener.onNewPeersConfigurationApplied(newConf.getPeers(),
newConf.getLearners(), id.getTerm(), id.getIndex());
}
done.run(status);