This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f31d322c3c IGNITE-27085 Fix changePeersAndLearnersAsyncResponses test (#7003)
2f31d322c3c is described below

commit 2f31d322c3ce81042e28e9ade96a6ba4b7459ce1
Author: Cyrill <[email protected]>
AuthorDate: Wed Nov 19 10:04:37 2025 +0300

    IGNITE-27085 Fix changePeersAndLearnersAsyncResponses test (#7003)
    
    Co-authored-by: Kirill Sizov <[email protected]>
---
 .../RebalanceRaftGroupEventsListener.java          |  2 +-
 .../ZoneRebalanceRaftGroupEventsListener.java      |  2 +-
 .../internal/raft/RaftGroupEventsListener.java     |  3 +-
 .../internal/raft/ItRaftGroupServiceTest.java      |  2 +-
 .../apache/ignite/raft/jraft/core/ItNodeTest.java  | 70 ++++++++++++++++++----
 .../internal/raft/JraftGroupEventsListener.java    |  3 +-
 .../impl/RaftGroupEventsListenerAdapter.java       | 10 +++-
 .../apache/ignite/raft/jraft/core/NodeImpl.java    | 16 ++++-
 8 files changed, 87 insertions(+), 21 deletions(-)

diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index 87674c2504c..0361ad5e92c 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -181,7 +181,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
 
     /** {@inheritDoc} */
     @Override
-    public void onNewPeersConfigurationApplied(PeersAndLearners configuration, long term, long index) {
+    public void onNewPeersConfigurationApplied(PeersAndLearners configuration, long term, long index, long sequenceToken) {
         if (!busyLock.enterBusy()) {
             return;
         }
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
index 961f29b44b3..a1651dbd83d 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
@@ -291,7 +291,7 @@ public class ZoneRebalanceRaftGroupEventsListener implements RaftGroupEventsList
 
     /** {@inheritDoc} */
     @Override
-    public void onNewPeersConfigurationApplied(PeersAndLearners configuration, long term, long index) {
+    public void onNewPeersConfigurationApplied(PeersAndLearners configuration, long term, long index, long sequenceToken) {
         if (!busyLock.enterBusy()) {
             return;
         }
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
index 41615773785..c1bc33be35e 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
@@ -44,8 +44,9 @@ public interface RaftGroupEventsListener {
      * @param configuration New Raft group configuration.
      * @param term Term on which the new configuration was applied.
      * @param index Index on which the new configuration was applied.
+     * @param sequenceToken Sequence token of this change.
      */
-    default void onNewPeersConfigurationApplied(PeersAndLearners configuration, long term, long index) {}
+    default void onNewPeersConfigurationApplied(PeersAndLearners configuration, long term, long index, long sequenceToken) {}
 
     /**
      * Invoked on the leader if membership reconfiguration failed, because of {@link Status}.
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index d87f9dec1e6..f5d302a0c8c 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -193,7 +193,7 @@ public class ItRaftGroupServiceTest extends IgniteAbstractTest {
             configurationComplete.countDown();
 
             return null;
-        }).when(eventsListener).onNewPeersConfigurationApplied(any(), anyLong(), anyLong());
+        }).when(eventsListener).onNewPeersConfigurationApplied(any(), anyLong(), anyLong(), anyLong());
 
         RaftGroupService service0 = nodes.get(0).raftGroupService;
 
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 7565e541810..abe8d105f7e 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -3474,7 +3474,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
         Node leader = cluster.waitAndGetLeader();
         sendTestTaskAndWait(leader);
 
-        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), any(), anyLong(), anyLong());
+        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), any(), anyLong(), anyLong(), anyLong());
 
         PeerId newPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 1).getPeerId();
 
@@ -3520,7 +3520,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
             learners.add(learner);
         }
 
-        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), any(), anyLong(), anyLong());
+        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), any(), anyLong(), anyLong(), anyLong());
 
         // Wait until every node sees every other node, otherwise
         // changePeersAndLearnersAsync can fail.
@@ -3544,7 +3544,8 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
                 return false;
             }, 10_000));
 
-            verify(raftGrpEvtsLsnr, times(1)).onNewPeersConfigurationApplied(eq(List.of(newPeer)), eq(List.of(newLearner)), anyLong(), anyLong());
+            verify(raftGrpEvtsLsnr, times(1))
+                    .onNewPeersConfigurationApplied(eq(List.of(newPeer)), eq(List.of(newLearner)), anyLong(), anyLong(), anyLong());
         }
     }
 
@@ -3590,7 +3591,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
         TestPeer newPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 1);
         assertTrue(cluster.start(newPeer, false, 300));
 
-        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), any(), anyLong(), anyLong());
+        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), any(), anyLong(), anyLong(), anyLong());
 
         // Wait until new node sees every other node, otherwise
         // changePeersAndLearnersAsync can fail.
@@ -3613,7 +3614,8 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
 
         verify(
                 raftGrpEvtsLsnr,
-                times(1)).onNewPeersConfigurationApplied(List.of(peer0.getPeerId(), newPeer.getPeerId()), List.of(), term.get(), index.get()
+                times(1))
+                .onNewPeersConfigurationApplied(List.of(peer0.getPeerId(), newPeer.getPeerId()), List.of(), term.get(), index.get(), 0
         );
     }
 
@@ -3745,7 +3747,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
 
         Node leader = cluster.waitAndGetLeader();
 
-        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), any(), anyLong(), anyLong());
+        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), any(), anyLong(), anyLong(), anyLong());
 
         assertEquals(1, leader.getCurrentTerm());
 
@@ -3766,7 +3768,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
 
         }, 10_000));
 
-        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), any(), anyLong(), anyLong());
+        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), any(), anyLong(), anyLong(), anyLong());
     }
 
     @Test
@@ -3826,7 +3828,53 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
     @Test
     public void changePeersAndLearnersAsyncResponses() throws Exception {
         TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
-        cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0), testInfo);
+
+        AtomicLong appliedToken = new AtomicLong();
+
+        var raftGrpEvtsLsnr = new JraftGroupEventsListener() {
+            @Override
+            public void onLeaderElected(
+                    long term,
+                    long configurationTerm,
+                    long configurationIndex,
+                    Collection<PeerId> peers,
+                    Collection<PeerId> learners,
+                    long sequenceToken
+            ) {
+            }
+
+            @Override
+            public void onNewPeersConfigurationApplied(
+                    Collection<PeerId> peers,
+                    Collection<PeerId> learners,
+                    long term,
+                    long index,
+                    long sequenceToken
+            ) {
+                appliedToken.set(sequenceToken);
+            }
+
+            @Override
+            public void onReconfigurationError(
+                    Status status,
+                    Collection<PeerId> peers,
+                    Collection<PeerId> learners,
+                    long term,
+                    long sequenceToken
+            ) {
+            }
+        };
+        cluster = new TestCluster(
+                "testChangePeers",
+                dataPath,
+                Collections.singletonList(peer0),
+                new LinkedHashSet<>(),
+                ELECTION_TIMEOUT_MILLIS,
+                (peerId, opts) -> {
+                    opts.setRaftGrpEvtsLsnr(raftGrpEvtsLsnr);
+                },
+                testInfo
+        );
         assertTrue(cluster.start(peer0));
 
         Node leader = cluster.waitAndGetLeader();
@@ -3854,7 +3902,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
                 leader.getCurrentTerm(), done);
         assertEquals(done.await(), Status.OK());
 
-        sendTestTaskAndWait(leader, 10);
+        assertTrue(waitForCondition(() -> appliedToken.get() == 6, 10_000));
 
         // change peer to new conf containing only new node
         done = new SynchronizedClosure();
@@ -3869,7 +3917,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
         }, 10_000));
 
         for (MockStateMachine fsm : cluster.getFsms()) {
-            assertEquals(20, fsm.getLogs().size());
+            assertEquals(10, fsm.getLogs().size());
         }
 
         // check concurrent start of two async change peers.
@@ -3906,7 +3954,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
         }, 10_000));
 
         for (MockStateMachine fsm : cluster.getFsms()) {
-            assertEquals(30, fsm.getLogs().size());
+            assertEquals(20, fsm.getLogs().size());
         }
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
index e138aae0c60..fc5bd4e6c08 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
@@ -51,8 +51,9 @@ public interface JraftGroupEventsListener {
      * @param learners Collection of learners, which was applied by raft group membership configuration.
      * @param term Term on which the new configuration was applied.
      * @param index Index on which the new configuration was applied.
+     * @param sequenceToken Sequence token of this change.
      */
-    void onNewPeersConfigurationApplied(Collection<PeerId> peers, Collection<PeerId> learners, long term, long index);
+    void onNewPeersConfigurationApplied(Collection<PeerId> peers, Collection<PeerId> learners, long term, long index, long sequenceToken);
 
     /**
      * Invoked on the leader if membership reconfiguration failed, because of {@link Status}.
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
index 4011461578b..b9718755352 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
@@ -82,8 +82,14 @@ class RaftGroupEventsListenerAdapter implements JraftGroupEventsListener {
     }
 
     @Override
-    public void onNewPeersConfigurationApplied(Collection<PeerId> peerIds, Collection<PeerId> learnerIds, long term, long index) {
-        delegate.onNewPeersConfigurationApplied(configuration(peerIds, learnerIds), term, index);
+    public void onNewPeersConfigurationApplied(
+            Collection<PeerId> peerIds,
+            Collection<PeerId> learnerIds,
+            long term,
+            long index,
+            long sequenceToken
+    ) {
+        delegate.onNewPeersConfigurationApplied(configuration(peerIds, learnerIds), term, index, sequenceToken);
     }
 
     @Override
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 74dea0e1958..9be02c8b265 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -503,7 +503,7 @@ public class NodeImpl implements Node, RaftServerService {
                         if (status.isOk()) {
                             LogId id = node.conf.getId();
 
-                            listener.onNewPeersConfigurationApplied(resultPeerIds, resultLearnerIds, id.getTerm(), id.getIndex());
+                            listener.onNewPeersConfigurationApplied(resultPeerIds, resultLearnerIds, id.getTerm(), id.getIndex(), resultToken);
                         } else {
                             listener.onReconfigurationError(status, resultPeerIds, resultLearnerIds, node.getCurrentTerm(), resultToken);
                         }
@@ -2921,7 +2921,11 @@ public class NodeImpl implements Node, RaftServerService {
         if (this.confCtx.isBusy()) {
             LOG.warn("Node {} refused configuration concurrent changing.", getNodeId());
             if (done != null) {
-                Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, new Status(RaftError.EBUSY, "Doing another configuration change."));
+                Utils.runClosureInThread(
+                        this.getOptions().getCommonExecutor(),
+                        done,
+                        new Status(RaftError.EBUSY, "Doing another configuration change.")
+                );
             }
             return;
         }
@@ -2937,7 +2941,13 @@ public class NodeImpl implements Node, RaftServerService {
                 if (listener != null) {
                     LogId id = this.conf.getId();
 
-                    listener.onNewPeersConfigurationApplied(newConf.getPeers(), newConf.getLearners(), id.getTerm(), id.getIndex());
+                    listener.onNewPeersConfigurationApplied(
+                            newConf.getPeers(),
+                            newConf.getLearners(),
+                            id.getTerm(),
+                            id.getIndex(),
+                            newConf.getSequenceToken()
+                    );
                 }
 
                 done.run(status);

Reply via email to