This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e792a6190 IGNITE-24106 Propagate raft term and index to onNewPeersConfigurationApplied (#4978)
0e792a6190 is described below

commit 0e792a6190baf09f2ae99a09b63b30a7ceed9f65
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon Dec 30 18:27:44 2024 +0400

    IGNITE-24106 Propagate raft term and index to onNewPeersConfigurationApplied (#4978)
---
 .../RebalanceRaftGroupEventsListener.java          |   2 +-
 .../ZoneRebalanceRaftGroupEventsListener.java      |   2 +-
 .../internal/raft/RaftGroupEventsListener.java     |   4 +-
 .../internal/raft/ItRaftGroupServiceTest.java      |   3 +-
 .../apache/ignite/raft/jraft/core/ItNodeTest.java  | 123 ++++++++++++++++++++-
 .../apache/ignite/raft/jraft/core/TestCluster.java |   2 +-
 .../internal/raft/JraftGroupEventsListener.java    |   4 +-
 .../impl/RaftGroupEventsListenerAdapter.java       |   4 +-
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |   8 +-
 9 files changed, 137 insertions(+), 15 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index e68c0b89f3..991cd5e2a3 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -164,7 +164,7 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
 
     /** {@inheritDoc} */
     @Override
-    public void onNewPeersConfigurationApplied(PeersAndLearners configuration) 
{
+    public void onNewPeersConfigurationApplied(PeersAndLearners configuration, 
long term, long index) {
         if (!busyLock.enterBusy()) {
             return;
         }
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
index dc844e77c4..4e11970984 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
@@ -214,7 +214,7 @@ public class ZoneRebalanceRaftGroupEventsListener 
implements RaftGroupEventsList
 
     /** {@inheritDoc} */
     @Override
-    public void onNewPeersConfigurationApplied(PeersAndLearners configuration) 
{
+    public void onNewPeersConfigurationApplied(PeersAndLearners configuration, 
long term, long index) {
         if (!busyLock.enterBusy()) {
             return;
         }
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
index fe9be4c453..6bc505cac3 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
@@ -32,8 +32,10 @@ public interface RaftGroupEventsListener {
      * Invoked on the leader, when new peers' configuration applied to raft 
group.
      *
      * @param configuration New Raft group configuration.
+     * @param term Term on which the new configuration was applied.
+     * @param index Index on which the new configuration was applied.
      */
-    default void onNewPeersConfigurationApplied(PeersAndLearners 
configuration) {}
+    default void onNewPeersConfigurationApplied(PeersAndLearners 
configuration, long term, long index) {}
 
     /**
      * Invoked on the leader if membership reconfiguration failed, because of 
{@link Status}.
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index e17571e88a..d9d2f9c9c5 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -30,6 +30,7 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 
@@ -177,7 +178,7 @@ public class ItRaftGroupServiceTest extends 
IgniteAbstractTest {
             configurationComplete.countDown();
 
             return null;
-        }).when(eventsListener).onNewPeersConfigurationApplied(any());
+        }).when(eventsListener).onNewPeersConfigurationApplied(any(), 
anyLong(), anyLong());
 
         RaftGroupService service0 = nodes.get(0).raftGroupService;
 
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index e0e0cee115..1453413be8 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -45,6 +45,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.timeout;
@@ -82,6 +83,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.LockSupport;
 import java.util.function.BiConsumer;
@@ -118,6 +120,7 @@ import 
org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
 import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
 import org.apache.ignite.raft.jraft.closure.TaskClosure;
 import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
 import org.apache.ignite.raft.jraft.core.FSMCallerImpl.ApplyTask;
 import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.EnumOutter;
@@ -3353,7 +3356,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
     @Test
     public void testOnReconfigurationErrorListener() throws Exception {
         TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
-        cluster = new TestCluster("testChangePeers", dataPath, 
Collections.singletonList(peer0), testInfo);
+        cluster = new TestCluster("testOnReconfigurationErrorListener", 
dataPath, Collections.singletonList(peer0), testInfo);
 
         var raftGrpEvtsLsnr = mock(JraftGroupEventsListener.class);
 
@@ -3363,7 +3366,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
         Node leader = cluster.waitAndGetLeader();
         sendTestTaskAndWait(leader);
 
-        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), 
any());
+        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), 
any(), anyLong(), anyLong());
 
         PeerId newPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 
1).getPeerId();
 
@@ -3383,7 +3386,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
     @Test
     public void testNewPeersConfigurationAppliedListener() throws Exception {
         TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
-        cluster = new TestCluster("testChangePeers", dataPath, 
Collections.singletonList(peer0), testInfo);
+        cluster = new TestCluster("testNewPeersConfigurationAppliedListener", 
dataPath, Collections.singletonList(peer0), testInfo);
 
         var raftGrpEvtsLsnr = mock(JraftGroupEventsListener.class);
 
@@ -3409,7 +3412,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
             learners.add(learner);
         }
 
-        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), 
any());
+        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), 
any(), anyLong(), anyLong());
 
         // Wait until every node sees every other node, otherwise
         // changePeersAndLearnersAsync can fail.
@@ -3433,10 +3436,120 @@ public class ItNodeTest extends BaseIgniteAbstractTest 
{
                 return false;
             }, 10_000));
 
-            verify(raftGrpEvtsLsnr, 
times(1)).onNewPeersConfigurationApplied(List.of(newPeer), List.of(newLearner));
+            verify(raftGrpEvtsLsnr, 
times(1)).onNewPeersConfigurationApplied(eq(List.of(newPeer)), 
eq(List.of(newLearner)), anyLong(), anyLong());
         }
     }
 
+    @Test
+    public void 
testIndexAndTermArePropagatedToOnNewPeersConfigurationApplied() throws 
Exception {
+        TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
+        var raftGrpEvtsLsnr = mock(JraftGroupEventsListener.class);
+
+        AtomicLong term = new AtomicLong(-1);
+        AtomicLong index = new AtomicLong(-1);
+
+        cluster = new TestCluster(
+                
"testIndexAndTermArePropagatedToOnNewPeersConfigurationApplied",
+                dataPath,
+                Collections.singletonList(peer0),
+                new LinkedHashSet<>(),
+                ELECTION_TIMEOUT_MILLIS,
+                (peerId, opts) -> {
+                    opts.setRaftGrpEvtsLsnr(raftGrpEvtsLsnr);
+                    opts.setFsm(new MockStateMachine(peerId) {
+                        @Override
+                        public void 
onRawConfigurationCommitted(ConfigurationEntry conf) {
+                            term.set(conf.getId().getTerm());
+                            index.set(conf.getId().getIndex());
+                            super.onRawConfigurationCommitted(conf);
+                        }
+                    });
+                },
+                testInfo
+        );
+
+        assertTrue(cluster.start(peer0));
+
+        Node leader = cluster.waitAndGetLeader();
+
+        assertEquals(1, term.get());
+        assertEquals(1, index.get());
+
+        TestPeer newPeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 1);
+        assertTrue(cluster.start(newPeer, false, 300));
+
+        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), 
any(), anyLong(), anyLong());
+
+        // Wait until the new node sees every other node, otherwise
+        // changePeersAndLearnersAsync can fail.
+        waitForTopologyOnEveryNode(1, cluster);
+
+        SynchronizedClosure done = new SynchronizedClosure();
+        leader.changePeersAndLearnersAsync(
+                new Configuration(List.of(peer0.getPeerId(), 
newPeer.getPeerId()), List.of()), leader.getCurrentTerm(),
+                done
+        );
+
+        assertEquals(done.await(), Status.OK());
+
+        assertTrue(waitForCondition(() -> 
cluster.getLeader().listAlivePeers().contains(newPeer.getPeerId()), 10_000));
+
+        // Leader hasn't been changed, term must stay the same
+        assertEquals(1, term.get());
+        // idx_2 == joint consensus, idx_3 is expected final cfg
+        assertEquals(3, index.get());
+
+        verify(
+                raftGrpEvtsLsnr,
+                
times(1)).onNewPeersConfigurationApplied(List.of(peer0.getPeerId(), 
newPeer.getPeerId()), List.of(), term.get(), index.get()
+        );
+    }
+
+    @Test
+    public void testOnNewPeersConfigurationAppliedIsNotCalledAfterResetPeers() 
throws Exception {
+        TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
+        var raftGrpEvtsLsnr = mock(JraftGroupEventsListener.class);
+
+        cluster = new TestCluster(
+                "testOnNewPeersConfigurationAppliedIsNotCalledAfterResetPeers",
+                dataPath,
+                Collections.singletonList(peer0),
+                new LinkedHashSet<>(),
+                ELECTION_TIMEOUT_MILLIS,
+                (peerId, opts) -> {
+                    opts.setRaftGrpEvtsLsnr(raftGrpEvtsLsnr);
+                },
+                testInfo
+        );
+
+        assertTrue(cluster.start(peer0));
+
+        Node leader = cluster.waitAndGetLeader();
+
+        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), 
any(), anyLong(), anyLong());
+
+        assertEquals(1, leader.getCurrentTerm());
+
+        TestPeer fakePeer = new TestPeer(testInfo, TestUtils.INIT_PORT + 1);
+
+        leader.resetPeers(new Configuration(List.of(fakePeer.getPeerId())));
+
+        leader.resetPeers(new Configuration(List.of(peer0.getPeerId())));
+
+        // Term was changed twice because of two reset peers
+        assertTrue(waitForCondition(() -> leader.getCurrentTerm() == 3, 
10_000));
+
+        assertTrue(waitForCondition(() -> {
+            if (cluster.getLeader() != null) {
+                return 
peer0.getPeerId().equals(cluster.getLeader().getLeaderId());
+            }
+            return false;
+
+        }, 10_000));
+
+        verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), 
any(), anyLong(), anyLong());
+    }
+
     @Test
     public void testChangePeersAndLearnersOnLeaderElected() throws Exception {
         List<TestPeer> peers = IntStream.range(0, 6)
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 8f42635714..1e88427040 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -299,7 +299,7 @@ public class TestCluster {
 
             Node node = server.start();
 
-            this.fsms.put(peer.getPeerId(), fsm);
+            this.fsms.put(peer.getPeerId(), (MockStateMachine) 
nodeOptions.getFsm());
             this.nodes.add((NodeImpl) node);
             return true;
         }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
index 7f5cf736f3..dd12e1583d 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/JraftGroupEventsListener.java
@@ -37,8 +37,10 @@ public interface JraftGroupEventsListener {
      *
      * @param peers Collection of peers, which was applied by raft group 
membership configuration.
      * @param learners Collection of learners, which was applied by raft group 
membership configuration.
+     * @param term Term on which the new configuration was applied.
+     * @param index Index on which the new configuration was applied.
      */
-    void onNewPeersConfigurationApplied(Collection<PeerId> peers, 
Collection<PeerId> learners);
+    void onNewPeersConfigurationApplied(Collection<PeerId> peers, 
Collection<PeerId> learners, long term, long index);
 
     /**
      * Invoked on the leader if membership reconfiguration failed, because of 
{@link Status}.
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
index 3861c9365a..b3d706045f 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftGroupEventsListenerAdapter.java
@@ -69,8 +69,8 @@ class RaftGroupEventsListenerAdapter implements 
JraftGroupEventsListener {
     }
 
     @Override
-    public void onNewPeersConfigurationApplied(Collection<PeerId> peerIds, 
Collection<PeerId> learnerIds) {
-        delegate.onNewPeersConfigurationApplied(configuration(peerIds, 
learnerIds));
+    public void onNewPeersConfigurationApplied(Collection<PeerId> peerIds, 
Collection<PeerId> learnerIds, long term, long index) {
+        delegate.onNewPeersConfigurationApplied(configuration(peerIds, 
learnerIds), term, index);
     }
 
     @Override
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 411336efe9..ba59260b71 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -493,7 +493,9 @@ public class NodeImpl implements Node, RaftServerService {
 
                     if (listener != null) {
                         if (status.isOk()) {
-                            
listener.onNewPeersConfigurationApplied(resultPeerIds, resultLearnerIds);
+                            LogId id = node.conf.getId();
+
+                            
listener.onNewPeersConfigurationApplied(resultPeerIds, resultLearnerIds, 
id.getTerm(), id.getIndex());
                         } else {
                             listener.onReconfigurationError(status, 
resultPeerIds, resultLearnerIds, node.getCurrentTerm());
                         }
@@ -2766,7 +2768,9 @@ public class NodeImpl implements Node, RaftServerService {
                 JraftGroupEventsListener listener = 
this.getOptions().getRaftGrpEvtsLsnr();
 
                 if (listener != null) {
-                    
listener.onNewPeersConfigurationApplied(newConf.getPeers(), 
newConf.getLearners());
+                    LogId id = this.conf.getId();
+
+                    
listener.onNewPeersConfigurationApplied(newConf.getPeers(), 
newConf.getLearners(), id.getTerm(), id.getIndex());
                 }
 
                 done.run(status);

Reply via email to