This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d13553ebda IGNITE-19688 use waitAndGetLeader instead of waitLeader and
getLeader (#2259)
d13553ebda is described below
commit d13553ebdaa80ab440be487ff882e279bed37c19
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon Jul 10 16:30:15 2023 +0400
IGNITE-19688 use waitAndGetLeader instead of waitLeader and getLeader
(#2259)
---
.../ignite/raft/jraft/core/ItCliServiceTest.java | 20 +-
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 347 +++++++--------------
.../apache/ignite/raft/jraft/core/TestCluster.java | 44 ++-
3 files changed, 146 insertions(+), 265 deletions(-)
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
index 82589b73ae..82b8c0cae6 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
@@ -115,8 +115,7 @@ public class ItCliServiceTest {
cluster.startLearner(peer);
}
- cluster.waitLeader();
- cluster.ensureLeader(cluster.getLeader());
+ cluster.ensureLeader(cluster.waitAndGetLeader());
cliService = new CliServiceImpl();
conf = new Configuration(
@@ -178,8 +177,7 @@ public class ItCliServiceTest {
}
assertNotNull(targetPeer);
assertTrue(cliService.transferLeader(groupId, conf,
targetPeer).isOk());
- cluster.waitLeader();
- assertEquals(targetPeer, cluster.getLeader().getNodeId().getPeerId());
+ assertEquals(targetPeer,
cluster.waitAndGetLeader().getNodeId().getPeerId());
}
@SuppressWarnings("SameParameterValue")
@@ -340,15 +338,13 @@ public class ItCliServiceTest {
for (TestPeer peer : newPeers) {
assertTrue(cluster.start(peer));
}
- cluster.waitLeader();
- Node oldLeaderNode = cluster.getLeader();
+ Node oldLeaderNode = cluster.waitAndGetLeader();
assertNotNull(oldLeaderNode);
PeerId oldLeader = oldLeaderNode.getNodeId().getPeerId();
assertNotNull(oldLeader);
Status status = cliService.changePeers(groupId, conf, new
Configuration(newPeers.stream().map(TestPeer::getPeerId).collect(toList())));
assertTrue(status.isOk(), status.getErrorMsg());
- cluster.waitLeader();
- PeerId newLeader = cluster.getLeader().getNodeId().getPeerId();
+ PeerId newLeader = cluster.waitAndGetLeader().getNodeId().getPeerId();
assertNotEquals(oldLeader, newLeader);
assertTrue(newPeers.stream().anyMatch(p ->
p.getPeerId().equals(newLeader)));
}
@@ -385,9 +381,7 @@ public class ItCliServiceTest {
List<PeerId> peers = conf.getPeers();
cluster.stop(peers.get(0));
- cluster.waitLeader();
-
- leader = cluster.getLeader().getNodeId().getPeerId();
+ leader = cluster.waitAndGetLeader().getNodeId().getPeerId();
assertNotNull(leader);
assertArrayEquals(conf.getPeerSet().toArray(),
new HashSet<>(cliService.getPeers(groupId, conf)).toArray());
@@ -414,11 +408,9 @@ public class ItCliServiceTest {
cluster.stop(peers.get(0));
peers.remove(0);
- cluster.waitLeader();
-
sleep(1000);
- leader = cluster.getLeader().getNodeId().getPeerId();
+ leader = cluster.waitAndGetLeader().getNodeId().getPeerId();
assertNotNull(leader);
assertArrayEquals(new HashSet<>(peers).toArray(),
new HashSet<>(cliService.getAlivePeers(groupId, conf)).toArray());
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index b5d5787265..66e6bd1a6e 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -476,8 +476,7 @@ public class ItNodeTest {
assertTrue(cluster.start(peer));
// elect leader
- cluster.waitLeader();
- cluster.ensureLeader(cluster.getLeader());
+ cluster.ensureLeader(cluster.waitAndGetLeader());
for (Node follower : cluster.getFollowers())
waitForCondition(() -> follower.getLeaderId() != null, 5_000);
@@ -504,11 +503,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ // elect and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -579,8 +575,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
cluster.ensureLeader(leader);
sendTestTaskAndWait(leader);
@@ -593,7 +588,7 @@ public class ItNodeTest {
LOG.info("Transfer leadership from {} to {}", leader, targetPeer);
assertTrue(leader.transferLeadershipTo(targetPeer).isOk());
Thread.sleep(1000);
- cluster.waitLeader();
+ cluster.waitAndGetLeader();
assertTrue(waitForCondition(() -> startedCounter.get() == 4, 5_000),
startedCounter.get() + "");
@@ -612,11 +607,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ // elect and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -740,9 +732,7 @@ public class ItNodeTest {
}
// elect leader
- cluster.waitLeader();
-
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
cluster.ensureLeader(leader);
waitForCondition(() -> leader.listAlivePeers().size() == 3, 5_000);
@@ -809,9 +799,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- // elect leader
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
cluster.ensureLeader(leader);
assertEquals(3, leader.listPeers().size());
@@ -843,11 +831,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ //elect and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -948,11 +933,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ // elect and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -975,11 +957,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ //elect and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -1001,11 +980,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ //elect and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -1027,11 +1003,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ //wait and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -1076,8 +1049,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
cluster.ensureLeader(leader);
assertNotNull(leader);
@@ -1094,8 +1066,7 @@ public class ItNodeTest {
assertTrue(cluster.stop(leader.getNodeId().getPeerId()));
// elect new leader
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
assertNotNull(leader);
@@ -1117,11 +1088,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ // elect and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -1146,8 +1114,7 @@ public class ItNodeTest {
cluster.clean(oldLeader);
// elect new leader
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
LOG.info("New leader is {}", leader);
assertNotNull(leader);
assertNotEquals(leader.getNodeId().getPeerId(), oldLeader);
@@ -1166,8 +1133,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true, null,
raftOptions));
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
sendTestTaskAndWait(leader);
@@ -1192,8 +1158,7 @@ public class ItNodeTest {
assertTrue(cluster.start(peer, false, 300, true, null,
raftOptions));
}
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
sendTestTaskAndWait(leader);
@@ -1213,8 +1178,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true, null,
raftOptions));
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
sendTestTaskAndWait(leader);
@@ -1234,8 +1198,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true, null,
raftOptions));
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
sendTestTaskAndWait(leader);
@@ -1256,11 +1219,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ //elect and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
@@ -1302,11 +1262,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ //elect and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
@@ -1354,11 +1311,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ // elect and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
@@ -1392,11 +1346,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ //wait and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
@@ -1468,11 +1419,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 300, true));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ //elect and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
@@ -1505,11 +1453,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ // elect and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
LOG.info("Current leader is {}", leader.getLeaderId());
// apply tasks to leader
@@ -1538,8 +1483,7 @@ public class ItNodeTest {
stopBlockingMessagesOnFollowers(followers);
// elect new leader
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
LOG.info("Elect new leader is {}", leader.getLeaderId());
// apply tasks to new leader
CountDownLatch latch = new CountDownLatch(10);
@@ -1590,9 +1534,7 @@ public class ItNodeTest {
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
assertTrue(cluster.start(peer0));
- cluster.waitLeader();
-
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(leader.getNodeId().getPeerId(), peer0.getPeerId());
sendTestTaskAndWait(leader);
@@ -1653,11 +1595,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ // wait and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -1714,11 +1653,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- // elect leader
- cluster.waitLeader();
-
- // get leader
- Node leader = cluster.getLeader();
+ //elect and get leader
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -1741,8 +1677,7 @@ public class ItNodeTest {
waitLatch(latch);
// elect new leader
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
LOG.info("New leader is {}", leader);
assertNotNull(leader);
// apply tasks to new leader
@@ -1780,9 +1715,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
// get leader
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
long savedTerm = ((NodeImpl) leader).getCurrentTerm();
assertNotNull(leader);
// apply tasks to leader
@@ -1832,8 +1766,7 @@ public class ItNodeTest {
peers.add(bootPeer.getPeerId());
// reset peers from empty
assertTrue(nodes.get(0).resetPeers(new Configuration(peers)).isOk());
- cluster.waitLeader();
- assertNotNull(cluster.getLeader());
+ assertNotNull(cluster.waitAndGetLeader());
}
@Test
@@ -1846,9 +1779,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
// get leader
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
// apply tasks to leader
sendTestTaskAndWait(leader);
@@ -1888,8 +1820,7 @@ public class ItNodeTest {
LOG.warn("Set peers to {}", leaderId);
assertTrue(leader.resetPeers(new Configuration(newPeers)).isOk());
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertEquals(leaderId, leader.getNodeId().getPeerId());
@@ -1932,9 +1863,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
// get leader
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
LOG.info("Leader: " + leader);
@@ -1950,7 +1880,7 @@ public class ItNodeTest {
assertTrue(cluster.stop(leaderAddr));
// restart leader
- cluster.waitLeader();
+ cluster.waitAndGetLeader();
assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
assertTrue(cluster.start(findById(peers, leaderAddr)));
cluster.ensureSame();
@@ -1969,9 +1899,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
// get leader
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
LOG.info("Leader: " + leader);
@@ -1987,9 +1916,7 @@ public class ItNodeTest {
assertTrue(cluster.stop(leaderAddr));
// restart leader
- cluster.waitLeader();
-
- sendTestTaskAndWait(cluster.getLeader(), 10, RaftError.SUCCESS);
+ sendTestTaskAndWait(cluster.waitAndGetLeader(), 10, RaftError.SUCCESS);
assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
assertTrue(cluster.start(findById(peers, leaderAddr)));
@@ -2012,9 +1939,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 200, false, new
ThroughputSnapshotThrottle(1024, 1)));
- cluster.waitLeader();
// get leader
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -2030,7 +1956,7 @@ public class ItNodeTest {
PeerId followerAddr = followers.get(0).getNodeId().getPeerId();
assertTrue(cluster.stop(followerAddr));
- cluster.waitLeader();
+ cluster.waitAndGetLeader();
// apply something more
sendTestTaskAndWait(leader, 10, RaftError.SUCCESS);
@@ -2069,9 +1995,8 @@ public class ItNodeTest {
boolean started = cluster.start(peer, false, 200, false);
assertTrue(started);
}
- cluster.waitLeader();
// get leader
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -2127,9 +2052,8 @@ public class ItNodeTest {
boolean started = cluster.start(peer, false, 200, false);
assertTrue(started);
}
- cluster.waitLeader();
// get leader
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -2186,9 +2110,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
// get leader
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -2287,15 +2210,13 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
// get leader
- Node leader0 = cluster.getLeader();
+ Node leader0 = cluster.waitAndGetLeader();
assertNotNull(leader0);
long savedTerm = ((NodeImpl) leader0).getCurrentTerm();
LOG.info("Current leader is {}, term is {}", leader0, savedTerm);
Thread.sleep(5000);
- cluster.waitLeader();
- Node leader1 = cluster.getLeader();
+ Node leader1 = cluster.waitAndGetLeader();
assertNotNull(leader1);
LOG.info("Current leader is {}", leader1);
assertEquals(savedTerm, ((NodeImpl) leader1).getCurrentTerm());
@@ -2310,9 +2231,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
-
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -2348,9 +2267,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
-
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
sendTestTaskAndWait(leader);
@@ -2362,8 +2279,7 @@ public class ItNodeTest {
PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy();
LOG.info("Transfer leadership from {} to {}", leader, targetPeer);
assertTrue(leader.transferLeadershipTo(targetPeer).isOk());
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
assertEquals(leader.getNodeId().getPeerId(), targetPeer);
}
@@ -2376,9 +2292,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 1));
- cluster.waitLeader();
-
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -2397,7 +2311,7 @@ public class ItNodeTest {
leader.apply(task);
waitLatch(latch);
- cluster.waitLeader();
+ cluster.waitAndGetLeader();
assertTrue(cluster.start(findById(peers, targetPeer)));
@@ -2416,9 +2330,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer, false, 1));
- cluster.waitLeader();
-
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -2439,8 +2351,7 @@ public class ItNodeTest {
waitLatch(latch);
Thread.sleep(100);
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
assertSame(leader, savedLeader);
// restart target peer
@@ -2529,9 +2440,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
-
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
Node oldLeader = leader;
@@ -2539,8 +2448,7 @@ public class ItNodeTest {
leader.shutdown();
leader.join();
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertNotSame(leader, oldLeader);
@@ -2555,7 +2463,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
+ cluster.waitAndGetLeader();
// Ensure the quorum before removing a leader, otherwise removePeer
can be rejected.
for (Node follower : cluster.getFollowers())
@@ -2569,8 +2477,7 @@ public class ItNodeTest {
oldLeader.removePeer(oldLeader.getNodeId().getPeerId(), new
ExpectClosure(latch));
waitLatch(latch);
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
assertNotNull(leader);
assertNotSame(leader, oldLeader);
}
@@ -2584,9 +2491,7 @@ public class ItNodeTest {
for (int i = 0; i < peers.size() - 1; i++)
assertTrue(cluster.start(peers.get(i)));
- cluster.waitLeader();
-
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
sendTestTaskAndWait(leader);
@@ -2596,8 +2501,7 @@ public class ItNodeTest {
PeerId follower = followers.get(0).getNodeId().getPeerId();
assertTrue(leader.transferLeadershipTo(follower).isOk());
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
assertEquals(follower, leader.getNodeId().getPeerId());
CountDownLatch latch = new CountDownLatch(1);
@@ -2630,8 +2534,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
- Node oldLeader = cluster.getLeader();
+ Node oldLeader = cluster.waitAndGetLeader();
assertNotNull(oldLeader);
// apply something
sendTestTaskAndWait(oldLeader);
@@ -2648,8 +2551,7 @@ public class ItNodeTest {
// increase term by stopping leader and electing a new leader again
PeerId oldLeaderAddr = oldLeader.getNodeId().getPeerId().copy();
assertTrue(cluster.stop(oldLeaderAddr));
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
LOG.info("Elect a new leader {}", leader);
// apply something again
@@ -2679,8 +2581,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
- Node firstLeader = cluster.getLeader();
+ Node firstLeader = cluster.waitAndGetLeader();
assertNotNull(firstLeader);
cluster.ensureLeader(firstLeader);
@@ -2699,8 +2600,7 @@ public class ItNodeTest {
// stop leader and elect new one
PeerId fstLeaderAddr = firstLeader.getNodeId().getPeerId();
assertTrue(cluster.stop(fstLeaderAddr));
- cluster.waitLeader();
- Node secondLeader = cluster.getLeader();
+ Node secondLeader = cluster.waitAndGetLeader();
assertNotNull(secondLeader);
sendTestTaskAndWait(secondLeader, 10, RaftError.SUCCESS);
@@ -2717,8 +2617,7 @@ public class ItNodeTest {
PeerId targetPeer =
secondFollowers.get(0).getNodeId().getPeerId().copy();
assertTrue(secondLeader.transferLeadershipTo(targetPeer).isOk());
Thread.sleep(100);
- cluster.waitLeader();
- Node thirdLeader = cluster.getLeader();
+ Node thirdLeader = cluster.waitAndGetLeader();
assertEquals(targetPeer, thirdLeader.getNodeId().getPeerId());
sendTestTaskAndWait(thirdLeader, 20, RaftError.SUCCESS);
@@ -2785,8 +2684,7 @@ public class ItNodeTest {
});
}
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
cluster.ensureLeader(leader);
RpcClientEx client = sender(leader);
@@ -2811,9 +2709,8 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -3024,8 +2921,7 @@ public class ItNodeTest {
cluster = new TestCluster("testChangePeers", dataPath,
Collections.singletonList(peer0), testInfo);
assertTrue(cluster.start(peer0));
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
sendTestTaskAndWait(leader);
List<TestPeer> peers = new ArrayList<>();
@@ -3037,8 +2933,7 @@ public class ItNodeTest {
assertTrue(cluster.start(peer, false, 300));
}
for (int i = 0; i < 9; i++) {
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
assertNotNull(leader);
PeerId leaderPeer = peers.get(i).getPeerId();
assertEquals(leaderPeer, leader.getNodeId().getPeerId());
@@ -3063,7 +2958,7 @@ public class ItNodeTest {
}
}
- cluster.waitLeader();
+ cluster.waitAndGetLeader();
for (MockStateMachine fsm : cluster.getFsms()) {
assertEquals(10, fsm.getLogs().size());
@@ -3080,9 +2975,7 @@ public class ItNodeTest {
cluster.setRaftGrpEvtsLsnr(raftGrpEvtsLsnr);
assertTrue(cluster.start(peer0));
- cluster.waitLeader();
-
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
sendTestTaskAndWait(leader);
verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(),
any());
@@ -3109,9 +3002,7 @@ public class ItNodeTest {
cluster.setRaftGrpEvtsLsnr(raftGrpEvtsLsnr);
assertTrue(cluster.start(peer0));
- cluster.waitLeader();
-
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
sendTestTaskAndWait(leader);
List<TestPeer> peers = new ArrayList<>();
@@ -3170,19 +3061,19 @@ public class ItNodeTest {
assertTrue(cluster.start(p, false, 300));
}
- cluster.waitLeader();
+ cluster.waitAndGetLeader();
verify(raftGrpEvtsLsnr, times(1)).onLeaderElected(anyLong());
cluster.stop(cluster.getLeader().getLeaderId());
- cluster.waitLeader();
+ cluster.waitAndGetLeader();
verify(raftGrpEvtsLsnr, times(2)).onLeaderElected(anyLong());
cluster.stop(cluster.getLeader().getLeaderId());
- cluster.waitLeader();
+ cluster.waitAndGetLeader();
verify(raftGrpEvtsLsnr, times(3)).onLeaderElected(anyLong());
}
@@ -3193,15 +3084,13 @@ public class ItNodeTest {
cluster = new TestCluster("testChangePeers", dataPath,
Collections.singletonList(peer0), testInfo);
assertTrue(cluster.start(peer0));
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
sendTestTaskAndWait(leader);
TestPeer peer = new TestPeer(testInfo, TestUtils.INIT_PORT + 1);
assertTrue(cluster.start(peer, false, 300));
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
assertNotNull(leader);
PeerId leaderPeer = peer0.getPeerId();
assertEquals(leaderPeer, leader.getNodeId().getPeerId());
@@ -3283,8 +3172,7 @@ public class ItNodeTest {
cluster = new TestCluster("testChangePeers", dataPath,
Collections.singletonList(peer0), testInfo);
assertTrue(cluster.start(peer0));
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
sendTestTaskAndWait(leader);
Configuration conf = new Configuration();
@@ -3324,7 +3212,6 @@ public class ItNodeTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-19688")
public void testChangePeersStepsDownInJointConsensus() throws Exception {
List<TestPeer> peers = new ArrayList<>();
@@ -3338,8 +3225,7 @@ public class ItNodeTest {
cluster = new TestCluster("testChangePeersStepsDownInJointConsensus",
dataPath, peers, testInfo);
assertTrue(cluster.start(peer0));
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
assertNotNull(leader);
sendTestTaskAndWait(leader);
@@ -3381,8 +3267,7 @@ public class ItNodeTest {
assertTrue(cluster.start(peer3));
Thread.sleep(1000);
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader(Set.of(peer2.getPeerId(),
peer3.getPeerId()));
List<PeerId> thePeers = leader.listPeers();
assertTrue(!thePeers.isEmpty());
assertEquals(conf.getPeerSet(), new HashSet<>(thePeers));
@@ -3418,8 +3303,7 @@ public class ItNodeTest {
return Utils.runInThread(executor, () -> {
try {
while (!arg.stop) {
- arg.c.waitLeader();
- Node leader = arg.c.getLeader();
+ Node leader = arg.c.waitAndGetLeader();
if (leader == null)
continue;
// select peers in random
@@ -3465,8 +3349,7 @@ public class ItNodeTest {
Future<?> future = startChangePeersThread(arg);
for (int i = 0; i < 5000; ) {
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
if (leader == null)
continue;
SynchronizedClosure done = new SynchronizedClosure();
@@ -3482,9 +3365,8 @@ public class ItNodeTest {
}
arg.stop = true;
future.get();
- cluster.waitLeader();
SynchronizedClosure done = new SynchronizedClosure();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
leader.changePeers(new
Configuration(peers.stream().map(TestPeer::getPeerId).collect(toList())), done);
Status st = done.await();
assertTrue(st.isOk(), st.getErrorMsg());
@@ -3513,8 +3395,7 @@ public class ItNodeTest {
Future<?> future = startChangePeersThread(arg);
final int tasks = 5000;
for (int i = 0; i < tasks; ) {
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
if (leader == null)
continue;
SynchronizedClosure done = new SynchronizedClosure();
@@ -3530,9 +3411,8 @@ public class ItNodeTest {
}
arg.stop = true;
future.get();
- cluster.waitLeader();
SynchronizedClosure done = new SynchronizedClosure();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
leader.changePeers(new
Configuration(peers.stream().map(TestPeer::getPeerId).collect(toList())), done);
assertTrue(done.await().isOk());
cluster.ensureSame();
@@ -3574,8 +3454,7 @@ public class ItNodeTest {
Utils.runInThread(executor, () -> {
try {
for (int i = 0; i < 5000; ) {
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
if (leader == null)
continue;
SynchronizedClosure done = new SynchronizedClosure();
@@ -3605,9 +3484,8 @@ public class ItNodeTest {
for (Future<?> future : futures)
future.get();
- cluster.waitLeader();
SynchronizedClosure done = new SynchronizedClosure();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
leader.changePeers(new
Configuration(peers.stream().map(TestPeer::getPeerId).collect(toList())), done);
assertTrue(done.await().isOk());
cluster.ensureSame();
@@ -3628,9 +3506,7 @@ public class ItNodeTest {
for (TestPeer peer : peers)
assertTrue(cluster.start(peer));
- cluster.waitLeader();
-
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
LOG.warn("Current leader {}, electTimeout={}",
leader.getNodeId().getPeerId(), leader.getOptions().getElectionTimeoutMs());
@@ -3659,8 +3535,7 @@ public class ItNodeTest {
stopBlockingMessagesOnFollowers(followers);
// elect new leader
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
LOG.info("Elect new leader is {}, curTerm={}", leader.getLeaderId(),
((NodeImpl) leader).getCurrentTerm());
}
@@ -3694,9 +3569,7 @@ public class ItNodeTest {
assertTrue(cluster.start(peer));
}
- cluster.waitLeader();
-
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
int initElectionTimeout = leader.getOptions().getElectionTimeoutMs();
@@ -3726,8 +3599,7 @@ public class ItNodeTest {
stopBlockingMessagesOnFollowers(followers);
// elect new leader
- cluster.waitLeader();
- leader = cluster.getLeader();
+ leader = cluster.waitAndGetLeader();
LOG.info("Elected new leader is {}, curTerm={}", leader.getLeaderId(),
((NodeImpl) leader).getCurrentTerm());
@@ -3751,9 +3623,7 @@ public class ItNodeTest {
assertTrue(cluster.start(peer, false, 300, false, null, opts));
}
- cluster.waitLeader();
-
- NodeImpl leader = (NodeImpl) cluster.getLeader();
+ NodeImpl leader = (NodeImpl) cluster.waitAndGetLeader();
assertNotNull(leader);
cluster.ensureLeader(leader);
@@ -3823,8 +3693,7 @@ public class ItNodeTest {
});
}
- cluster.waitLeader();
- Node leader = cluster.getLeader();
+ Node leader = cluster.waitAndGetLeader();
cluster.ensureLeader(leader);
RpcClientEx client = sender(leader);
diff --git
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/TestCluster.java
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 449495473a..025509f48e 100644
---
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.jraft.core;
import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -31,9 +32,11 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@@ -390,22 +393,39 @@ public class TestCluster {
}
/**
- * Wait until a leader is elected.
+ * Wait until a leader is elected and return it.
* @throws InterruptedException
+ * @return Leader.
*/
- public void waitLeader() throws InterruptedException {
- Node node;
+ public Node waitAndGetLeader() throws InterruptedException {
+ AtomicReference<Node> node = new AtomicReference<>();
- while (true) {
- node = getLeader();
+ assertTrue(waitForCondition(() -> {
+ node.set(getLeader());
- if (node != null) {
- break;
- }
- else {
- Thread.sleep(10);
- }
- }
+ return node.get() != null;
+ }, 10_000L));
+
+ return node.get();
+ }
+
+ /**
+ * Wait until a leader is elected and that leader is one of the expected peers
{@code expectedLeaderPeer}, then return it.
+ *
+ * @param expectedLeaderPeer Set of peers, one of which is expected to be the leader.
+ * @return Leader.
+ * @throws InterruptedException If failed.
+ */
+ public Node waitAndGetLeader(Set<PeerId> expectedLeaderPeer) throws
InterruptedException {
+ AtomicReference<Node> node = new AtomicReference<>();
+
+ assertTrue(waitForCondition(() -> {
+ node.set(getLeader());
+
+ return node.get() != null &&
expectedLeaderPeer.contains(node.get().getNodeId().getPeerId());
+ }, 10_000L));
+
+ return node.get();
}
public List<Node> getFollowers() {