This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2f31d322c3c IGNITE-27085 Fix changePeersAndLearnersAsyncResponses test (#7003)
2f31d322c3c is described below
commit 2f31d322c3ce81042e28e9ade96a6ba4b7459ce1
Author: Cyrill <[email protected]>
AuthorDate: Wed Nov 19 10:04:37 2025 +0300
IGNITE-27085 Fix changePeersAndLearnersAsyncResponses test (#7003)
Co-authored-by: Kirill Sizov <[email protected]>
---
.../RebalanceRaftGroupEventsListener.java | 2 +-
.../ZoneRebalanceRaftGroupEventsListener.java | 2 +-
.../internal/raft/RaftGroupEventsListener.java | 3 +-
.../internal/raft/ItRaftGroupServiceTest.java | 2 +-
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 70 ++++++++++++++++++----
.../internal/raft/JraftGroupEventsListener.java | 3 +-
.../impl/RaftGroupEventsListenerAdapter.java | 10 +++-
.../apache/ignite/raft/jraft/core/NodeImpl.java | 16 ++++-
8 files changed, 87 insertions(+), 21 deletions(-)
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index 87674c2504c..0361ad5e92c 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -181,7 +181,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
/** {@inheritDoc} */
@Override
- public void onNewPeersConfigurationApplied(PeersAndLearners configuration,
long term, long index) {
+ public void onNewPeersConfigurationApplied(PeersAndLearners configuration,
long term, long index, long sequenceToken) {
if (!busyLock.enterBusy()) {
return;
}
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
index 961f29b44b3..a1651dbd83d 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
@@ -291,7 +291,7 @@ public class ZoneRebalanceRaftGroupEventsListener
implements RaftGroupEventsList
/** {@inheritDoc} */
@Override
- public void onNewPeersConfigurationApplied(PeersAndLearners configuration,
long term, long index) {
+ public void onNewPeersConfigurationApplied(PeersAndLearners configuration,
long term, long index, long sequenceToken) {
if (!busyLock.enterBusy()) {
return;
}
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
index 41615773785..c1bc33be35e 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
@@ -44,8 +44,9 @@ public interface RaftGroupEventsListener {
* @param configuration New Raft group configuration.
* @param term Term on which the new configuration was applied.
* @param index Index on which the new configuration was applied.
+ * @param sequenceToken Sequence token of this change.
*/
- default void onNewPeersConfigurationApplied(PeersAndLearners
configuration, long term, long index) {}
+ default void onNewPeersConfigurationApplied(PeersAndLearners
configuration, long term, long index, long sequenceToken) {}
/**
* Invoked on the leader if membership reconfiguration failed, because of
{@link Status}.
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index d87f9dec1e6..f5d302a0c8c 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -193,7 +193,7 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
configurationComplete.countDown();
return null;
- }).when(eventsListener).onNewPeersConfigurationApplied(any(),
anyLong(), anyLong());
+ }).when(eventsListener).onNewPeersConfigurationApplied(any(),
anyLong(), anyLong(), anyLong());
RaftGroupService service0 = nodes.get(0).raftGroupService;
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 7565e541810..abe8d105f7e 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -3474,7 +3474,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
Node leader = cluster.waitAndGetLeader();
sendTestTaskAndWait(leader);
- verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong());
+ verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong(), anyLong());
PeerId newPeer = new TestPeer(testInfo, TestUtils.INIT_PORT +
1).getPeerId();
@@ -3520,7 +3520,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
learners.add(learner);
}
- verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong());
+ verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong(), anyLong());
// Wait until every node sees every other node, otherwise
// changePeersAndLearnersAsync can fail.
@@ -3544,7 +3544,8 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
return false;
}, 10_000));
- verify(raftGrpEvtsLsnr,
times(1)).onNewPeersConfigurationApplied(eq(List.of(newPeer)),
eq(List.of(newLearner)), anyLong(), anyLong());
+ verify(raftGrpEvtsLsnr, times(1))
+ .onNewPeersConfigurationApplied(eq(List.of(newPeer)),
eq(List.of(newLearner)), anyLong(), anyLong(), anyLong());
}
}
@@ -3590,7 +3591,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
TestPeer newPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 1);
assertTrue(cluster.start(newPeer, false, 300));
- verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong());
+ verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong(), anyLong());
// Wait until new node sees every other node, otherwise
// changePeersAndLearnersAsync can fail.
@@ -3613,7 +3614,8 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
verify(
raftGrpEvtsLsnr,
- times(1)).onNewPeersConfigurationApplied(List.of(peer0.getPeerId(), newPeer.getPeerId()), List.of(), term.get(), index.get()
+ times(1))
+ .onNewPeersConfigurationApplied(List.of(peer0.getPeerId(),
newPeer.getPeerId()), List.of(), term.get(), index.get(), 0
);
}
@@ -3745,7 +3747,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
Node leader = cluster.waitAndGetLeader();
- verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong());
+ verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong(), anyLong());
assertEquals(1, leader.getCurrentTerm());
@@ -3766,7 +3768,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
}, 10_000));
- verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong());
+ verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any(), anyLong(), anyLong(), anyLong());
}
@Test
@@ -3826,7 +3828,53 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
@Test
public void changePeersAndLearnersAsyncResponses() throws Exception {
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
- cluster = new TestCluster("testChangePeers", dataPath,
Collections.singletonList(peer0), testInfo);
+
+ AtomicLong appliedToken = new AtomicLong();
+
+ var raftGrpEvtsLsnr = new JraftGroupEventsListener() {
+ @Override
+ public void onLeaderElected(
+ long term,
+ long configurationTerm,
+ long configurationIndex,
+ Collection<PeerId> peers,
+ Collection<PeerId> learners,
+ long sequenceToken
+ ) {
+ }
+
+ @Override
+ public void onNewPeersConfigurationApplied(
+ Collection<PeerId> peers,
+ Collection<PeerId> learners,
+ long term,
+ long index,
+ long sequenceToken
+ ) {
+ appliedToken.set(sequenceToken);
+ }
+
+ @Override
+ public void onReconfigurationError(
+ Status status,
+ Collection<PeerId> peers,
+ Collection<PeerId> learners,
+ long term,
+ long sequenceToken
+ ) {
+ }
+ };
+ cluster = new TestCluster(
+ "testChangePeers",
+ dataPath,
+ Collections.singletonList(peer0),
+ new LinkedHashSet<>(),
+ ELECTION_TIMEOUT_MILLIS,
+ (peerId, opts) -> {
+ opts.setRaftGrpEvtsLsnr(raftGrpEvtsLsnr);
+ },
+ testInfo
+ );
assertTrue(cluster.start(peer0));
Node leader = cluster.waitAndGetLeader();
@@ -3854,7 +3902,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
leader.getCurrentTerm(), done);
assertEquals(done.await(), Status.OK());
- sendTestTaskAndWait(leader, 10);
+ assertTrue(waitForCondition(() -> appliedToken.get() == 6, 10_000));
// change peer to new conf containing only new node
done = new SynchronizedClosure();
@@ -3869,7 +3917,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
}, 10_000));
for (MockStateMachine fsm : cluster.getFsms()) {
- assertEquals(20, fsm.getLogs().size());
+ assertEquals(10, fsm.getLogs().size());
}
// check concurrent start of two async change peers.
@@ -3906,7 +3954,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
}, 10_000));
for (MockStateMachine fsm : cluster.getFsms()) {
- assertEquals(30, fsm.getLogs().size());
+ assertEquals(20, fsm.getLogs().size());
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
index e138aae0c60..fc5bd4e6c08 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
@@ -51,8 +51,9 @@ public interface JraftGroupEventsListener {
* @param learners Collection of learners, which was applied by raft group
membership configuration.
* @param term Term on which the new configuration was applied.
* @param index Index on which the new configuration was applied.
+ * @param sequenceToken Sequence token of this change.
*/
- void onNewPeersConfigurationApplied(Collection<PeerId> peers,
Collection<PeerId> learners, long term, long index);
+ void onNewPeersConfigurationApplied(Collection<PeerId> peers,
Collection<PeerId> learners, long term, long index, long sequenceToken);
/**
* Invoked on the leader if membership reconfiguration failed, because of
{@link Status}.
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
index 4011461578b..b9718755352 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
@@ -82,8 +82,14 @@ class RaftGroupEventsListenerAdapter implements
JraftGroupEventsListener {
}
@Override
- public void onNewPeersConfigurationApplied(Collection<PeerId> peerIds,
Collection<PeerId> learnerIds, long term, long index) {
- delegate.onNewPeersConfigurationApplied(configuration(peerIds,
learnerIds), term, index);
+ public void onNewPeersConfigurationApplied(
+ Collection<PeerId> peerIds,
+ Collection<PeerId> learnerIds,
+ long term,
+ long index,
+ long sequenceToken
+ ) {
+ delegate.onNewPeersConfigurationApplied(configuration(peerIds,
learnerIds), term, index, sequenceToken);
}
@Override
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 74dea0e1958..9be02c8b265 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -503,7 +503,7 @@ public class NodeImpl implements Node, RaftServerService {
if (status.isOk()) {
LogId id = node.conf.getId();
- listener.onNewPeersConfigurationApplied(resultPeerIds, resultLearnerIds, id.getTerm(), id.getIndex());
+ listener.onNewPeersConfigurationApplied(resultPeerIds, resultLearnerIds, id.getTerm(), id.getIndex(), resultToken);
} else {
listener.onReconfigurationError(status,
resultPeerIds, resultLearnerIds, node.getCurrentTerm(), resultToken);
}
@@ -2921,7 +2921,11 @@ public class NodeImpl implements Node, RaftServerService
{
if (this.confCtx.isBusy()) {
LOG.warn("Node {} refused configuration concurrent changing.",
getNodeId());
if (done != null) {
- Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, new Status(RaftError.EBUSY, "Doing another configuration change."));
+ Utils.runClosureInThread(
+ this.getOptions().getCommonExecutor(),
+ done,
+ new Status(RaftError.EBUSY, "Doing another configuration change.")
+ );
}
return;
}
@@ -2937,7 +2941,13 @@ public class NodeImpl implements Node, RaftServerService
{
if (listener != null) {
LogId id = this.conf.getId();
- listener.onNewPeersConfigurationApplied(newConf.getPeers(), newConf.getLearners(), id.getTerm(), id.getIndex());
+ listener.onNewPeersConfigurationApplied(
+ newConf.getPeers(),
+ newConf.getLearners(),
+ id.getTerm(),
+ id.getIndex(),
+ newConf.getSequenceToken()
+ );
}
done.run(status);