This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d3e4e36cfd IGNITE-22891 Fix ItMetaStorageRaftGroupTest#testRangeNextWorksCorrectlyAfterLeaderChange (#4215)
d3e4e36cfd is described below

commit d3e4e36cfd02d43c3eaf839367aea0dcd0279e12
Author: Denis Chudov <[email protected]>
AuthorDate: Mon Aug 12 20:32:38 2024 +0300

    IGNITE-22891 Fix ItMetaStorageRaftGroupTest#testRangeNextWorksCorrectlyAfterLeaderChange (#4215)
---
 .../server/raft/ItMetaStorageRaftGroupTest.java    | 129 ++++++---------------
 1 file changed, 35 insertions(+), 94 deletions(-)

diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
index 375f708164..8c463a2e3b 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
@@ -27,6 +27,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -41,7 +42,6 @@ import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.TestJraftServerFactory;
+import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -75,14 +76,10 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.Status;
-import org.apache.ignite.raft.jraft.core.Replicator;
-import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -224,36 +221,42 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
      * @throws Exception If failed.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22891")
     public void testRangeNextWorksCorrectlyAfterLeaderChange() throws 
Exception {
-        AtomicInteger replicatorStartedCounter = new AtomicInteger(0);
-
-        AtomicInteger replicatorStoppedCounter = new AtomicInteger(0);
-
         when(mockStorage.range(EXPECTED_RESULT_ENTRY1.key(), new 
byte[]{4})).thenAnswer(invocation -> {
             List<Entry> entries = List.of(EXPECTED_RESULT_ENTRY1, 
EXPECTED_RESULT_ENTRY2);
 
             return Cursor.fromBareIterator(entries.iterator());
         });
 
-        List<Pair<RaftServer, RaftGroupService>> raftServersRaftGroups = 
prepareJraftMetaStorages(replicatorStartedCounter,
-                replicatorStoppedCounter);
+        List<Pair<RaftServer, RaftGroupService>> raftServersRaftGroups = 
prepareJraftMetaStorages();
 
         List<RaftServer> raftServers = raftServersRaftGroups.stream().map(p -> 
p.key).collect(Collectors.toList());
 
-        String oldLeaderId = 
raftServersRaftGroups.get(0).value.leader().consistentId();
+        CompletableFuture<LeaderWithTerm> oldLeaderFut = 
raftServersRaftGroups.get(0).value.refreshAndGetLeaderWithTerm();
+
+        assertThat(oldLeaderFut, willCompleteSuccessfully());
+
+        LeaderWithTerm leaderWithTerm = oldLeaderFut.join();
+
+        assertNotNull(leaderWithTerm.leader());
+        String oldLeaderId = leaderWithTerm.leader().consistentId();
+        long oldLeaderTerm = leaderWithTerm.term();
 
         RaftServer oldLeaderServer = raftServers.stream()
                 .filter(s -> 
localMemberName(s.clusterService()).equals(oldLeaderId))
                 .findFirst()
                 .orElseThrow();
 
+        log.info("Test: old raft leader: " + 
oldLeaderServer.clusterService().nodeName());
+
         // Server that will be alive after we stop leader.
         RaftServer liveServer = raftServers.stream()
                 .filter(s -> 
!localMemberName(s.clusterService()).equals(oldLeaderId))
                 .findFirst()
                 .orElseThrow();
 
+        log.info("Test: liveServer: " + 
liveServer.clusterService().nodeName());
+
         RaftGroupService raftGroupServiceOfLiveServer = 
raftServersRaftGroups.stream()
                 .filter(p -> p.key.equals(liveServer))
                 .findFirst()
@@ -279,16 +282,7 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
                     public void onSubscribe(Subscription subscription) {
                         this.subscription = subscription;
 
-                        try {
-                            assertTrue(
-                                    waitForCondition(() -> 
replicatorStartedCounter.get() == 2, 5_000),
-                                    
String.valueOf(replicatorStartedCounter.get())
-                            );
-
-                            subscription.request(1);
-                        } catch (InterruptedException e) {
-                            resultFuture.completeExceptionally(e);
-                        }
+                        subscription.request(1);
                     }
 
                     @Override
@@ -297,14 +291,7 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
                             if (state == 0) {
                                 assertEquals(EXPECTED_RESULT_ENTRY1, item);
 
-                                // Ensure that leader has not been changed.
-                                // In a stable topology unexpected leader 
election shouldn't happen.
-                                assertTrue(
-                                        waitForCondition(() -> 
replicatorStartedCounter.get() == 2, 5_000),
-                                        
String.valueOf(replicatorStartedCounter.get())
-                                );
-
-                                // stop leader
+                                // Stop leader.
                                 
oldLeaderServer.stopRaftNodes(MetastorageGroupId.INSTANCE);
                                 ComponentContext componentContext = new 
ComponentContext();
 
@@ -316,22 +303,25 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
                                         .stopAsync(componentContext);
                                 assertThat(stopFuture, 
willCompleteSuccessfully());
 
-                                
raftGroupServiceOfLiveServer.refreshLeader().get();
+                                CompletableFuture<LeaderWithTerm> 
newLeaderWithTermFut = raftGroupServiceOfLiveServer
+                                        .refreshAndGetLeaderWithTerm();
+                                assertThat(newLeaderWithTermFut, 
willCompleteSuccessfully());
+                                LeaderWithTerm newLeaderWithTerm = 
newLeaderWithTermFut.join();
+
+                                assertNotNull(newLeaderWithTerm.leader());
+                                assertNotSame(oldLeaderId, 
newLeaderWithTerm.leader().consistentId());
+
+                                // Check that the leader changed only once.
+                                assertEquals(oldLeaderTerm + 1, 
newLeaderWithTerm.term());
 
-                                assertNotSame(oldLeaderId, 
raftGroupServiceOfLiveServer.leader().consistentId());
+                                log.info("Test: new leader: " + 
raftGroupServiceOfLiveServer.leader().consistentId());
 
-                                // ensure that leader has been changed only 
once
-                                assertTrue(
-                                        waitForCondition(() -> 
replicatorStartedCounter.get() == 4, 5_000),
-                                        
String.valueOf(replicatorStartedCounter.get())
-                                );
-                                assertTrue(
-                                        waitForCondition(() -> 
replicatorStoppedCounter.get() == 2, 5_000),
-                                        
String.valueOf(replicatorStoppedCounter.get())
-                                );
+                                log.info("Test: Entry 1 processed.");
 
                             } else if (state == 1) {
                                 assertEquals(EXPECTED_RESULT_ENTRY2, item);
+
+                                log.info("Test: Entry 2 processed.");
                             }
 
                             state++;
@@ -344,6 +334,8 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
 
                     @Override
                     public void onError(Throwable throwable) {
+                        log.error("Test: error.", throwable);
+
                         resultFuture.completeExceptionally(throwable);
                     }
 
@@ -356,8 +348,7 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
         assertThat(resultFuture, willCompleteSuccessfully());
     }
 
-    private List<Pair<RaftServer, RaftGroupService>> 
prepareJraftMetaStorages(AtomicInteger replicatorStartedCounter,
-            AtomicInteger replicatorStoppedCounter) throws 
InterruptedException {
+    private List<Pair<RaftServer, RaftGroupService>> 
prepareJraftMetaStorages() throws InterruptedException {
         PeersAndLearners membersConfiguration = cluster.stream()
                 .map(ItMetaStorageRaftGroupTest::localMemberName)
                 .collect(collectingAndThen(toSet(), 
PeersAndLearners::fromConsistentIds));
@@ -367,18 +358,12 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
         var commandsMarshaller = new 
ThreadLocalOptimizedMarshaller(cluster.get(0).serializationRegistry());
 
         NodeOptions opt1 = new NodeOptions();
-        opt1.setReplicationStateListeners(
-                List.of(new 
UserReplicatorStateListener(replicatorStartedCounter, 
replicatorStoppedCounter)));
         opt1.setCommandsMarshaller(commandsMarshaller);
 
         NodeOptions opt2 = new NodeOptions();
-        opt2.setReplicationStateListeners(
-                List.of(new 
UserReplicatorStateListener(replicatorStartedCounter, 
replicatorStoppedCounter)));
         opt2.setCommandsMarshaller(commandsMarshaller);
 
         NodeOptions opt3 = new NodeOptions();
-        opt3.setReplicationStateListeners(
-                List.of(new 
UserReplicatorStateListener(replicatorStartedCounter, 
replicatorStoppedCounter)));
         opt3.setCommandsMarshaller(commandsMarshaller);
 
         metaStorageRaftSrv1 = TestJraftServerFactory.create(
@@ -510,50 +495,6 @@ public class ItMetaStorageRaftGroupTest extends 
IgniteAbstractTest {
         return service.topologyService().localMember().name();
     }
 
-    /**
-     * User's replicator state listener.
-     */
-    static class UserReplicatorStateListener implements 
Replicator.ReplicatorStateListener {
-        /** Replicator started counter. */
-        private final AtomicInteger replicatorStartedCounter;
-
-        /** Replicator stopped counter. */
-        private final AtomicInteger replicatorStoppedCounter;
-
-        /**
-         * Constructor.
-         *
-         * @param replicatorStartedCounter Replicator started counter.
-         * @param replicatorStoppedCounter Replicator stopped counter.
-         */
-        UserReplicatorStateListener(AtomicInteger replicatorStartedCounter, 
AtomicInteger replicatorStoppedCounter) {
-            this.replicatorStartedCounter = replicatorStartedCounter;
-            this.replicatorStoppedCounter = replicatorStoppedCounter;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public void onCreated(PeerId peer) {
-            int val = replicatorStartedCounter.incrementAndGet();
-
-            LOG.info("Replicator has been created {} {}", peer, val);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public void onError(PeerId peer, Status status) {
-            LOG.info("Replicator has errors {} {}", peer, status);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public void onDestroyed(PeerId peer) {
-            int val = replicatorStoppedCounter.incrementAndGet();
-
-            LOG.info("Replicator has been destroyed {} {}", peer, val);
-        }
-    }
-
     /**
      * Internal pair implementation.
      *

Reply via email to