This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d3e4e36cfd IGNITE-22891 Fix
ItMetaStorageRaftGroupTest#testRangeNextWorksCorrectlyAfterLeaderChange (#4215)
d3e4e36cfd is described below
commit d3e4e36cfd02d43c3eaf839367aea0dcd0279e12
Author: Denis Chudov <[email protected]>
AuthorDate: Mon Aug 12 20:32:38 2024 +0300
IGNITE-22891 Fix
ItMetaStorageRaftGroupTest#testRangeNextWorksCorrectlyAfterLeaderChange (#4215)
---
.../server/raft/ItMetaStorageRaftGroupTest.java | 129 ++++++---------------
1 file changed, 35 insertions(+), 94 deletions(-)
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
index 375f708164..8c463a2e3b 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java
@@ -27,6 +27,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -41,7 +42,6 @@ import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.TestJraftServerFactory;
+import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -75,14 +76,10 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.Status;
-import org.apache.ignite.raft.jraft.core.Replicator;
-import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -224,36 +221,42 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
* @throws Exception If failed.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-22891")
public void testRangeNextWorksCorrectlyAfterLeaderChange() throws
Exception {
- AtomicInteger replicatorStartedCounter = new AtomicInteger(0);
-
- AtomicInteger replicatorStoppedCounter = new AtomicInteger(0);
-
when(mockStorage.range(EXPECTED_RESULT_ENTRY1.key(), new
byte[]{4})).thenAnswer(invocation -> {
List<Entry> entries = List.of(EXPECTED_RESULT_ENTRY1,
EXPECTED_RESULT_ENTRY2);
return Cursor.fromBareIterator(entries.iterator());
});
- List<Pair<RaftServer, RaftGroupService>> raftServersRaftGroups =
prepareJraftMetaStorages(replicatorStartedCounter,
- replicatorStoppedCounter);
+ List<Pair<RaftServer, RaftGroupService>> raftServersRaftGroups =
prepareJraftMetaStorages();
List<RaftServer> raftServers = raftServersRaftGroups.stream().map(p ->
p.key).collect(Collectors.toList());
- String oldLeaderId =
raftServersRaftGroups.get(0).value.leader().consistentId();
+ CompletableFuture<LeaderWithTerm> oldLeaderFut =
raftServersRaftGroups.get(0).value.refreshAndGetLeaderWithTerm();
+
+ assertThat(oldLeaderFut, willCompleteSuccessfully());
+
+ LeaderWithTerm leaderWithTerm = oldLeaderFut.join();
+
+ assertNotNull(leaderWithTerm.leader());
+ String oldLeaderId = leaderWithTerm.leader().consistentId();
+ long oldLeaderTerm = leaderWithTerm.term();
RaftServer oldLeaderServer = raftServers.stream()
.filter(s ->
localMemberName(s.clusterService()).equals(oldLeaderId))
.findFirst()
.orElseThrow();
+ log.info("Test: old raft leader: " +
oldLeaderServer.clusterService().nodeName());
+
// Server that will be alive after we stop leader.
RaftServer liveServer = raftServers.stream()
.filter(s ->
!localMemberName(s.clusterService()).equals(oldLeaderId))
.findFirst()
.orElseThrow();
+ log.info("Test: liveServer: " +
liveServer.clusterService().nodeName());
+
RaftGroupService raftGroupServiceOfLiveServer =
raftServersRaftGroups.stream()
.filter(p -> p.key.equals(liveServer))
.findFirst()
@@ -279,16 +282,7 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
- try {
- assertTrue(
- waitForCondition(() ->
replicatorStartedCounter.get() == 2, 5_000),
-
String.valueOf(replicatorStartedCounter.get())
- );
-
- subscription.request(1);
- } catch (InterruptedException e) {
- resultFuture.completeExceptionally(e);
- }
+ subscription.request(1);
}
@Override
@@ -297,14 +291,7 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
if (state == 0) {
assertEquals(EXPECTED_RESULT_ENTRY1, item);
- // Ensure that leader has not been changed.
- // In a stable topology unexpected leader
election shouldn't happen.
- assertTrue(
- waitForCondition(() ->
replicatorStartedCounter.get() == 2, 5_000),
-
String.valueOf(replicatorStartedCounter.get())
- );
-
- // stop leader
+ // Stop leader.
oldLeaderServer.stopRaftNodes(MetastorageGroupId.INSTANCE);
ComponentContext componentContext = new
ComponentContext();
@@ -316,22 +303,25 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
.stopAsync(componentContext);
assertThat(stopFuture,
willCompleteSuccessfully());
-
raftGroupServiceOfLiveServer.refreshLeader().get();
+ CompletableFuture<LeaderWithTerm>
newLeaderWithTermFut = raftGroupServiceOfLiveServer
+ .refreshAndGetLeaderWithTerm();
+ assertThat(newLeaderWithTermFut,
willCompleteSuccessfully());
+ LeaderWithTerm newLeaderWithTerm =
newLeaderWithTermFut.join();
+
+ assertNotNull(newLeaderWithTerm.leader());
+ assertNotSame(oldLeaderId,
newLeaderWithTerm.leader().consistentId());
+
+ // Check that the leader changed only once.
+ assertEquals(oldLeaderTerm + 1,
newLeaderWithTerm.term());
- assertNotSame(oldLeaderId,
raftGroupServiceOfLiveServer.leader().consistentId());
+ log.info("Test: new leader: " +
raftGroupServiceOfLiveServer.leader().consistentId());
- // ensure that leader has been changed only
once
- assertTrue(
- waitForCondition(() ->
replicatorStartedCounter.get() == 4, 5_000),
-
String.valueOf(replicatorStartedCounter.get())
- );
- assertTrue(
- waitForCondition(() ->
replicatorStoppedCounter.get() == 2, 5_000),
-
String.valueOf(replicatorStoppedCounter.get())
- );
+ log.info("Test: Entry 1 processed.");
} else if (state == 1) {
assertEquals(EXPECTED_RESULT_ENTRY2, item);
+
+ log.info("Test: Entry 2 processed.");
}
state++;
@@ -344,6 +334,8 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
@Override
public void onError(Throwable throwable) {
+ log.error("Test: error.", throwable);
+
resultFuture.completeExceptionally(throwable);
}
@@ -356,8 +348,7 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
assertThat(resultFuture, willCompleteSuccessfully());
}
- private List<Pair<RaftServer, RaftGroupService>>
prepareJraftMetaStorages(AtomicInteger replicatorStartedCounter,
- AtomicInteger replicatorStoppedCounter) throws
InterruptedException {
+ private List<Pair<RaftServer, RaftGroupService>>
prepareJraftMetaStorages() throws InterruptedException {
PeersAndLearners membersConfiguration = cluster.stream()
.map(ItMetaStorageRaftGroupTest::localMemberName)
.collect(collectingAndThen(toSet(),
PeersAndLearners::fromConsistentIds));
@@ -367,18 +358,12 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
var commandsMarshaller = new
ThreadLocalOptimizedMarshaller(cluster.get(0).serializationRegistry());
NodeOptions opt1 = new NodeOptions();
- opt1.setReplicationStateListeners(
- List.of(new
UserReplicatorStateListener(replicatorStartedCounter,
replicatorStoppedCounter)));
opt1.setCommandsMarshaller(commandsMarshaller);
NodeOptions opt2 = new NodeOptions();
- opt2.setReplicationStateListeners(
- List.of(new
UserReplicatorStateListener(replicatorStartedCounter,
replicatorStoppedCounter)));
opt2.setCommandsMarshaller(commandsMarshaller);
NodeOptions opt3 = new NodeOptions();
- opt3.setReplicationStateListeners(
- List.of(new
UserReplicatorStateListener(replicatorStartedCounter,
replicatorStoppedCounter)));
opt3.setCommandsMarshaller(commandsMarshaller);
metaStorageRaftSrv1 = TestJraftServerFactory.create(
@@ -510,50 +495,6 @@ public class ItMetaStorageRaftGroupTest extends
IgniteAbstractTest {
return service.topologyService().localMember().name();
}
- /**
- * User's replicator state listener.
- */
- static class UserReplicatorStateListener implements
Replicator.ReplicatorStateListener {
- /** Replicator started counter. */
- private final AtomicInteger replicatorStartedCounter;
-
- /** Replicator stopped counter. */
- private final AtomicInteger replicatorStoppedCounter;
-
- /**
- * Constructor.
- *
- * @param replicatorStartedCounter Replicator started counter.
- * @param replicatorStoppedCounter Replicator stopped counter.
- */
- UserReplicatorStateListener(AtomicInteger replicatorStartedCounter,
AtomicInteger replicatorStoppedCounter) {
- this.replicatorStartedCounter = replicatorStartedCounter;
- this.replicatorStoppedCounter = replicatorStoppedCounter;
- }
-
- /** {@inheritDoc} */
- @Override
- public void onCreated(PeerId peer) {
- int val = replicatorStartedCounter.incrementAndGet();
-
- LOG.info("Replicator has been created {} {}", peer, val);
- }
-
- /** {@inheritDoc} */
- @Override
- public void onError(PeerId peer, Status status) {
- LOG.info("Replicator has errors {} {}", peer, status);
- }
-
- /** {@inheritDoc} */
- @Override
- public void onDestroyed(PeerId peer) {
- int val = replicatorStoppedCounter.incrementAndGet();
-
- LOG.info("Replicator has been destroyed {} {}", peer, val);
- }
- }
-
/**
* Internal pair implementation.
*