This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 26d625b5ce IGNITE-22801 Add term param to changePeers (#4129)
26d625b5ce is described below
commit 26d625b5ce28e73194d829a7543265943abcd9c8
Author: Alexander Lapin <[email protected]>
AuthorDate: Thu Aug 1 10:12:17 2024 +0300
IGNITE-22801 Add term param to changePeers (#4129)
---
.../cluster/management/raft/CmgRaftService.java | 6 +-
.../rebalance/PartitionMover.java | 14 ++--
.../RebalanceRaftGroupEventsListener.java | 6 +-
.../ZoneRebalanceRaftGroupEventsListener.java | 6 +-
.../tech-notes/images/primaryReplica.svg | 2 +-
.../tech-notes/src/primaryReplica.puml | 4 +-
.../impl/MetaStorageLeaderElectionListener.java | 2 +-
.../PartitionReplicaLifecycleManager.java | 2 +-
.../internal/raft/service/RaftGroupService.java | 12 +--
.../ignite/internal/raft/ItLearnersTest.java | 5 +-
.../internal/raft/ItRaftGroupServiceTest.java | 4 +-
.../ignite/raft/jraft/core/ItCliServiceTest.java | 11 ++-
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 92 ++++++++++++++--------
.../ignite/internal/raft/RaftGroupServiceImpl.java | 40 ++++++----
.../org/apache/ignite/raft/jraft/CliService.java | 12 ++-
.../java/org/apache/ignite/raft/jraft/Node.java | 9 ++-
.../apache/ignite/raft/jraft/RaftMessageGroup.java | 8 +-
.../ignite/raft/jraft/core/CliServiceImpl.java | 29 ++++---
.../apache/ignite/raft/jraft/core/NodeImpl.java | 31 +++++---
.../ignite/raft/jraft/rpc/CliClientService.java | 8 +-
.../apache/ignite/raft/jraft/rpc/CliRequests.java | 29 ++++---
.../raft/jraft/rpc/impl/IgniteRpcServer.java | 8 +-
...angePeersAndLearnersAsyncRequestProcessor.java} | 22 +++---
...=> ChangePeersAndLearnersRequestProcessor.java} | 53 +++++++++----
.../jraft/rpc/impl/cli/CliClientServiceImpl.java | 8 +-
.../ignite/internal/raft/RaftGroupServiceTest.java | 50 ++++++++++--
...PeersAndLearnersAsyncRequestProcessorTest.java} | 22 +++---
...hangePeersAndLearnersRequestProcessorTest.java} | 36 ++++++---
.../{changePeers.md => changePeersAndLearners.md} | 10 +--
modules/raft/tech-notes/nodeCatchUp.md | 2 +-
.../raft/client/TopologyAwareRaftGroupService.java | 8 +-
.../rebalance/ItRebalanceDistributedTest.java | 4 +-
.../ignite/internal/rebalance/ItRebalanceTest.java | 4 +-
.../rebalance/ItRebalanceTriggersRecoveryTest.java | 6 +-
.../internal/table/distributed/TableManager.java | 2 +-
.../table/distributed/PartitionMoverTest.java | 10 +--
modules/table/tech-notes/rebalance.md | 24 +++---
37 files changed, 377 insertions(+), 224 deletions(-)
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
index 98f1a512f4..b89322943a 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
@@ -285,8 +285,8 @@ public class CmgRaftService implements ManuallyCloseable {
}
/**
- * Issues {@code changePeersAsync} request with same peers; learners are recalculated based on the current peers (which is same as
- * CMG nodes) and known logical topology. Any node in the logical topology that is not a CMG node constitutes a learner.
+ * Issues {@code changePeersAndLearnersAsync} request with same peers; learners are recalculated based on the current peers (which is
+ * same as CMG nodes) and known logical topology. Any node in the logical topology that is not a CMG node constitutes a learner.
*
* @param term RAFT term in which we operate (used to avoid races when changing peers/learners).
* @return Future that completes when the request is processed.
@@ -317,7 +317,7 @@ public class CmgRaftService implements ManuallyCloseable {
if (newLearners.isEmpty()) {
// Methods for working with learners do not support empty peer lists for some reason.
- return raftService.changePeersAsync(newConfiguration, term)
+ return raftService.changePeersAndLearnersAsync(newConfiguration, term)
.thenRun(() -> raftService.updateConfiguration(newConfiguration));
} else {
return raftService.resetLearners(newConfiguration.learners());
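The learner rule stated in the CmgRaftService Javadoc (every node in the logical topology that is not a CMG node is a learner), together with the branch between `changePeersAndLearnersAsync` and `resetLearners` in this hunk, can be sketched as a small pure function. This is a hypothetical helper, not Ignite code: the class `CmgLearners`, its methods, and the use of plain string node names are illustrative assumptions.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of how CMG learners are derived; not part of Ignite. */
public final class CmgLearners {
    private CmgLearners() {
    }

    /** Learners = logical topology minus the CMG voting members (peers). */
    public static Set<String> computeLearners(Set<String> cmgPeers, Set<String> logicalTopology) {
        Set<String> learners = new HashSet<>(logicalTopology);
        learners.removeAll(cmgPeers); // CMG nodes stay peers and are never learners.
        return learners;
    }

    /**
     * Mirrors the branch in the diff: the learner-specific call is avoided for an empty
     * learner set, so the full peers-and-learners change is used instead.
     */
    public static String chooseCall(Set<String> learners) {
        return learners.isEmpty() ? "changePeersAndLearnersAsync" : "resetLearners";
    }

    public static void main(String[] args) {
        Set<String> learners = computeLearners(
                Set.of("n1", "n2", "n3"),
                Set.of("n1", "n2", "n3", "n4", "n5"));
        System.out.println(new TreeSet<>(learners)); // [n4, n5]
        System.out.println(chooseCall(learners));    // resetLearners
    }
}
```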
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
index 694e8ed7fe..1a069020aa 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
@@ -52,11 +52,11 @@ public class PartitionMover {
}
/**
- * Performs {@link RaftGroupService#changePeersAsync} on a provided raft group service of a partition, so nodes of the corresponding
- * raft group can be reconfigured. Retry mechanism is applied to repeat {@link RaftGroupService#changePeersAsync} if previous one failed
- * with some exception.
+ * Performs {@link RaftGroupService#changePeersAndLearnersAsync} on a provided raft group service of a partition, so nodes of the
+ * corresponding raft group can be reconfigured. Retry mechanism is applied to repeat
+ * {@link RaftGroupService#changePeersAndLearnersAsync} if previous one failed with some exception.
*
- * @return Function which performs {@link RaftGroupService#changePeersAsync}.
+ * @return Function which performs {@link RaftGroupService#changePeersAndLearnersAsync}.
*/
public CompletableFuture<Void> movePartition(PeersAndLearners peersAndLearners, long term) {
if (!busyLock.enterBusy()) {
@@ -66,7 +66,7 @@ public class PartitionMover {
try {
return raftGroupServiceSupplier
.get()
- .thenCompose(raftGroupService -> raftGroupService.changePeersAsync(peersAndLearners, term))
+ .thenCompose(raftGroupService -> raftGroupService.changePeersAndLearnersAsync(peersAndLearners, term))
.handle((resp, err) -> {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
@@ -75,12 +75,12 @@ public class PartitionMover {
try {
if (err != null) {
if (recoverable(err)) {
- LOG.debug("Recoverable error received during changePeersAsync invocation, retrying", err);
+ LOG.debug("Recoverable error received during changePeersAndLearnersAsync invocation, retrying", err);
} else {
// TODO: IGNITE-19087 Ideally, rebalance, which has initiated this invocation should be canceled,
// TODO: Also it might be reasonable to delegate such exceptional case to a general failure handler.
// TODO: At the moment, we repeat such intents as well.
- LOG.debug("Unrecoverable error received during changePeersAsync invocation, retrying", err);
+ LOG.debug("Unrecoverable error received during changePeersAndLearnersAsync invocation, retrying", err);
}
return movePartition(peersAndLearners, term);
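The PartitionMover hunk retries by calling `movePartition` again from inside `handle(...)` whenever `changePeersAndLearnersAsync` fails. A minimal, self-contained sketch of that "re-invoke on error, then flatten the nested future" pattern is shown below. It is an illustration under stated assumptions: `RetryingMover`, `moveWithRetry`, and the attempt cap are hypothetical (the real PartitionMover keeps retrying until the node is stopping), and the `Supplier` stands in for the Raft call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch of a retry-on-error loop built on CompletableFuture; not Ignite code. */
public final class RetryingMover {
    private RetryingMover() {
    }

    /**
     * Runs {@code action}; if its future fails, runs it again, up to {@code attemptsLeft}
     * attempts in total (the cap is an assumption added for the sketch).
     */
    public static CompletableFuture<Void> moveWithRetry(Supplier<CompletableFuture<Void>> action, int attemptsLeft) {
        return action.get()
                .handle((resp, err) -> {
                    if (err == null) {
                        return CompletableFuture.<Void>completedFuture(null);
                    }
                    if (attemptsLeft <= 1) {
                        return CompletableFuture.<Void>failedFuture(err);
                    }
                    // Same idea as "return movePartition(peersAndLearners, term);" in the diff.
                    return moveWithRetry(action, attemptsLeft - 1);
                })
                // handle() yields a future of a future; flatten it into a single future.
                .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fails twice (for example, because the leader changed), then succeeds.
        Supplier<CompletableFuture<Void>> flaky = () -> calls.incrementAndGet() < 3
                ? CompletableFuture.<Void>failedFuture(new IllegalStateException("leader changed"))
                : CompletableFuture.<Void>completedFuture(null);

        moveWithRetry(flaky, 5).join();
        System.out.println(calls.get()); // 3
    }
}
```

Flattening with `thenCompose(Function.identity())` keeps the returned future pending until the last retry finishes, rather than completing as soon as the first attempt is handled.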
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index fb6335597e..5e1b5d2e5d 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -345,14 +345,14 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
LOG.debug("Error occurred during rebalance [partId={}]", tablePartitionId);
if (rebalanceAttempts.incrementAndGet() < REBALANCE_RETRY_THRESHOLD) {
- scheduleChangePeers(configuration, term);
+ scheduleChangePeersAndLearners(configuration, term);
} else {
LOG.info("Number of retries for rebalance exceeded the threshold [partId={}, threshold={}]", tablePartitionId,
REBALANCE_RETRY_THRESHOLD);
// TODO: currently we just retry intent to change peers according to the rebalance infinitely, until new leader is elected,
// TODO: but rebalance cancel mechanism should be implemented. https://issues.apache.org/jira/browse/IGNITE-19087
- scheduleChangePeers(configuration, term);
+ scheduleChangePeersAndLearners(configuration, term);
}
} finally {
busyLock.leaveBusy();
@@ -365,7 +365,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
* @param peersAndLearners Peers and learners.
* @param term Current known leader term.
*/
- private void scheduleChangePeers(PeersAndLearners peersAndLearners, long term) {
+ private void scheduleChangePeersAndLearners(PeersAndLearners peersAndLearners, long term) {
rebalanceScheduler.schedule(() -> {
if (!busyLock.enterBusy()) {
return;
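The rebalance listeners count failed attempts and, both below and above `REBALANCE_RETRY_THRESHOLD`, schedule another `scheduleChangePeersAndLearners(configuration, term)` with the same term; crossing the threshold only adds an info log. The sketch below models that control flow with a `ScheduledExecutorService`. Everything in it is a hypothetical stand-in: the class name, the threshold value, the 10 ms delay, the `LongPredicate` playing the role of the Raft call, and the `thresholdWarnings` counter standing in for the log line.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongPredicate;

/** Hypothetical model of the threshold-then-retry-anyway rebalance loop; not Ignite code. */
public final class RebalanceRetrySketch {
    static final int REBALANCE_RETRY_THRESHOLD = 3; // Assumed value for the sketch.

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    final AtomicInteger rebalanceAttempts = new AtomicInteger();
    final AtomicInteger thresholdWarnings = new AtomicInteger(); // Stands in for LOG.info(...).

    /** Called when a reconfiguration fails; always reschedules with the same term. */
    void onReconfigurationError(long term, LongPredicate changePeersAndLearners, CountDownLatch done) {
        if (rebalanceAttempts.incrementAndGet() >= REBALANCE_RETRY_THRESHOLD) {
            // Past the threshold the diff only logs, then retries anyway (see IGNITE-19087).
            thresholdWarnings.incrementAndGet();
        }
        scheduleChangePeersAndLearners(term, changePeersAndLearners, done);
    }

    private void scheduleChangePeersAndLearners(long term, LongPredicate change, CountDownLatch done) {
        scheduler.schedule(() -> {
            if (change.test(term)) {
                done.countDown(); // Success: stop retrying.
            } else {
                onReconfigurationError(term, change, done);
            }
        }, 10, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        RebalanceRetrySketch sketch = new RebalanceRetrySketch();
        AtomicInteger calls = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        // The simulated call fails four times, then succeeds.
        sketch.onReconfigurationError(7, term -> calls.incrementAndGet() > 4, done);
        done.await(5, TimeUnit.SECONDS);
        sketch.shutdown();
        System.out.println(calls.get() + " " + sketch.rebalanceAttempts.get() + " " + sketch.thresholdWarnings.get()); // 5 5 3
    }
}
```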
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
index 2ae5ddc0d0..2ecd2163a7 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
@@ -268,14 +268,14 @@ public class ZoneRebalanceRaftGroupEventsListener implements RaftGroupEventsList
LOG.debug("Error occurred during rebalance [partId={}]", zonePartitionId);
if (rebalanceAttempts.incrementAndGet() < REBALANCE_RETRY_THRESHOLD) {
- scheduleChangePeers(configuration, term);
+ scheduleChangePeersAndLearners(configuration, term);
} else {
LOG.info("Number of retries for rebalance exceeded the threshold [partId={}, threshold={}]", zonePartitionId,
REBALANCE_RETRY_THRESHOLD);
// TODO: currently we just retry intent to change peers according to the rebalance infinitely, until new leader is elected,
// TODO: but rebalance cancel mechanism should be implemented. https://issues.apache.org/jira/browse/IGNITE-19087
- scheduleChangePeers(configuration, term);
+ scheduleChangePeersAndLearners(configuration, term);
}
} finally {
busyLock.leaveBusy();
@@ -288,7 +288,7 @@ public class ZoneRebalanceRaftGroupEventsListener implements RaftGroupEventsList
* @param peersAndLearners Peers and learners.
* @param term Current known leader term.
*/
- private void scheduleChangePeers(PeersAndLearners peersAndLearners, long term) {
+ private void scheduleChangePeersAndLearners(PeersAndLearners peersAndLearners, long term) {
rebalanceScheduler.schedule(() -> {
if (!busyLock.enterBusy()) {
return;
diff --git a/modules/distribution-zones/tech-notes/images/primaryReplica.svg b/modules/distribution-zones/tech-notes/images/primaryReplica.svg
index d3c1ab27b0..ae7032647d 100644
--- a/modules/distribution-zones/tech-notes/images/primaryReplica.svg
+++ b/modules/distribution-zones/tech-notes/images/primaryReplica.svg
@@ -1 +1 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?><svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" contentStyleType="text/css" height="480px" preserveAspectRatio="none" style="width:1400px;height:480px;background:#FFFFFF;" version="1.1" viewBox="0 0 1400 480" width="1400px" zoomAndPan="magnify"><defs/><g><text fill="#000000" font-family="sans-serif" font-size="14" font-weight="bold" lengthAdjust="spacing" textLength="477" x="461" y="28.5352">PrimaryR [...]
\ No newline at end of file
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" contentStyleType="text/css" height="480px" preserveAspectRatio="none" style="width:1400px;height:480px;background:#FFFFFF;" version="1.1" viewBox="0 0 1400 480" width="1400px" zoomAndPan="magnify"><defs/><g><text fill="#000000" font-family="sans-serif" font-size="14" font-weight="bold" lengthAdjust="spacing" textLength="477" x="461" y="28.5352">PrimaryR [...]
\ No newline at end of file
diff --git a/modules/distribution-zones/tech-notes/src/primaryReplica.puml b/modules/distribution-zones/tech-notes/src/primaryReplica.puml
index 7a658d3a33..c4a99c2b1b 100644
--- a/modules/distribution-zones/tech-notes/src/primaryReplica.puml
+++ b/modules/distribution-zones/tech-notes/src/primaryReplica.puml
@@ -17,7 +17,7 @@ Replica2
Leader for term 2
]
-PrimaryReplica -> Replica1 : Send a changePeersAsync request (node is the leader at the moment)
+PrimaryReplica -> Replica1 : Send a changePeersAndLearnersAsync request (node is the leader at the moment)
Replica1 -> Replica1 : Leader was stepped down and new leader election will start.
Replica2 -> Replica2 : Current node elected as a leader.
Replica2 -> PrimaryReplica : Send a message about the new leader elected. [see 3.1 details]
@@ -28,7 +28,7 @@ because if the PrimaryReplica fails,
PlacementDriver will choose another one
and start rebalance again by himself
end note
-PrimaryReplica -> Replica2 : Send changePeers to the new leader again
+PrimaryReplica -> Replica2 : Send changePeersAndLearners to the new leader again
Replica2 -> PrimaryReplica : Rebalance done message
PrimaryReplica -> PlacementDriver : Rebalance done message. PD than do other operations from the general rebalance diagram.
@enduml
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
index 92823ea23b..016f6c0038 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java
@@ -246,7 +246,7 @@ public class MetaStorageLeaderElectionListener implements LeaderElectionListener
PeersAndLearners newPeerConfiguration = PeersAndLearners.fromConsistentIds(peers, learners);
// We can't use 'resetLearners' call here because it does not support empty lists of learners.
- return raftService.changePeersAsync(newPeerConfiguration, term);
+ return raftService.changePeersAndLearnersAsync(newPeerConfiguration, term);
})));
}
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index e1b36dfdf8..d5b7a54510 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -983,7 +983,7 @@ public class PartitionReplicaLifecycleManager implements IgniteComponent {
PeersAndLearners newConfiguration = fromAssignments(pendingAssignments);
- CompletableFuture<Void> voidCompletableFuture = partGrpSvc.changePeersAsync(newConfiguration,
+ CompletableFuture<Void> voidCompletableFuture = partGrpSvc.changePeersAndLearnersAsync(newConfiguration,
leaderWithTerm.term()).exceptionally(e -> {
return null;
});
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
index aa44c7daaa..fa43e1fd0c 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
@@ -128,17 +128,19 @@ public interface RaftGroupService extends RaftCommandRunner {
CompletableFuture<Void> removePeer(Peer peer);
/**
- * Changes peers of a replication group.
+ * Changes peers and learners of a replication group.
*
* <p>After the future completion methods like {@link #peers()} and {@link #learners()} can be used to retrieve current members of a
* group.
*
* <p>This operation is executed on a group leader.
*
- * @param peers Peers.
+ * @param peersAndLearners New peers and Learners of the Raft group.
+ * @param term Current known leader term.
+ * If real raft group term will be different - configuration update will be skipped.
* @return A future.
*/
- CompletableFuture<Void> changePeers(Collection<Peer> peers);
+ CompletableFuture<Void> changePeersAndLearners(PeersAndLearners peersAndLearners, long term);
/**
* Changes peers and learners of a replication group.
@@ -153,10 +155,10 @@ public interface RaftGroupService extends RaftCommandRunner {
*
* @param peersAndLearners New peers and Learners of the Raft group.
* @param term Current known leader term.
- * If real raft group term will be different - changePeers will be skipped.
+ * If real raft group term will be different - configuration update will be skipped.
* @return A future.
*/
- CompletableFuture<Void> changePeersAsync(PeersAndLearners peersAndLearners, long term);
+ CompletableFuture<Void> changePeersAndLearnersAsync(PeersAndLearners peersAndLearners, long term);
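The `term` parameter documented in RaftGroupService turns the configuration change into a guarded operation: if the caller's known leader term differs from the group's current term, the update is skipped. The ItNodeTest cases in this commit ("wrong leader term, do nothing" and the `MISMATCHED_TERM` check) appear to expect such a call to still complete with `Status.OK` while membership stays unchanged. The in-memory model below illustrates only that contract. `TermGuardedGroup` and its string-based membership are hypothetical; the real jraft node also handles catch-up, joint consensus, and leadership checks that are not modeled here.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical in-memory model of a term-guarded membership change; not the Ignite/jraft implementation. */
public class TermGuardedGroup {
    private final long currentTerm;
    private Set<String> peers;
    private Set<String> learners;

    public TermGuardedGroup(long currentTerm, Set<String> peers, Set<String> learners) {
        this.currentTerm = currentTerm;
        this.peers = Set.copyOf(peers);
        this.learners = Set.copyOf(learners);
    }

    /** Applies the new membership only when {@code term} equals the group's current term. */
    public synchronized CompletableFuture<Void> changePeersAndLearnersAsync(
            Set<String> newPeers, Set<String> newLearners, long term) {
        if (term != currentTerm) {
            // Mismatched term: skip the update but still complete normally.
            return CompletableFuture.completedFuture(null);
        }
        peers = Set.copyOf(newPeers);
        learners = Set.copyOf(newLearners);
        return CompletableFuture.completedFuture(null);
    }

    public synchronized Set<String> peers() {
        return peers;
    }

    public synchronized Set<String> learners() {
        return learners;
    }

    public static void main(String[] args) {
        TermGuardedGroup group = new TermGuardedGroup(5, Set.of("A", "B", "C"), Set.of());

        group.changePeersAndLearnersAsync(Set.of("A"), Set.of("D"), 10_000).join();
        System.out.println(group.peers().size()); // 3: mismatched term, nothing changed

        group.changePeersAndLearnersAsync(Set.of("A"), Set.of("D"), 5).join();
        System.out.println(group.peers() + " " + group.learners()); // [A] [D]
    }
}
```

Passing `leader.getCurrentTerm()` (or `leaderWithTerm.term()`) at every call site, as the updated tests do, is what lets a caller detect that leadership moved between reading the term and issuing the change.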
/**
* Adds learners (non-voting members).
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
index f8bca004a7..aee4c59795 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
@@ -333,7 +333,8 @@ public class ItLearnersTest extends IgniteAbstractTest {
}
/**
- * Tests adding a new learner using {@link RaftGroupService#changePeersAsync} to an Ignite node that is already running a Raft peer.
+ * Tests adding a new learner using {@link RaftGroupService#changePeersAndLearnersAsync} to an Ignite node that is already running a
+ * Raft peer.
*/
@Test
void testChangePeersToAddLearnerToSameNodeAsPeer() throws InterruptedException {
@@ -368,7 +369,7 @@ public class ItLearnersTest extends IgniteAbstractTest {
PeersAndLearners newConfiguration = createConfiguration(followers, List.of(learner, newLearner));
CompletableFuture<Void> changePeersFuture = learnerService.thenCompose(s -> s.refreshAndGetLeaderWithTerm()
- .thenCompose(leaderWithTerm -> s.changePeersAsync(newConfiguration, leaderWithTerm.term())
+ .thenCompose(leaderWithTerm -> s.changePeersAndLearnersAsync(newConfiguration, leaderWithTerm.term())
));
assertThat(changePeersFuture, willCompleteSuccessfully());
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index b14c0c0156..0d27619a4c 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -154,7 +154,7 @@ public class ItRaftGroupServiceTest extends IgniteAbstractTest {
}
@Test
- public void testChangePeersAsync(TestInfo testInfo) throws InterruptedException {
+ public void testChangePeersAndLearnersAsync(TestInfo testInfo) throws InterruptedException {
// Start some new followers.
List<TestNode> newFollowers = List.of(startNode(testInfo), startNode(testInfo));
@@ -185,7 +185,7 @@ public class ItRaftGroupServiceTest extends IgniteAbstractTest {
CompletableFuture<Void> changePeersFuture = nodes.get(0).raftGroupService
.thenCompose(service -> service.refreshAndGetLeaderWithTerm()
- .thenCompose(l -> service.changePeersAsync(configuration, l.term()))
+ .thenCompose(l -> service.changePeersAndLearnersAsync(configuration, l.term()))
);
assertThat(changePeersFuture, willCompleteSuccessfully());
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
index 310607d33c..682e65120e 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
@@ -337,7 +337,7 @@ public class ItCliServiceTest extends BaseIgniteAbstractTest {
}
@Test
- public void testChangePeers(TestInfo testInfo) throws Exception {
+ public void testChangePeersAndLearners(TestInfo testInfo) throws Exception {
List<TestPeer> newPeers = TestUtils.generatePeers(testInfo, 6);
newPeers.removeIf(p -> conf.getPeerSet().contains(p.getPeerId()));
@@ -350,7 +350,14 @@ public class ItCliServiceTest extends BaseIgniteAbstractTest {
assertNotNull(oldLeaderNode);
PeerId oldLeader = oldLeaderNode.getNodeId().getPeerId();
assertNotNull(oldLeader);
- Status status = cliService.changePeers(groupId, conf, new Configuration(newPeers.stream().map(TestPeer::getPeerId).collect(toList())));
+
+ Status status = cliService.changePeersAndLearners(
+ groupId,
+ conf,
+ new Configuration(newPeers.stream().map(TestPeer::getPeerId).collect(toList())),
+ oldLeaderNode.getCurrentTerm()
+ );
+
assertTrue(status.isOk(), status.getErrorMsg());
PeerId newLeader = cluster.waitAndGetLeader().getNodeId().getPeerId();
assertNotEquals(oldLeader, newLeader);
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 54d62b453b..97902de1ec 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -145,6 +145,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
public class ItNodeTest extends BaseIgniteAbstractTest {
private static final IgniteLogger log = Loggers.forClass(ItNodeTest.class);
+ private static final long MISMATCHED_TERM = 10_000;
+
private static DumpThread dumpThread;
private static class DumpThread extends Thread {
@@ -2911,16 +2913,16 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
}
@Test
- public void testChangePeers() throws Exception {
- changePeers(false);
+ public void testChangePeersAndLearners() throws Exception {
+ changePeersAndLearners(false);
}
@Test
public void testChangeAsyncPeers() throws Exception {
- changePeers(true);
+ changePeersAndLearners(true);
}
- private void changePeers(boolean async) throws Exception {
+ private void changePeersAndLearners(boolean async) throws Exception {
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0), testInfo);
assertTrue(cluster.start(peer0));
@@ -2949,7 +2951,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
PeerId newLeaderPeer = peers.get(i + 1).getPeerId();
if (async) {
SynchronizedClosure done = new SynchronizedClosure();
- leader.changePeersAsync(new Configuration(Collections.singletonList(newLeaderPeer)),
+ leader.changePeersAndLearnersAsync(new Configuration(Collections.singletonList(newLeaderPeer)),
leader.getCurrentTerm(), done);
Status status = done.await();
assertTrue(status.isOk(), status.getRaftError().toString());
@@ -2961,7 +2963,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
}, 10_000));
} else {
SynchronizedClosure done = new SynchronizedClosure();
- leader.changePeers(new Configuration(Collections.singletonList(newLeaderPeer)), done);
+ leader.changePeersAndLearners(new Configuration(Collections.singletonList(newLeaderPeer)), leader.getCurrentTerm(), done);
Status status = done.await();
assertTrue(status.isOk(), status.getRaftError().toString());
}
@@ -2993,7 +2995,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
SynchronizedClosure done = new SynchronizedClosure();
- leader.changePeersAsync(new Configuration(Collections.singletonList(newPeer)),
+ leader.changePeersAndLearnersAsync(new Configuration(Collections.singletonList(newPeer)),
leader.getCurrentTerm(), done);
assertEquals(done.await(), Status.OK());
@@ -3033,7 +3035,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
verify(raftGrpEvtsLsnr, never()).onNewPeersConfigurationApplied(any(), any());
// Wait until every node sees every other node, otherwise
- // changePeersAsync can fail.
+ // changePeersAndLearnersAsync can fail.
waitForTopologyOnEveryNode(numPeers, cluster);
for (int i = 0; i < 4; i++) {
@@ -3045,7 +3047,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
PeerId newLearner = learners.get(i).getPeerId();
SynchronizedClosure done = new SynchronizedClosure();
- leader.changePeersAsync(new Configuration(List.of(newPeer), List.of(newLearner)), leader.getCurrentTerm(), done);
+ leader.changePeersAndLearnersAsync(new Configuration(List.of(newPeer), List.of(newLearner)), leader.getCurrentTerm(), done);
assertEquals(done.await(), Status.OK());
assertTrue(waitForCondition(() -> {
if (cluster.getLeader() != null) {
@@ -3059,7 +3061,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
}
@Test
- public void testChangePeersOnLeaderElected() throws Exception {
+ public void testChangePeersAndLearnersOnLeaderElected() throws Exception {
List<TestPeer> peers = IntStream.range(0, 6)
.mapToObj(i -> new TestPeer(testInfo, TestUtils.INIT_PORT + i))
.collect(toList());
@@ -3092,7 +3094,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
}
@Test
- public void changePeersAsyncResponses() throws Exception {
+ public void changePeersAndLearnersAsyncResponses() throws Exception {
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0), testInfo);
assertTrue(cluster.start(peer0));
@@ -3112,19 +3114,19 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
// wrong leader term, do nothing
SynchronizedClosure done = new SynchronizedClosure();
- leader.changePeersAsync(new Configuration(Collections.singletonList(newLeaderPeer.getPeerId())),
+ leader.changePeersAndLearnersAsync(new Configuration(Collections.singletonList(newLeaderPeer.getPeerId())),
leader.getCurrentTerm() - 1, done);
assertEquals(done.await(), Status.OK());
// the same config, do nothing
done = new SynchronizedClosure();
- leader.changePeersAsync(new Configuration(Collections.singletonList(leaderPeer)),
+ leader.changePeersAndLearnersAsync(new Configuration(Collections.singletonList(leaderPeer)),
leader.getCurrentTerm(), done);
assertEquals(done.await(), Status.OK());
// change peer to new conf containing only new node
done = new SynchronizedClosure();
- leader.changePeersAsync(new Configuration(Collections.singletonList(newLeaderPeer.getPeerId())),
+ leader.changePeersAndLearnersAsync(new Configuration(Collections.singletonList(newLeaderPeer.getPeerId())),
leader.getCurrentTerm(), done);
assertEquals(done.await(), Status.OK());
@@ -3152,7 +3154,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
SynchronizedClosure newDone = new SynchronizedClosure();
dones.add(newDone);
futs.add(executor.submit(() -> {
- newLeader.changePeersAsync(new Configuration(Collections.singletonList(peer0.getPeerId())), 2, newDone);
+ newLeader.changePeersAndLearnersAsync(new Configuration(Collections.singletonList(peer0.getPeerId())), 2, newDone);
}));
}
futs.get(0).get();
@@ -3177,7 +3179,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
}
@Test
- public void testChangePeersAddMultiNodes() throws Exception {
+ public void testChangePeersAndLearnersAddMultiNodes() throws Exception {
List<TestPeer> peers = new ArrayList<>();
TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
@@ -3200,32 +3202,56 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
TestPeer peer = peers.get(1);
// fail, because the peers are not started.
SynchronizedClosure done = new SynchronizedClosure();
- leader.changePeers(new Configuration(Collections.singletonList(peer.getPeerId())), done);
+ leader.changePeersAndLearners(new Configuration(Collections.singletonList(peer.getPeerId())), leader.getCurrentTerm(), done);
assertEquals(RaftError.ECATCHUP, done.await().getRaftError());
// start peer1
assertTrue(cluster.start(peer));
// still fail, because peer2 is not started
done.reset();
- leader.changePeers(conf, done);
+ leader.changePeersAndLearners(conf, leader.getCurrentTerm(), done);
assertEquals(RaftError.ECATCHUP, done.await().getRaftError());
// start peer2
peer = peers.get(2);
assertTrue(cluster.start(peer));
done.reset();
+
+ for (NodeImpl node: cluster.getNodes())
+ assertEquals(node.getConf().getConf().getPeers().size(), 1);
+
// works
- leader.changePeers(conf, done);
+ leader.changePeersAndLearners(conf, leader.getCurrentTerm(), done);
Status await = done.await();
assertTrue(await.isOk(), await.getErrorMsg());
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
+
for (MockStateMachine fsm : cluster.getFsms())
assertEquals(10, fsm.getLogs().size());
+
+ for (NodeImpl node: cluster.getNodes())
+ assertEquals(node.getConf().getConf().getPeers().size(), 3);
+
+ // another attempt to change peers with unmatched term
+ Configuration conf2 = new Configuration(List.of(peer0.getPeerId()));
+ leader.changePeersAndLearners(conf2, MISMATCHED_TERM, done);
+ Status await2 = done.await();
+ assertTrue(await2.isOk(), await.getErrorMsg());
+
+ cluster.ensureSame();
+ assertEquals(3, cluster.getFsms().size());
+
+ for (MockStateMachine fsm : cluster.getFsms())
+ assertEquals(10, fsm.getLogs().size());
+
+ // Verify that configuration wasn't applied because of mismatched term.
+ for (NodeImpl node: cluster.getNodes())
+ assertEquals(node.getConf().getConf().getPeers().size(), 3);
}
@Test
- public void testChangePeersStepsDownInJointConsensus() throws Exception {
+ public void testChangePeersAndLearnersStepsDownInJointConsensus() throws Exception {
List<TestPeer> peers = new ArrayList<>();
TestPeer peer0 = new TestPeer(testInfo, 5006);
@@ -3258,7 +3284,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
// change peers
SynchronizedClosure done = new SynchronizedClosure();
- leader.changePeers(conf, done);
+ leader.changePeersAndLearners(conf, leader.getCurrentTerm(), done);
assertTrue(done.await().isOk());
// stop peer3
@@ -3269,7 +3295,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
// Change peers to [peer2, peer3], which must fail since peer3 is stopped
done.reset();
- leader.changePeers(conf, done);
+ leader.changePeersAndLearners(conf, leader.getCurrentTerm(), done);
assertEquals(RaftError.EPERM, done.await().getRaftError());
log.info(done.getStatus().toString());
@@ -3303,7 +3329,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
}
- private Future<?> startChangePeersThread(ChangeArg arg) {
+ private Future<?> startChangePeersAndLearnersThread(ChangeArg arg) {
Set<RaftError> expectedErrors = new HashSet<>();
expectedErrors.add(RaftError.EBUSY);
expectedErrors.add(RaftError.EPERM);
@@ -3333,7 +3359,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
continue;
}
SynchronizedClosure done = new SynchronizedClosure();
- leader.changePeers(conf, done);
+ leader.changePeersAndLearners(conf, leader.getCurrentTerm(), done);
done.await();
assertTrue(done.getStatus().isOk() || expectedErrors.contains(done.getStatus().getRaftError()),
done.getStatus().toString());
}
@@ -3345,7 +3371,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
}
@Test
- public void testChangePeersChaosWithSnapshot() throws Exception {
+ public void testChangePeersAndLearnersChaosWithSnapshot() throws Exception {
// start cluster
List<TestPeer> peers = new ArrayList<>();
peers.add(new TestPeer(testInfo, TestUtils.INIT_PORT));
@@ -3360,7 +3386,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
ChangeArg arg = new ChangeArg(cluster, peers.stream().map(TestPeer::getPeerId).collect(toList()), false, false);
- Future<?> future = startChangePeersThread(arg);
+ Future<?> future = startChangePeersAndLearnersThread(arg);
for (int i = 0; i < 5000; ) {
Node leader = cluster.waitAndGetLeader();
if (leader == null)
@@ -3380,7 +3406,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
future.get();
SynchronizedClosure done = new SynchronizedClosure();
Node leader = cluster.waitAndGetLeader();
- leader.changePeers(new Configuration(peers.stream().map(TestPeer::getPeerId).collect(toList())), done);
+ leader.changePeersAndLearners(new Configuration(peers.stream().map(TestPeer::getPeerId).collect(toList())), leader.getCurrentTerm(), done);
Status st = done.await();
assertTrue(st.isOk(), st.getErrorMsg());
cluster.ensureSame();
@@ -3390,7 +3416,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
}
@Test
- public void testChangePeersChaosWithoutSnapshot() throws Exception {
+ public void testChangePeersAndLearnersChaosWithoutSnapshot() throws Exception {
// start cluster
List<TestPeer> peers = new ArrayList<>();
peers.add(new TestPeer(testInfo, TestUtils.INIT_PORT));
@@ -3405,7 +3431,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
ChangeArg arg = new ChangeArg(cluster, peers.stream().map(TestPeer::getPeerId).collect(toList()), false, true);
- Future<?> future = startChangePeersThread(arg);
+ Future<?> future = startChangePeersAndLearnersThread(arg);
final int tasks = 5000;
for (int i = 0; i < tasks; ) {
Node leader = cluster.waitAndGetLeader();
@@ -3426,7 +3452,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
future.get();
SynchronizedClosure done = new SynchronizedClosure();
Node leader = cluster.waitAndGetLeader();
- leader.changePeers(new Configuration(peers.stream().map(TestPeer::getPeerId).collect(toList())), done);
+ leader.changePeersAndLearners(new Configuration(peers.stream().map(TestPeer::getPeerId).collect(toList())), leader.getCurrentTerm(), done);
assertTrue(done.await().isOk());
cluster.ensureSame();
assertEquals(10, cluster.getFsms().size());
@@ -3437,7 +3463,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
}
@Test
- public void testChangePeersChaosApplyTasks() throws Exception {
+ public void testChangePeersAndLearnersChaosApplyTasks() throws Exception {
// start cluster
List<TestPeer> peers = new ArrayList<>();
peers.add(new TestPeer(testInfo, TestUtils.INIT_PORT));
@@ -3462,7 +3488,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
for (int t = 0; t < threads; t++) {
ChangeArg arg = new ChangeArg(cluster, peers.stream().map(TestPeer::getPeerId).collect(toList()), false, true);
args.add(arg);
- futures.add(startChangePeersThread(arg));
+ futures.add(startChangePeersAndLearnersThread(arg));
Utils.runInThread(executor, () -> {
try {
@@ -3499,7 +3525,7 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
SynchronizedClosure done = new SynchronizedClosure();
Node leader = cluster.waitAndGetLeader();
- leader.changePeers(new Configuration(peers.stream().map(TestPeer::getPeerId).collect(toList())), done);
+ leader.changePeersAndLearners(new Configuration(peers.stream().map(TestPeer::getPeerId).collect(toList())), leader.getCurrentTerm(), done);
assertTrue(done.await().isOk());
cluster.ensureSame();
assertEquals(10, cluster.getFsms().size());
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 78c287d045..1f04cfdd75 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -25,10 +25,9 @@ import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersAsyncRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersAsyncResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersRequest;
import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
@@ -72,6 +71,7 @@ import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.rpc.ActionRequest;
import org.apache.ignite.raft.jraft.rpc.ActionResponse;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexResponse;
@@ -314,32 +314,40 @@ public class RaftGroupServiceImpl implements RaftGroupService {
}
@Override
- public CompletableFuture<Void> changePeers(Collection<Peer> peers) {
+ public CompletableFuture<Void> changePeersAndLearners(PeersAndLearners peersAndLearners, long term) {
Peer leader = this.leader;
if (leader == null) {
- return refreshLeader().thenCompose(res -> changePeers(peers));
+ return refreshLeader().thenCompose(res -> changePeersAndLearners(peersAndLearners, term));
}
- Function<Peer, ChangePeersRequest> requestFactory = targetPeer -> factory.changePeersRequest()
+ Function<Peer, ChangePeersAndLearnersRequest> requestFactory = targetPeer -> factory.changePeersAndLearnersRequest()
.leaderId(peerId(targetPeer))
.groupId(groupId)
- .newPeersList(peerIds(peers))
+ .newPeersList(peerIds(peersAndLearners.peers()))
+ .newLearnersList(peerIds(peersAndLearners.learners()))
+ .term(term)
.build();
- return this.<ChangePeersResponse>sendWithRetry(leader, requestFactory)
- .thenAccept(resp -> this.peers = parsePeerList(resp.newPeersList()));
+ LOG.info("Sending changePeersAndLearners request for group={} to peers={} and learners={} with leader term={}",
+ groupId, peersAndLearners.peers(), peersAndLearners.learners(), term);
+
+ return this.<ChangePeersAndLearnersResponse>sendWithRetry(leader, requestFactory)
+ .thenAccept(resp -> {
+ this.peers = parsePeerList(resp.newPeersList());
+ this.learners = parsePeerList(resp.newLearnersList());
+ });
}
@Override
- public CompletableFuture<Void> changePeersAsync(PeersAndLearners peersAndLearners, long term) {
+ public CompletableFuture<Void> changePeersAndLearnersAsync(PeersAndLearners peersAndLearners, long term) {
Peer leader = this.leader;
if (leader == null) {
- return refreshLeader().thenCompose(res -> changePeersAsync(peersAndLearners, term));
+ return refreshLeader().thenCompose(res -> changePeersAndLearnersAsync(peersAndLearners, term));
}
- Function<Peer, ChangePeersAsyncRequest> requestFactory = targetPeer -> factory.changePeersAsyncRequest()
+ Function<Peer, ChangePeersAndLearnersAsyncRequest> requestFactory = targetPeer -> factory.changePeersAndLearnersAsyncRequest()
.leaderId(peerId(targetPeer))
.groupId(groupId)
.term(term)
@@ -347,10 +355,10 @@ public class RaftGroupServiceImpl implements RaftGroupService {
.newLearnersList(peerIds(peersAndLearners.learners()))
.build();
- LOG.info("Sending changePeersAsync request for group={} to peers={} and learners={} with leader term={}",
+ LOG.info("Sending changePeersAndLearnersAsync request for group={} to peers={} and learners={} with leader term={}",
groupId, peersAndLearners.peers(), peersAndLearners.learners(), term);
- return this.<ChangePeersAsyncResponse>sendWithRetry(leader, requestFactory)
+ return this.<ChangePeersAndLearnersAsyncResponse>sendWithRetry(leader, requestFactory)
.thenAccept(resp -> {
// We expect that all raft related errors will be handled by sendWithRetry, means that
// such responses will initiate a retrying of the original request.
@@ -636,7 +644,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
scheduleRetry(() -> {
// If changing peers or requesting a leader and something is not found
// probably target peer is doing rebalancing, try another peer.
- if (sentRequest instanceof GetLeaderRequest || sentRequest instanceof ChangePeersAsyncRequest) {
+ if (sentRequest instanceof GetLeaderRequest || sentRequest instanceof ChangePeersAndLearnersAsyncRequest) {
sendWithRetry(randomNode(peer), requestFactory, stopTime, fut);
} else {
sendWithRetry(peer, requestFactory, stopTime, fut);
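The `changePeersAndLearners` method in `RaftGroupServiceImpl` handles an unknown leader in a specific way. It refreshes the leader and then re-enters itself through `thenCompose`. The following is a minimal, self-contained sketch of that control flow. `LeaderAwareClient`, `leaderLookup`, and the local update standing in for `sendWithRetry` are all hypothetical and are not the Ignite API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * Hypothetical client illustrating the "refresh leader, then re-enter" pattern.
 * Not the Ignite RaftGroupServiceImpl API: the network round trip is replaced by a local update.
 */
public class LeaderAwareClient {
    private volatile String leader; // null until discovered
    private volatile List<String> peers = List.of();
    private volatile List<String> learners = List.of();
    private final Supplier<String> leaderLookup; // hypothetical leader discovery

    public LeaderAwareClient(Supplier<String> leaderLookup) {
        this.leaderLookup = leaderLookup;
    }

    private CompletableFuture<Void> refreshLeader() {
        return CompletableFuture.runAsync(() -> leader = leaderLookup.get());
    }

    public CompletableFuture<Void> changePeersAndLearners(List<String> newPeers, List<String> newLearners, long term) {
        String currentLeader = this.leader;
        if (currentLeader == null) {
            // Discover the leader first, then call this method again.
            // Assumes leaderLookup eventually returns non-null; otherwise this recurses without end.
            return refreshLeader().thenCompose(ignored -> changePeersAndLearners(newPeers, newLearners, term));
        }
        // Stand-in for sending (newPeers, newLearners, term) to currentLeader; the "response"
        // just updates the cached peers and learners, like the thenAccept callback in the diff.
        return CompletableFuture.runAsync(() -> {
            this.peers = List.copyOf(newPeers);
            this.learners = List.copyOf(newLearners);
        });
    }

    public String leader() { return leader; }

    public List<String> peers() { return peers; }

    public List<String> learners() { return learners; }
}
```

Re-entering the same method after the refresh means the leader check and request building are written once. The cost is that a lookup that never finds a leader never terminates, so real code needs a deadline. The `stopTime` passed to `sendWithRetry` plays that role for the retry path.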
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/CliService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/CliService.java
index 1897b0e356..c68f816b0e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/CliService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/CliService.java
@@ -48,14 +48,20 @@ public interface CliService extends Lifecycle<CliOptions> {
Status removePeer(final String groupId, final Configuration conf, final PeerId peer);
/**
- * Gracefully change the peers of the replication group.
+ * Gracefully change the peers and learners of the replication group.
*
* @param groupId the raft group id
* @param conf current configuration
- * @param newPeers new peers to change
+ * @param newPeersAndLearners new peers and learners to change
+ * @param term term on which this method was called. If the actual raft group term differs, the configuration update is skipped.
* @return operation status
*/
- Status changePeers(final String groupId, final Configuration conf, final Configuration newPeers);
+ Status changePeersAndLearners(
+ final String groupId,
+ final Configuration conf,
+ final Configuration newPeersAndLearners,
+ long term
+ );
/**
* Reset the peer set of the target peer.
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
index f2477e3035..3cc5e80dc4 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
@@ -178,10 +178,11 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
* Change the configuration of the raft group to |newPeers| , done.run() would be invoked after this operation
* finishes, describing the detailed result.
*
- * @param newPeers new peers to change
+ * @param newConf new peers and learners configuration to apply.
+ * @param term term on which this method was called. If the actual raft group term differs, the configuration update is skipped.
* @param done callback
*/
- void changePeers(final Configuration newPeers, final Closure done);
+ void changePeersAndLearners(final Configuration newConf, long term, final Closure done);
/**
* Asynchronously change the configuration of the raft group to |newPeers|. If done closure was completed with {@link Status#OK()},
@@ -189,10 +190,10 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
* {@code STAGE_CATCHING_UP}
*
* @param newConf new peers and learners configuration to apply.
- * @param term term on which this method was called.
+ * @param term term on which this method was called. If the actual raft group term differs, the configuration update is skipped.
* @param done callback
*/
- void changePeersAsync(final Configuration newConf, long term, final Closure done);
+ void changePeersAndLearnersAsync(final Configuration newConf, long term, final Closure done);
/**
* Reset the configuration of this node individually, without any replication to other peers before this node
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java
index f96ed467b2..dccd5f8466 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftMessageGroup.java
@@ -47,10 +47,10 @@ public class RaftMessageGroup {
public static final short REMOVE_PEER_RESPONSE = 1003;
/** */
- public static final short CHANGE_PEERS_REQUEST = 1004;
+ public static final short CHANGE_PEERS_AND_LEARNERS_REQUEST = 1004;
/** */
- public static final short CHANGE_PEERS_RESPONSE = 1005;
+ public static final short CHANGE_PEERS_AND_LEARNERS_RESPONSE = 1005;
/** */
public static final short SNAPSHOT_REQUEST = 1006;
@@ -86,10 +86,10 @@ public class RaftMessageGroup {
public static final short LEARNERS_OP_RESPONSE = 1016;
/** */
- public static final short CHANGE_PEERS_ASYNC_REQUEST = 1017;
+ public static final short CHANGE_PEERS_AND_LEARNERS_ASYNC_REQUEST = 1017;
/** */
- public static final short CHANGE_PEERS_ASYNC_RESPONSE = 1018;
+ public static final short CHANGE_PEERS_AND_LEARNERS_ASYNC_RESPONSE = 1018;
/** */
public static final short SUBSCRIPTION_LEADER_CHANGE_REQUEST = 1019;
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
index 43d5e58c0a..1acad6326c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
@@ -42,8 +42,8 @@ import org.apache.ignite.raft.jraft.rpc.CliClientService;
import org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersResponse;
import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
import org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
@@ -202,12 +202,17 @@ public class CliServiceImpl implements CliService {
}
}
- // TODO refactor addPeer/removePeer/changePeers/transferLeader, remove duplicated code IGNITE-14832
+ // TODO refactor addPeer/removePeer/changePeersAndLearners/transferLeader, remove duplicated code IGNITE-14832
@Override
- public Status changePeers(final String groupId, final Configuration conf, final Configuration newPeers) {
+ public Status changePeersAndLearners(
+ final String groupId,
+ final Configuration conf,
+ final Configuration newPeersAndLearners,
+ long term
+ ) {
Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id");
Requires.requireNonNull(conf, "Null configuration");
- Requires.requireNonNull(newPeers, "Null new peers");
+ Requires.requireNonNull(newPeersAndLearners, "Null new configuration");
final PeerId leaderId = new PeerId();
final Status st = checkLeaderAndConnect(groupId, conf, leaderId);
@@ -215,17 +220,19 @@ public class CliServiceImpl implements CliService {
return st;
}
- ChangePeersRequest req = cliOptions.getRaftMessagesFactory()
- .changePeersRequest()
+ ChangePeersAndLearnersRequest req = cliOptions.getRaftMessagesFactory()
+ .changePeersAndLearnersRequest()
.groupId(groupId)
.leaderId(leaderId.toString())
- .newPeersList(newPeers.getPeers().stream().map(Object::toString).collect(toList()))
+ .newPeersList(newPeersAndLearners.getPeers().stream().map(Object::toString).collect(toList()))
+ .newLearnersList(newPeersAndLearners.getLearners().stream().map(Object::toString).collect(toList()))
+ .term(term)
.build();
try {
- final Message result = this.cliClientService.changePeers(leaderId, req, null).get();
- if (result instanceof ChangePeersResponse) {
- final ChangePeersResponse resp = (ChangePeersResponse) result;
+ final Message result = this.cliClientService.changePeersAndLearners(leaderId, req, null).get();
+ if (result instanceof ChangePeersAndLearnersResponse) {
+ final ChangePeersAndLearnersResponse resp = (ChangePeersAndLearnersResponse) result;
recordConfigurationChange(groupId, resp.oldPeersList(), resp.newPeersList());
return Status.OK();
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 0227293c3e..c4b498cee1 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -3371,13 +3371,24 @@ public class NodeImpl implements Node, RaftServerService {
}
@Override
- public void changePeers(final Configuration newPeers, final Closure done) {
- Requires.requireNonNull(newPeers, "Null new peers");
- Requires.requireTrue(!newPeers.isEmpty(), "Empty new peers");
+ public void changePeersAndLearners(final Configuration newPeersAndLearners, long term, final Closure done) {
+ Requires.requireNonNull(newPeersAndLearners, "Null new configuration");
+ Requires.requireTrue(!newPeersAndLearners.isEmpty(), "Empty new configuration");
this.writeLock.lock();
try {
- LOG.info("Node {} change peers from {} to {}.", getNodeId(), this.conf.getConf(), newPeers);
- unsafeRegisterConfChange(this.conf.getConf(), newPeers, done);
+ long currentTerm = getCurrentTerm();
+
+ if (currentTerm != term) {
+ LOG.warn("Node {} ignored the configuration because of mismatching terms. Current term is {}, but provided is {}.",
+ getNodeId(), currentTerm, term);
+
+ Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, Status.OK());
+
+ return;
+ }
+
+ LOG.info("Node {} change configuration from {} to {}.", getNodeId(), this.conf.getConf(), newPeersAndLearners);
+ unsafeRegisterConfChange(this.conf.getConf(), newPeersAndLearners, done);
}
finally {
this.writeLock.unlock();
@@ -3385,15 +3396,15 @@ public class NodeImpl implements Node, RaftServerService {
}
@Override
- public void changePeersAsync(final Configuration newConf, long term, Closure done) {
- Requires.requireNonNull(newConf, "Null new peers");
- Requires.requireTrue(!newConf.isEmpty(), "Empty new peers");
+ public void changePeersAndLearnersAsync(final Configuration newConf, long term, Closure done) {
+ Requires.requireNonNull(newConf, "Null new configuration");
+ Requires.requireTrue(!newConf.isEmpty(), "Empty new configuration");
this.writeLock.lock();
try {
long currentTerm = getCurrentTerm();
if (currentTerm != term) {
- LOG.warn("Node {} refused configuration because of mismatching terms. Current term is {}, but provided is {}.",
+ LOG.warn("Node {} ignored the configuration because of mismatching terms. Current term is {}, but provided is {}.",
getNodeId(), currentTerm, term);
Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, Status.OK());
@@ -3401,7 +3412,7 @@ public class NodeImpl implements Node, RaftServerService {
return;
}
- LOG.info("Node {} change peers from {} to {}.", getNodeId(), this.conf.getConf(), newConf);
+ LOG.info("Node {} change configuration from {} to {}.", getNodeId(), this.conf.getConf(), newConf);
unsafeRegisterConfChange(this.conf.getConf(), newConf, done, true);
}
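Both `NodeImpl` methods now share the same term guard. Under the write lock, the caller's term is compared with `getCurrentTerm()`. On a mismatch, the change is skipped, but `done` is still completed with `Status.OK()`. The following is a minimal model of just that guard. `TermGuardedConfigurator`, its `String` status, and the synchronous callback are hypothetical simplifications: the real node runs the closure on an executor and applies the change through `unsafeRegisterConfChange` rather than by direct assignment.

```java
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/**
 * Hypothetical model of a term-guarded configuration change.
 * Not the JRaft NodeImpl API: status is a plain String and the callback runs synchronously.
 */
public class TermGuardedConfigurator {
    private final ReentrantLock writeLock = new ReentrantLock();
    private long currentTerm;
    private List<String> peers;

    public TermGuardedConfigurator(long currentTerm, List<String> peers) {
        this.currentTerm = currentTerm;
        this.peers = List.copyOf(peers);
    }

    /** Applies newPeers only when term equals the current term; reports "OK" either way. */
    public void changePeers(List<String> newPeers, long term, Consumer<String> done) {
        if (newPeers == null || newPeers.isEmpty()) {
            throw new IllegalArgumentException("Empty new configuration");
        }
        writeLock.lock();
        try {
            if (currentTerm != term) {
                // Term mismatch: keep the old configuration but still report success,
                // mirroring how the diff completes `done` with Status.OK().
                done.accept("OK");
                return;
            }
            peers = List.copyOf(newPeers);
            done.accept("OK");
        } finally {
            writeLock.unlock();
        }
    }

    /** Simulates a leader change that advances the term. */
    public void bumpTerm() {
        writeLock.lock();
        try {
            currentTerm++;
        } finally {
            writeLock.unlock();
        }
    }

    public List<String> peers() {
        writeLock.lock();
        try {
            return peers;
        } finally {
            writeLock.unlock();
        }
    }
}
```

Because both outcomes report OK, a caller cannot tell from the status alone whether the change was applied. The `ItNodeTest` case at the start of this section checks the resulting peer count for that reason.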
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliClientService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliClientService.java
index 3c995d6f70..c053f3ba24 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliClientService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliClientService.java
@@ -17,7 +17,7 @@
package org.apache.ignite.raft.jraft.rpc;
import java.util.concurrent.Future;
-import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersResponse;
/**
* Cli RPC client service.
@@ -69,15 +69,15 @@ public interface CliClientService extends ClientService {
RpcResponseClosure<RpcRequests.ErrorResponse> done);
/**
- * Change peers.
+ * Change peers and learners.
*
* @param peerId peer ID
* @param request request data
* @param done callback
* @return a future with result
*/
- Future<Message> changePeers(PeerId peerId, CliRequests.ChangePeersRequest request,
- RpcResponseClosure<CliRequests.ChangePeersResponse> done);
+ Future<Message> changePeersAndLearners(PeerId peerId, ChangePeersAndLearnersRequest request,
+ RpcResponseClosure<ChangePeersAndLearnersResponse> done);
/**
* Add learners
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliRequests.java
index 1e1c54e9b1..7bb3929c9d 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliRequests.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliRequests.java
@@ -62,25 +62,35 @@ public final class CliRequests {
Collection<String> newPeersList();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.CHANGE_PEERS_REQUEST)
- public interface ChangePeersRequest extends Message {
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.CHANGE_PEERS_AND_LEARNERS_REQUEST)
+ public interface ChangePeersAndLearnersRequest extends Message {
String groupId();
String leaderId();
Collection<String> newPeersList();
+
+ Collection<String> newLearnersList();
+
+ // term is intentionally Long and not long in order to perform nullable (not initialized) check.
+ Long term();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.CHANGE_PEERS_RESPONSE)
- public interface ChangePeersResponse extends Message {
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.CHANGE_PEERS_AND_LEARNERS_RESPONSE)
+ public interface ChangePeersAndLearnersResponse extends Message {
@Nullable
Collection<String> oldPeersList();
Collection<String> newPeersList();
+
+ @Nullable
+ Collection<String> oldLearnersList();
+
+ Collection<String> newLearnersList();
}
- @Transferable(value = RpcClientMessageGroup.CHANGE_PEERS_ASYNC_REQUEST)
- public interface ChangePeersAsyncRequest extends Message {
+ @Transferable(value = RpcClientMessageGroup.CHANGE_PEERS_AND_LEARNERS_ASYNC_REQUEST)
+ public interface ChangePeersAndLearnersAsyncRequest extends Message {
String groupId();
String leaderId();
@@ -89,11 +99,12 @@ public final class CliRequests {
Collection<String> newLearnersList();
- long term();
+ // term is intentionally Long and not long in order to perform nullable (not initialized) check.
+ Long term();
}
- @Transferable(value = RpcClientMessageGroup.CHANGE_PEERS_ASYNC_RESPONSE)
- public interface ChangePeersAsyncResponse extends Message {
+ @Transferable(value = RpcClientMessageGroup.CHANGE_PEERS_AND_LEARNERS_ASYNC_RESPONSE)
+ public interface ChangePeersAndLearnersAsyncResponse extends Message {
Collection<String> oldPeersList();
Collection<String> newPeersList();
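The comments on `Long term()` say the term is boxed so that an unset value can be detected. The request processors in this diff unbox it directly (`long term = request.term();`), which would throw a `NullPointerException` on a missing term. Any explicit null check would have to live elsewhere and is not shown in this section. The following is a minimal sketch of such a check. The `ChangeRequest` record and `validate` helper are hypothetical illustrations, not the generated `CliRequests` message types.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical validation of a change request whose term is boxed so "unset" is observable. */
public class TermValidation {
    /** Illustrative request shape, not the generated CliRequests message type. */
    public record ChangeRequest(String groupId, List<String> newPeers, List<String> newLearners, Long term) {}

    /** Returns an error description, or an empty Optional if the request may be processed. */
    public static Optional<String> validate(ChangeRequest request) {
        if (request.term() == null) {
            // With a primitive long, an unset term would typically default to 0
            // and be indistinguishable from a real term value.
            return Optional.of("term is not set for group " + request.groupId());
        }
        if (request.newPeers().isEmpty()) {
            return Optional.of("empty peer list for group " + request.groupId());
        }
        return Optional.empty();
    }
}
```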
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 8898432525..6a676223d0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -40,8 +40,8 @@ import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
import org.apache.ignite.raft.jraft.rpc.RpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.cli.AddLearnersRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.AddPeerRequestProcessor;
-import org.apache.ignite.raft.jraft.rpc.impl.cli.ChangePeersAsyncRequestProcessor;
-import org.apache.ignite.raft.jraft.rpc.impl.cli.ChangePeersRequestProcessor;
+import org.apache.ignite.raft.jraft.rpc.impl.cli.ChangePeersAndLearnersAsyncRequestProcessor;
+import org.apache.ignite.raft.jraft.rpc.impl.cli.ChangePeersAndLearnersRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.GetLeaderRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.GetPeersRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.impl.cli.RemoveLearnersRequestProcessor;
@@ -112,8 +112,8 @@ public class IgniteRpcServer implements RpcServer<Void> {
registerProcessor(new AddPeerRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new RemovePeerRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new ResetPeerRequestProcessor(rpcExecutor, raftMessagesFactory));
- registerProcessor(new ChangePeersRequestProcessor(rpcExecutor, raftMessagesFactory));
- registerProcessor(new ChangePeersAsyncRequestProcessor(rpcExecutor, raftMessagesFactory));
+ registerProcessor(new ChangePeersAndLearnersRequestProcessor(rpcExecutor, raftMessagesFactory));
+ registerProcessor(new ChangePeersAndLearnersAsyncRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new GetLeaderRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new SnapshotRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new TransferLeaderRequestProcessor(rpcExecutor, raftMessagesFactory));
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAsyncRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAndLearnersAsyncRequestProcessor.java
similarity index 79%
rename from modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAsyncRequestProcessor.java
rename to modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAndLearnersAsyncRequestProcessor.java
index c6c3bff078..a3b9e19978 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAsyncRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAndLearnersAsyncRequestProcessor.java
@@ -23,8 +23,8 @@ import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncResponse;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersAsyncRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersAsyncResponse;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
@@ -33,24 +33,24 @@ import static java.util.stream.Collectors.toList;
/**
* Change peers request processor.
*/
-public class ChangePeersAsyncRequestProcessor extends BaseCliRequestProcessor<ChangePeersAsyncRequest> {
+public class ChangePeersAndLearnersAsyncRequestProcessor extends BaseCliRequestProcessor<ChangePeersAndLearnersAsyncRequest> {
- public ChangePeersAsyncRequestProcessor(Executor executor, RaftMessagesFactory msgFactory) {
+ public ChangePeersAndLearnersAsyncRequestProcessor(Executor executor, RaftMessagesFactory msgFactory) {
super(executor, msgFactory);
}
@Override
- protected String getPeerId(final ChangePeersAsyncRequest request) {
+ protected String getPeerId(final ChangePeersAndLearnersAsyncRequest request) {
return request.leaderId();
}
@Override
- protected String getGroupId(final ChangePeersAsyncRequest request) {
+ protected String getGroupId(final ChangePeersAndLearnersAsyncRequest request) {
return request.groupId();
}
@Override
- protected Message processRequest0(final CliRequestContext ctx, final ChangePeersAsyncRequest request,
+ protected Message processRequest0(final CliRequestContext ctx, final ChangePeersAndLearnersAsyncRequest request,
final IgniteCliRpcRequestClosure done) {
final List<PeerId> oldPeers = ctx.node.listPeers();
final List<PeerId> oldLearners = ctx.node.listLearners();
@@ -80,15 +80,15 @@ public class ChangePeersAsyncRequestProcessor extends BaseCliRequestProcessor<Ch
long term = request.term();
- LOG.info("Receive ChangePeersAsyncRequest with term {} to {} from {}, new conf is {}", term, ctx.node.getNodeId(), done.getRpcCtx()
+ LOG.info("Receive ChangePeersAndLearnersAsyncRequest with term {} to {} from {}, new conf is {}", term, ctx.node.getNodeId(), done.getRpcCtx()
.getRemoteAddress(), conf);
- ctx.node.changePeersAsync(conf, term, status -> {
+ ctx.node.changePeersAndLearnersAsync(conf, term, status -> {
if (!status.isOk()) {
done.run(status);
}
else {
- ChangePeersAsyncResponse resp = msgFactory().changePeersAsyncResponse()
+ ChangePeersAndLearnersAsyncResponse resp = msgFactory().changePeersAndLearnersAsyncResponse()
.oldPeersList(toStringList(oldPeers))
.newPeersList(toStringList(conf.getPeers()))
.oldLearnersList(toStringList(oldLearners))
@@ -107,6 +107,6 @@ public class ChangePeersAsyncRequestProcessor extends BaseCliRequestProcessor<Ch
@Override
public String interest() {
- return ChangePeersAsyncRequest.class.getName();
+ return ChangePeersAndLearnersAsyncRequest.class.getName();
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAndLearnersRequestProcessor.java
similarity index 53%
rename from modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersRequestProcessor.java
rename to modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAndLearnersRequestProcessor.java
index 417838a75b..38c9a43e41 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAndLearnersRequestProcessor.java
@@ -16,14 +16,15 @@
*/
package org.apache.ignite.raft.jraft.rpc.impl.cli;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersResponse;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
@@ -32,26 +33,27 @@ import static java.util.stream.Collectors.toList;
/**
* Change peers request processor.
*/
-public class ChangePeersRequestProcessor extends BaseCliRequestProcessor<ChangePeersRequest> {
+public class ChangePeersAndLearnersRequestProcessor extends BaseCliRequestProcessor<ChangePeersAndLearnersRequest> {
- public ChangePeersRequestProcessor(Executor executor, RaftMessagesFactory msgFactory) {
+ public ChangePeersAndLearnersRequestProcessor(Executor executor, RaftMessagesFactory msgFactory) {
super(executor, msgFactory);
}
@Override
- protected String getPeerId(final ChangePeersRequest request) {
+ protected String getPeerId(final ChangePeersAndLearnersRequest request) {
return request.leaderId();
}
@Override
- protected String getGroupId(final ChangePeersRequest request) {
+ protected String getGroupId(final ChangePeersAndLearnersRequest request) {
return request.groupId();
}
@Override
- protected Message processRequest0(final CliRequestContext ctx, final ChangePeersRequest request,
+ protected Message processRequest0(final CliRequestContext ctx, final ChangePeersAndLearnersRequest request,
final IgniteCliRpcRequestClosure done) {
- final List<PeerId> oldConf = ctx.node.listPeers();
+ final List<PeerId> oldPeers = ctx.node.listPeers();
+ final List<PeerId> oldLearners = ctx.node.listLearners();
final Configuration conf = new Configuration();
for (final String peerIdStr : request.newPeersList()) {
@@ -64,16 +66,33 @@ public class ChangePeersRequestProcessor extends BaseCliRequestProcessor<ChangeP
.newResponse(msgFactory(), RaftError.EINVAL, "Fail to parse peer id %s", peerIdStr);
}
}
- LOG.info("Receive ChangePeersRequest to {} from {}, new conf is {}", ctx.node.getNodeId(), done.getRpcCtx()
- .getRemoteAddress(), conf);
- ctx.node.changePeers(conf, status -> {
+
+ for (final String learnerIdStr : request.newLearnersList()) {
+ final PeerId learner = new PeerId();
+ if (learner.parse(learnerIdStr)) {
+ conf.addLearner(learner);
+ }
+ else {
+ return RaftRpcFactory.DEFAULT //
+ .newResponse(msgFactory(), RaftError.EINVAL, "Fail to parse learner id %s", learnerIdStr);
+ }
+ }
+
+ long term = request.term();
+
+ LOG.info("Receive ChangePeersAndLearnersRequest with term {} to {} from {}, new conf is {}", term, ctx.node.getNodeId(),
+ done.getRpcCtx().getRemoteAddress(), conf);
+
+ ctx.node.changePeersAndLearners(conf, term, status -> {
if (!status.isOk()) {
done.run(status);
}
else {
- ChangePeersResponse req = msgFactory().changePeersResponse()
- .oldPeersList(oldConf.stream().map(Object::toString).collect(toList()))
- .newPeersList(conf.getPeers().stream().map(Object::toString).collect(toList()))
+ ChangePeersAndLearnersResponse req = msgFactory().changePeersAndLearnersResponse()
+ .oldPeersList(toStringList(oldPeers))
+ .newPeersList(toStringList(conf.getPeers()))
+ .oldLearnersList(toStringList(oldLearners))
+ .newLearnersList(toStringList(conf.getLearners()))
.build();
done.sendResponse(req);
@@ -82,8 +101,12 @@ public class ChangePeersRequestProcessor extends BaseCliRequestProcessor<ChangeP
return null;
}
+ private static List<String> toStringList(Collection<?> collection) {
+ return collection.stream().map(Object::toString).collect(toList());
+ }
+
@Override
public String interest() {
- return ChangePeersRequest.class.getName();
+ return ChangePeersAndLearnersRequest.class.getName();
}
}
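The renamed processor now parses both `newPeersList()` and `newLearnersList()` into one `Configuration`, returning an `EINVAL` response on the first id it cannot parse. The standalone sketch below mirrors only that fail-fast control flow, without jraft types. `ConfBuilderSketch`, its `Result` holder, and the simplified `host:port` validity rule are hypothetical stand-ins and do not reproduce `PeerId#parse`.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: build a peers+learners configuration, failing fast like the EINVAL branch. */
class ConfBuilderSketch {
    /** Either a parsed configuration or an error message (stand-in for an EINVAL response). */
    static final class Result {
        final Set<String> peers;
        final Set<String> learners;
        final String error;

        Result(Set<String> peers, Set<String> learners, String error) {
            this.peers = peers;
            this.learners = learners;
            this.error = error;
        }

        boolean isOk() {
            return error == null;
        }
    }

    /** Assumed, simplified id rule: "host:port" with a non-empty host and a port in 0..65535. */
    static boolean isParsableId(String id) {
        int colon = id.lastIndexOf(':');
        if (colon <= 0 || colon == id.length() - 1) {
            return false;
        }
        try {
            int port = Integer.parseInt(id.substring(colon + 1));
            return port >= 0 && port <= 65535;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    static Result build(List<String> newPeers, List<String> newLearners) {
        Set<String> peers = new LinkedHashSet<>();
        for (String id : newPeers) {
            if (!isParsableId(id)) {
                // Same message shape as the processor's peer branch.
                return new Result(null, null, String.format("Fail to parse peer id %s", id));
            }
            peers.add(id);
        }
        Set<String> learners = new LinkedHashSet<>();
        for (String id : newLearners) {
            if (!isParsableId(id)) {
                return new Result(null, null, String.format("Fail to parse learner id %s", id));
            }
            learners.add(id);
        }
        return new Result(Collections.unmodifiableSet(peers), Collections.unmodifiableSet(learners), null);
    }
}
```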
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/CliClientServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/CliClientServiceImpl.java
index e12881a6f1..5982191e4f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/CliClientServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/CliClientServiceImpl.java
@@ -25,8 +25,8 @@ import org.apache.ignite.raft.jraft.rpc.CliRequests;
import org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersResponse;
import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
import org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
@@ -83,8 +83,8 @@ public class CliClientServiceImpl extends AbstractClientService implements CliCl
}
@Override
- public Future<Message> changePeers(final PeerId peerId, final ChangePeersRequest request,
- final RpcResponseClosure<ChangePeersResponse> done) {
+ public Future<Message> changePeersAndLearners(final PeerId peerId, final ChangePeersAndLearnersRequest request,
+ final RpcResponseClosure<ChangePeersAndLearnersResponse> done) {
return invokeWithDone(peerId, request, done,
this.cliOptions.getTimeoutMs());
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
index f1b8573b58..453d256fd9 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.raft;
+import static java.util.Collections.emptyList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toUnmodifiableList;
@@ -62,6 +63,7 @@ import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.util.OptimizedMarshaller;
import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
@@ -80,7 +82,7 @@ import org.apache.ignite.raft.jraft.rpc.ActionRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests;
import org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
import org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
@@ -112,6 +114,10 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest {
.map(port -> new Peer("localhost-" + port))
.collect(toUnmodifiableList());
+ private static final List<Peer> NODES_FOR_LEARNERS = Stream.of(20003, 20004, 20005)
+ .map(port -> new Peer("localhost-" + port))
+ .collect(toUnmodifiableList());
+
private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
private volatile @Nullable Peer leader = NODES.get(0);
@@ -436,14 +442,29 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest {
}
@Test
- public void testChangePeers() {
+ public void testChangePeersAndLearners() throws Exception {
List<String> shrunkPeers = peersToIds(NODES.subList(0, 1));
List<String> extendedPeers = peersToIds(NODES);
- when(messagingService.invoke(any(ClusterNode.class), any(ChangePeersRequest.class), anyLong()))
- .then(invocation -> completedFuture(FACTORY.changePeersResponse().newPeersList(shrunkPeers).build()))
- .then(invocation -> completedFuture(FACTORY.changePeersResponse().newPeersList(extendedPeers).build()));
+ List<String> fullLearners = peersToIds(NODES_FOR_LEARNERS);
+
+ when(messagingService.invoke(any(ClusterNode.class), any(ChangePeersAndLearnersRequest.class), anyLong()))
+ .then(invocation -> completedFuture(FACTORY.changePeersAndLearnersResponse()
+ .newPeersList(shrunkPeers)
+ .newLearnersList(emptyList())
+ .build()
+ ))
+ .then(invocation -> completedFuture(FACTORY.changePeersAndLearnersResponse()
+ .newPeersList(extendedPeers)
+ .newLearnersList(emptyList())
+ .build()
+ ))
+ .then(invocation -> completedFuture(FACTORY.changePeersAndLearnersResponse()
+ .newPeersList(shrunkPeers)
+ .newLearnersList(fullLearners)
+ .build()
+ ));
mockLeaderRequest(false);
@@ -452,15 +473,30 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest {
assertThat(service.peers(), containsInAnyOrder(NODES.subList(0, 2).toArray()));
assertThat(service.learners(), is(empty()));
- assertThat(service.changePeers(NODES.subList(0, 1)), willCompleteSuccessfully());
+ CompletableFuture<LeaderWithTerm> leaderWithTermFuture = service.refreshAndGetLeaderWithTerm();
+ assertThat(leaderWithTermFuture, willCompleteSuccessfully());
+ LeaderWithTerm leaderWithTerm = leaderWithTermFuture.get();
+
+ // Peers[0, 1], Learners [empty]
+ PeersAndLearners configuration = PeersAndLearners.fromPeers(NODES.subList(0, 1), emptyList());
+ assertThat(service.changePeersAndLearners(configuration, leaderWithTerm.term()), willCompleteSuccessfully());
assertThat(service.peers(), containsInAnyOrder(NODES.subList(0, 1).toArray()));
assertThat(service.learners(), is(empty()));
- assertThat(service.changePeers(NODES), willCompleteSuccessfully());
+ // Peers[0, 1, 2], Learners [empty]
+ PeersAndLearners configuration2 = PeersAndLearners.fromPeers(NODES, emptyList());
+ assertThat(service.changePeersAndLearners(configuration2, leaderWithTerm.term()), willCompleteSuccessfully());
assertThat(service.peers(), containsInAnyOrder(NODES.toArray()));
assertThat(service.learners(), is(empty()));
+
+ // Peers[0, 1], Learners [3, 4, 5]
+ PeersAndLearners configuration3 = PeersAndLearners.fromPeers(NODES.subList(0, 1), NODES_FOR_LEARNERS);
+ assertThat(service.changePeersAndLearners(configuration3, leaderWithTerm.term()), willCompleteSuccessfully());
+
+ assertThat(service.peers(), containsInAnyOrder(NODES.subList(0, 1).toArray()));
+ assertThat(service.learners(), containsInAnyOrder(NODES_FOR_LEARNERS.toArray()));
}
@Test
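With this change every `changePeersAndLearners` call carries the term the caller observed; the test above obtains it from `refreshAndGetLeaderWithTerm()` before each reconfiguration. The node-side handling lives in `NodeImpl`, which is not part of this excerpt, so the sketch below assumes the simplest rule: a leader rejects a request whose term differs from its own current term, so a configuration computed under an earlier leadership is never applied. `TermGuardedGroup` and its methods are hypothetical.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of a term check guarding a peers/learners change; not NodeImpl's code. */
class TermGuardedGroup {
    private long currentTerm;
    private Set<String> peers;
    private Set<String> learners = new LinkedHashSet<>();

    TermGuardedGroup(long term, Collection<String> initialPeers) {
        this.currentTerm = term;
        this.peers = new LinkedHashSet<>(initialPeers);
    }

    /** Simulates a new election: requests carrying the older term become stale. */
    synchronized void onNewTerm(long term) {
        currentTerm = term;
    }

    synchronized CompletableFuture<Void> changePeersAndLearners(
            Collection<String> newPeers, Collection<String> newLearners, long term) {
        if (term != currentTerm) {
            // Assumed rule: a request built under another term is rejected, not applied.
            return CompletableFuture.failedFuture(new IllegalStateException(
                    "Stale term " + term + ", current term is " + currentTerm));
        }
        peers = new LinkedHashSet<>(newPeers);
        learners = new LinkedHashSet<>(newLearners);
        return CompletableFuture.completedFuture(null);
    }

    synchronized Set<String> peers() {
        return Set.copyOf(peers);
    }

    synchronized Set<String> learners() {
        return Set.copyOf(learners);
    }
}
```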
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAsyncRequestProcessorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAndLearnersAsyncRequestProcessorTest.java
similarity index 76%
rename from modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAsyncRequestProcessorTest.java
rename to modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAndLearnersAsyncRequestProcessorTest.java
index 9edabd97e7..c8c22d7892 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAsyncRequestProcessorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAndLearnersAsyncRequestProcessorTest.java
@@ -26,46 +26,46 @@ import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.entity.PeerId;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncResponse;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersAsyncRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersAsyncResponse;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
-public class ChangePeersAsyncRequestProcessorTest extends AbstractCliRequestProcessorTest<ChangePeersAsyncRequest>{
+public class ChangePeersAndLearnersAsyncRequestProcessorTest extends AbstractCliRequestProcessorTest<ChangePeersAndLearnersAsyncRequest>{
private static final List<String> PEERS = List.of("follower1", "follower2");
private static final List<String> LEARNERS = List.of("learner1", "learner2", "learner3");
@Override
- public ChangePeersAsyncRequest createRequest(String groupId, PeerId peerId) {
- return msgFactory.changePeersAsyncRequest()
+ public ChangePeersAndLearnersAsyncRequest createRequest(String groupId, PeerId peerId) {
+ return msgFactory.changePeersAndLearnersAsyncRequest()
.groupId(groupId)
.leaderId(peerId.toString())
.newPeersList(PEERS)
.newLearnersList(LEARNERS)
- .term(1)
+ .term(1L)
.build();
}
@Override
- public BaseCliRequestProcessor<ChangePeersAsyncRequest> newProcessor() {
- return new ChangePeersAsyncRequestProcessor(null, msgFactory);
+ public BaseCliRequestProcessor<ChangePeersAndLearnersAsyncRequest> newProcessor() {
+ return new ChangePeersAndLearnersAsyncRequestProcessor(null, msgFactory);
}
@Override
public void verify(String interest, Node node, ArgumentCaptor<Closure> doneArg) {
- assertEquals(ChangePeersAsyncRequest.class.getName(), interest);
+ assertEquals(ChangePeersAndLearnersAsyncRequest.class.getName(), interest);
Configuration expectedConf = new Configuration();
PEERS.stream().map(PeerId::parsePeer).forEach(expectedConf::addPeer);
LEARNERS.stream().map(PeerId::parsePeer).forEach(expectedConf::addLearner);
- Mockito.verify(node).changePeersAsync(eq(expectedConf), eq(1L), doneArg.capture());
+ Mockito.verify(node).changePeersAndLearnersAsync(eq(expectedConf), eq(1L), doneArg.capture());
Closure done = doneArg.getValue();
assertNotNull(done);
done.run(Status.OK());
assertNotNull(this.asyncContext.getResponseObject());
- ChangePeersAsyncResponse response = this.asyncContext.as(ChangePeersAsyncResponse.class);
+ ChangePeersAndLearnersAsyncResponse response = this.asyncContext.as(ChangePeersAndLearnersAsyncResponse.class);
assertEquals(List.of("localhost:8081", "localhost:8082", "localhost:8083"), response.oldPeersList());
assertEquals(PEERS, response.newPeersList());
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersRequestProcessorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAndLearnersRequestProcessorTest.java
similarity index 56%
rename from modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersRequestProcessorTest.java
rename to modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAndLearnersRequestProcessorTest.java
index 30870d7d13..0fb1af3b5e 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersRequestProcessorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/impl/cli/ChangePeersAndLearnersRequestProcessorTest.java
@@ -22,8 +22,8 @@ import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.entity.PeerId;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersResponse;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@@ -31,35 +31,47 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.eq;
-public class ChangePeersRequestProcessorTest extends AbstractCliRequestProcessorTest<ChangePeersRequest> {
+public class ChangePeersAndLearnersRequestProcessorTest extends AbstractCliRequestProcessorTest<ChangePeersAndLearnersRequest> {
+ private static final long CURRENT_TERM = 1L;
@Override
- public ChangePeersRequest createRequest(String groupId, PeerId peerId) {
- return msgFactory.changePeersRequest()
+ public ChangePeersAndLearnersRequest createRequest(String groupId, PeerId peerId) {
+ return msgFactory.changePeersAndLearnersRequest()
.groupId(groupId)
.leaderId(peerId.toString())
.newPeersList(List.of("localhost:8084", "localhost:8085"))
+ .newLearnersList(List.of("localhost:8086", "localhost:8087"))
+ .term(CURRENT_TERM)
.build();
}
@Override
- public BaseCliRequestProcessor<ChangePeersRequest> newProcessor() {
- return new ChangePeersRequestProcessor(null, msgFactory);
+ public BaseCliRequestProcessor<ChangePeersAndLearnersRequest> newProcessor() {
+ return new ChangePeersAndLearnersRequestProcessor(null, msgFactory);
}
@Override
public void verify(String interest, Node node, ArgumentCaptor<Closure> doneArg) {
- assertEquals(ChangePeersRequest.class.getName(), interest);
- Mockito.verify(node).changePeers(eq(JRaftUtils.getConfiguration("localhost:8084,localhost:8085")),
- doneArg.capture());
+ assertEquals(ChangePeersAndLearnersRequest.class.getName(), interest);
+
+ Mockito.verify(node).changePeersAndLearners(
+ eq(JRaftUtils.getConfiguration("localhost:8084,localhost:8085,localhost:8086/learner,localhost:8087/learner")),
+ eq(CURRENT_TERM),
+ doneArg.capture()
+ );
+
Closure done = doneArg.getValue();
assertNotNull(done);
done.run(Status.OK());
assertNotNull(this.asyncContext.getResponseObject());
assertEquals("[localhost:8081, localhost:8082, localhost:8083]", this.asyncContext
- .as(ChangePeersResponse.class).oldPeersList().toString());
- assertEquals("[localhost:8084, localhost:8085]", this.asyncContext.as(ChangePeersResponse.class)
+ .as(ChangePeersAndLearnersResponse.class).oldPeersList().toString());
+ assertEquals("[localhost:8084, localhost:8085]", this.asyncContext.as(ChangePeersAndLearnersResponse.class)
.newPeersList().toString());
+ assertEquals("[learner:8081, learner:8082, learner:8083]", this.asyncContext
+ .as(ChangePeersAndLearnersResponse.class).oldLearnersList().toString());
+ assertEquals("[localhost:8086, localhost:8087]", this.asyncContext.as(ChangePeersAndLearnersResponse.class)
+ .newLearnersList().toString());
}
}
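The updated test writes the expected configuration as one string, `localhost:8084,localhost:8085,localhost:8086/learner,localhost:8087/learner`, where a `/learner` suffix marks a learner. A minimal sketch of splitting such a string into peers and learners follows. It assumes only the comma separator and the suffix visible in the test; `ConfStringSketch` is hypothetical and is not how `JRaftUtils.getConfiguration` is implemented.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for "id,id,id/learner" strings as used in the test above. */
class ConfStringSketch {
    static final String LEARNER_SUFFIX = "/learner";

    final List<String> peers = new ArrayList<>();
    final List<String> learners = new ArrayList<>();

    static ConfStringSketch parse(String conf) {
        ConfStringSketch result = new ConfStringSketch();
        for (String raw : conf.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate trailing or doubled commas
            }
            if (entry.endsWith(LEARNER_SUFFIX)) {
                // Strip the marker and keep the bare id as a learner.
                result.learners.add(entry.substring(0, entry.length() - LEARNER_SUFFIX.length()));
            } else {
                result.peers.add(entry);
            }
        }
        return result;
    }
}
```

Because both lists are plain `ArrayList`s, their `toString()` yields the same `[localhost:8086, localhost:8087]` shape the test compares against.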
diff --git a/modules/raft/tech-notes/changePeers.md b/modules/raft/tech-notes/changePeersAndLearners.md
similarity index 80%
rename from modules/raft/tech-notes/changePeers.md
rename to modules/raft/tech-notes/changePeersAndLearners.md
index bad3b1c41e..5810c9ec36 100644
--- a/modules/raft/tech-notes/changePeers.md
+++ b/modules/raft/tech-notes/changePeersAndLearners.md
@@ -1,11 +1,11 @@
## Introduction
-ChangePeers is not a separate jraft log command, but an algorithm with some separate phases
+ChangePeersAndLearners is not a separate jraft log command, but an algorithm with some separate phases
- Start replicators for new nodes on the leader and waiting for nodes' catchup
- Push configuration LogEntry to raft quorum (old quorum, new quorum) and apply ConfigurationEntry(conf=\<new peers configuration>, oldConf=\<previous peers configuration>) to leader.
- When previous configuration committed by quorum, push configuration LogEntry to raft quorum (only new quorum) and apply configuration ConfigurationEntry(conf=\<new peers configuration>, oldConf=null) to leader. Legacy replicators will be stopped and if current leader is not in new topology - it will be stepped down.
## Catchup phase (STAGE_CATCHING_UP)
-On changePeers request leader start all needed replicators for new peers. NodeImpl#confCtx stage set to STAGE_CATCHING_UP. We will use these stages as logical step names for further process explanation.
+On changePeersAndLearners request leader start all needed replicators for new peers. NodeImpl#confCtx stage set to STAGE_CATCHING_UP. We will use these stages as logical step names for further process explanation.
The end of catchup phase - is the moment, when all new peers caught up the leader. It means that a difference between leader.last_log_index and peer.last_log_index is smaller than NodeOptions#catchupMargin (1000 by default).
@@ -27,16 +27,16 @@ Change configuration of the leader to the new peers only.
When new quorum accept new config - stop legacy replicators and step down the leader if it was removed from the new configuration.
-After that - changePeers finished and client receive response.
+After that - changePeersAndLearners finished and client receive response.
## Questions
>Is it possible to change peers for the case when the old and new sets of raft
>nodes do not intersect?
Yes, according to algorithm it is not an issue.
->When changePeers() returns to the client?
+>When changePeersAndLearners() returns to the client?
-In the end of the whole process (including data migration) and it looks like it is not a problem of algorithm at all, but the problem of ChangePeersRequestProcessor from cli package. The group will be fully workin during the longest phase of changePeers - data migration. So, maybe we need just async version of ChangePeerRequestProcessor.
+In the end of the whole process (including data migration) and it looks like it is not a problem of algorithm at all, but the problem of ChangePeersAndLearnersRequestProcessor from cli package. The group will be fully workin during the longest phase of changePeersAndLearners - data migration. So, maybe we need just async version of ChangePeerRequestProcessor.
>Let’s check whether dataRebalance is a raft command that works just as any
>other raft commands and do not expect index gaps.
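The note describes a joint phase in which the configuration entry must be accepted by both the old and the new quorum before the leader moves to the new configuration alone. The sketch below expresses that commit rule as a pure function over sets of node ids, assuming plain majority quorums. The enum reuses `STAGE_CATCHING_UP` from the note; the other stage names are assumed to match jraft's `NodeImpl.ConfigurationCtx` and serve only as labels. `JointQuorumSketch` is hypothetical.

```java
import java.util.Set;

/** Hypothetical illustration of the joint-consensus commit rule; not jraft's BallotBox logic. */
class JointQuorumSketch {
    enum Stage { STAGE_NONE, STAGE_CATCHING_UP, STAGE_JOINT, STAGE_STABLE }

    /** True if strictly more than half of conf acknowledged the entry. */
    static boolean hasMajority(Set<String> conf, Set<String> acks) {
        long granted = conf.stream().filter(acks::contains).count();
        return granted > conf.size() / 2;
    }

    /**
     * In STAGE_JOINT an entry commits only with majorities in both configurations;
     * in STAGE_STABLE only the new configuration votes; before that, only the old one.
     */
    static boolean isCommitted(Stage stage, Set<String> oldConf, Set<String> newConf, Set<String> acks) {
        switch (stage) {
            case STAGE_JOINT:
                return hasMajority(oldConf, acks) && hasMajority(newConf, acks);
            case STAGE_STABLE:
                return hasMajority(newConf, acks);
            default:
                return hasMajority(oldConf, acks);
        }
    }
}
```

Because each side only needs its own majority, the rule works unchanged when the old and new sets do not intersect, which matches the answer given in the Questions section.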
diff --git a/modules/raft/tech-notes/nodeCatchUp.md b/modules/raft/tech-notes/nodeCatchUp.md
index da8d9be7a9..8828469dfd 100644
--- a/modules/raft/tech-notes/nodeCatchUp.md
+++ b/modules/raft/tech-notes/nodeCatchUp.md
@@ -4,7 +4,7 @@
We have a closure named `NodeImpl.OnCaughtUp`, which is responsible for the catching up process for every stale node/replicator on a leader.
This closure is created every time we call `NodeImpl.ConfigurationCtx#addNewPeers` which happens on a raft configuration change, for example
-when we call `NodeImpl#changePeers`. In `NodeImpl.ConfigurationCtx#addNewPeers` method we assign `OnCaughtUp` closure with a corresponding
+when we call `NodeImpl#changePeerAndLearners`. In `NodeImpl.ConfigurationCtx#addNewPeers` method we assign `OnCaughtUp` closure with a corresponding
replicator for a stale node. This is done inside `ReplicatorGroupImpl#waitCaughtUp`
by calling `Replicator#waitForCaughtUp`. To be more precise, we save the closure in a field `Replicator#catchUpClosure` and also we schedule
timer on a replicator to call `Replicator#onCatchUpTimedOut` (by default it is called after election timeout).
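This note pairs a per-replicator `OnCaughtUp` closure with a timer that fires `Replicator#onCatchUpTimedOut`, by default after the election timeout; the changePeersAndLearners note supplies the condition itself (`leader.last_log_index - peer.last_log_index` smaller than `NodeOptions#catchupMargin`, 1000 by default). The sketch below combines the two with a `CompletableFuture` that completes with `true` on catch-up or `false` on timeout. `CatchUpWaiter` and `onProgress` are hypothetical; jraft uses closures and its own timer, not this class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of one replicator's catch-up wait; not jraft's Replicator. */
class CatchUpWaiter {
    private final long catchupMargin;
    private final CompletableFuture<Boolean> caughtUp = new CompletableFuture<>();

    CatchUpWaiter(long catchupMargin, long timeoutMs) {
        this.catchupMargin = catchupMargin;
        // Plays the role of the scheduled onCatchUpTimedOut: resolves with false unless completed first.
        caughtUp.completeOnTimeout(false, timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Called after each replication round with the latest log indexes. */
    void onProgress(long leaderLastLogIndex, long peerLastLogIndex) {
        if (leaderLastLogIndex - peerLastLogIndex < catchupMargin) {
            caughtUp.complete(true); // no-op if the timeout already fired
        }
    }

    CompletableFuture<Boolean> result() {
        return caughtUp;
    }
}
```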
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index 015a829e90..929f65496e 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -419,13 +419,13 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
}
@Override
- public CompletableFuture<Void> changePeers(Collection<Peer> peers) {
- return raftClient.changePeers(peers);
+ public CompletableFuture<Void> changePeersAndLearners(PeersAndLearners peersAndLearners, long term) {
+ return raftClient.changePeersAndLearners(peersAndLearners, term);
}
@Override
- public CompletableFuture<Void> changePeersAsync(PeersAndLearners peersAndLearners, long term) {
- return raftClient.changePeersAsync(peersAndLearners, term);
+ public CompletableFuture<Void> changePeersAndLearnersAsync(PeersAndLearners peersAndLearners, long term) {
+ return raftClient.changePeersAndLearnersAsync(peersAndLearners, term);
}
@Override
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 799c6d1f53..b61fb76b24 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -221,7 +221,7 @@ import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersAsyncRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.sql.IgniteSql;
@@ -768,7 +768,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
// Using this hack we pause rebalance on all nodes
nodes.forEach(n -> ((DefaultMessagingService) n.clusterService.messagingService())
- .dropMessages((nodeName, msg) -> msg instanceof ChangePeersAsyncRequest && dropMessages.get())
+ .dropMessages((nodeName, msg) -> msg instanceof ChangePeersAndLearnersAsyncRequest && dropMessages.get())
);
node.metaStorageManager.put(partAssignmentsPendingKey, bytesPendingAssignments).get(AWAIT_TIMEOUT_MILLIS, MILLISECONDS);
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index 992dd604be..9c7d7f5114 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -56,7 +56,7 @@ import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersAsyncRequest;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionOptions;
import org.junit.jupiter.api.AfterEach;
@@ -188,7 +188,7 @@ public class ItRebalanceTest extends BaseIgniteAbstractTest {
AtomicBoolean dropMessages = new AtomicBoolean(true);
cluster.runningNodes().forEach(
- n -> n.dropMessages((nodeName, msg) -> msg instanceof ChangePeersAsyncRequest && dropMessages.get())
+ n -> n.dropMessages((nodeName, msg) -> msg instanceof ChangePeersAndLearnersAsyncRequest && dropMessages.get())
);
alterZone(zoneName, 2);
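Both rebalance tests pause reconfiguration by dropping every `ChangePeersAndLearnersAsyncRequest` while an `AtomicBoolean` flag is set. The filter reduces to a `BiPredicate` over node name and message. The self-contained sketch below uses a hypothetical `FakeChangeRequest` in place of the real request type and a hypothetical `MessageGate` in place of the messaging service's drop hook. Dropped messages are simply discarded here; how the real service and rebalance logic react afterwards is outside this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiPredicate;

/** Hypothetical stand-in for a messaging service with a drop filter. */
class MessageGate {
    /** Hypothetical stand-in for ChangePeersAndLearnersAsyncRequest. */
    static final class FakeChangeRequest {}

    private final BiPredicate<String, Object> dropPredicate;
    final List<Object> delivered = new ArrayList<>();

    MessageGate(BiPredicate<String, Object> dropPredicate) {
        this.dropPredicate = dropPredicate;
    }

    /** Delivers msg unless the predicate says to drop it. */
    void send(String nodeName, Object msg) {
        if (!dropPredicate.test(nodeName, msg)) {
            delivered.add(msg);
        }
    }

    /** Same predicate shape as the tests: drop change requests only while the flag is set. */
    static BiPredicate<String, Object> pauseWhile(AtomicBoolean flag) {
        return (nodeName, msg) -> msg instanceof FakeChangeRequest && flag.get();
    }
}
```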
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java
index a0361ca4ab..cde2494b27 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java
@@ -102,7 +102,8 @@ public class ItRebalanceTriggersRecoveryTest extends ClusterPerTestIntegrationTe
assertFalse(containsPartition(cluster.node(2)));
// By this we guarantee, that there will no any partition data nodes, which will be available to perform the rebalance.
- // To run the actual changePeersAsync we need the partition leader, which catch the metastore event about new pending keys.
+ // To run the actual changePeersAndLearnersAsync we need the partition leader, which catch the metastore event about new pending
+ // keys.
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(1)).startInhibit();
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(2)).startInhibit();
@@ -147,7 +148,8 @@ public class ItRebalanceTriggersRecoveryTest extends ClusterPerTestIntegrationTe
assertFalse(containsPartition(cluster.node(2)));
// By this we guarantee, that there will no any partition data nodes, which will be available to perform the rebalance.
- // To run the actual changePeersAsync we need the partition leader, which catch the metastore event about new pending keys.
+ // To run the actual changePeersAndLearnersAsync we need the partition leader, which catch the metastore event about new pending
+ // keys.
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(1)).startInhibit();
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(2)).startInhibit();
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e2aa8d3e5e..186c74beba 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -2217,7 +2217,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
PeersAndLearners newConfiguration = fromAssignments(pendingAssignments);
- return partGrpSvc.changePeersAsync(newConfiguration, leaderWithTerm.term());
+ return partGrpSvc.changePeersAndLearnersAsync(newConfiguration, leaderWithTerm.term());
});
});
}
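Here `TableManager` forwards `leaderWithTerm.term()` straight into `changePeersAndLearnersAsync`, tying the request to the leadership under which the pending assignments were read. The composition can be sketched with plain `CompletableFuture`s. `LeaderAndTerm`, `refreshLeader`, and `changeAsync` are hypothetical stand-ins for `LeaderWithTerm`, `refreshAndGetLeaderWithTerm()`, and `changePeersAndLearnersAsync`. The "only the local leader proceeds" check follows the rebalance note's description rather than code shown in this hunk.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Hypothetical sketch of "refresh leader with term, then reconfigure with that term". */
class RebalanceStepSketch {
    /** Stand-in for LeaderWithTerm. */
    static final class LeaderAndTerm {
        final String leader;
        final long term;

        LeaderAndTerm(String leader, long term) {
            this.leader = leader;
            this.term = term;
        }
    }

    /**
     * Refreshes the leader; if the local node leads, issues the change with the observed term.
     * Completes with false when another node is the leader and nothing is sent.
     */
    static <C> CompletableFuture<Boolean> changeIfLeader(
            String localNode,
            Supplier<CompletableFuture<LeaderAndTerm>> refreshLeader,
            BiFunction<C, Long, CompletableFuture<Void>> changeAsync,
            C newConfiguration) {
        return refreshLeader.get().thenCompose(lt -> {
            if (!localNode.equals(lt.leader)) {
                return CompletableFuture.completedFuture(false);
            }
            // The term travels with the request, so a stale leader's change can be rejected.
            return changeAsync.apply(newConfiguration, lt.term).thenApply(ignored -> true);
        });
    }
}
```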
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java
index ce75a0e26a..7b93ca4ca4 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionMoverTest.java
@@ -56,13 +56,13 @@ class PartitionMoverTest extends BaseIgniteAbstractTest {
);
/**
- * Tests that {@link RaftGroupServiceImpl#changePeersAsync} was retried after some exceptions.
+ * Tests that {@link RaftGroupServiceImpl#changePeersAndLearnersAsync} was retried after some exceptions.
*/
@Test
- public void testChangePeersAsyncRetryLogic() {
+ public void testChangePeersAndLearnersAsyncRetryLogic() {
RaftGroupService raftService = mock(RaftGroupService.class);
- when(raftService.changePeersAsync(any(), anyLong()))
+ when(raftService.changePeersAndLearnersAsync(any(), anyLong()))
.thenReturn(failedFuture(new RuntimeException()))
.thenReturn(failedFuture(new IOException()))
.thenReturn(nullCompletedFuture());
@@ -71,7 +71,7 @@ class PartitionMoverTest extends BaseIgniteAbstractTest {
assertThat(partitionMover.movePartition(PEERS_AND_LEARNERS, TERM), willCompleteSuccessfully());
- verify(raftService, times(3)).changePeersAsync(eq(PEERS_AND_LEARNERS), eq(TERM));
+ verify(raftService, times(3)).changePeersAndLearnersAsync(eq(PEERS_AND_LEARNERS), eq(TERM));
}
@Test
@@ -93,7 +93,7 @@ class PartitionMoverTest extends BaseIgniteAbstractTest {
RaftGroupService raftService = mock(RaftGroupService.class);
- when(raftService.changePeersAsync(any(), anyLong()))
+ when(raftService.changePeersAndLearnersAsync(any(), anyLong()))
.then(invocation -> CompletableFuture.runAsync(lock::block));
var partitionMover = new PartitionMover(lock, () -> completedFuture(raftService));
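`testChangePeersAndLearnersAsyncRetryLogic` stubs two failures (`RuntimeException`, then `IOException`) followed by success and verifies three invocations. A generic retry-on-failure helper with that behaviour is sketched below. It retries on any exception up to a fixed attempt budget, which is an assumption for illustration: `PartitionMover`'s actual policy (which exceptions count as retriable, whether it waits between attempts, how it uses the lock) is not shown in this diff. `RetrySketch` is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical retry helper; not PartitionMover's implementation. */
class RetrySketch {
    /** Re-invokes action until its future succeeds or maxAttempts is exhausted. */
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action, int maxAttempts) {
        return action.get()
                .handle((result, error) -> {
                    if (error == null) {
                        return CompletableFuture.completedFuture(result);
                    }
                    if (maxAttempts <= 1) {
                        // Budget exhausted: surface the last failure.
                        return CompletableFuture.<T>failedFuture(error);
                    }
                    return retry(action, maxAttempts - 1);
                })
                // Flatten CompletableFuture<CompletableFuture<T>> into CompletableFuture<T>.
                .thenCompose(next -> next);
    }
}
```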
diff --git a/modules/table/tech-notes/rebalance.md b/modules/table/tech-notes/rebalance.md
index d0198c8d68..5474c131d6 100644
--- a/modules/table/tech-notes/rebalance.md
+++ b/modules/table/tech-notes/rebalance.md
@@ -67,13 +67,13 @@ metastoreInvoke: // atomic metastore call through multi-invoke api
**Steps**:
- Start all new needed nodes `partition.assignments.pending / partition.assignments.stable`
-- After successful starts - check if current node is the leader of raft group (leader response must be updated by current term) and run `RaftGroupService#changePeersAsync(leaderTerm, peers)`. `RaftGroupService#changePeersAsync` from old terms must be skipped.
+- After successful starts - check if current node is the leader of raft group (leader response must be updated by current term) and run `RaftGroupService#changePeersAndLearnersAsync(leaderTerm, peers)`. `RaftGroupService#changePeersAndLearnersAsync` from old terms must be skipped.
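The step above says to skip `changePeersAndLearnersAsync` requests from old terms. A small leader-side guard can illustrate that rule. This is a minimal sketch under stated assumptions. `TermGuard`, `onLeaderElected(long)` and `accept(long)` are hypothetical names and are not part of `RaftGroupService` or JRaft. The real term handling lives in the Raft implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical leader-side guard (not an Ignite or JRaft API): remembers the term
 * this node leads and rejects reconfiguration requests issued for any other term.
 */
final class TermGuard {
    // -1 means "not a leader in any known term yet".
    private final AtomicLong leaderTerm = new AtomicLong(-1);

    /** Records the term in which this node became leader; the stored term never goes backwards. */
    void onLeaderElected(long term) {
        leaderTerm.accumulateAndGet(term, Math::max);
    }

    /** Returns true only if the request was issued for the current leader term. */
    boolean accept(long requestTerm) {
        // A request carrying an old term comes from a stale view of leadership and is skipped.
        return requestTerm == leaderTerm.get();
    }

    public static void main(String[] args) {
        TermGuard guard = new TermGuard();
        guard.onLeaderElected(5);
        System.out.println(guard.accept(5)); // true
        System.out.println(guard.accept(4)); // false: old term
    }
}
```

Strict equality is a deliberate choice in this sketch. A request for a newer term is also rejected, because this node has not (yet) led that term.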
**Result**:
- New needed raft nodes started
- Change peers state initiated for every raft group
-## When RaftGroupService#changePeersAsync done inside the raft group – update assignments, stable key and stop all redundant nodes
+## When RaftGroupService#changePeersAndLearnersAsync done inside the raft group – update assignments, stable key and stop all redundant nodes
**Trigger**: When leader applied new Configuration with list of resulting peers `<applied peer>`, it calls `RebalanceRaftGroupEventsListener.onNewPeersConfigurationApplied(<applied peers>)`
**Pseudocode**:
@@ -94,8 +94,8 @@ After stable key is updated, corresponding listener for that change is called, s
Failover helpers
- `RebalanceRaftGroupEventsListener.onLeaderElected` - must be executed from the new leader when raft group elected the new leader. Maybe we actually need to also check if a new lease is received.
-- `RebalanceRaftGroupEventsListener.onReconfigurationError` - must be executed when any errors during `RaftGroupService#changePeersAsync` occurred. For more info about change peers process, and more specifically, catch up process, see `modules/raft/tech-notes/changePeers.md` and `modules/raft/tech-notes/nodeCatchUp.md`
-- `RebalanceRaftGroupEventsListener.onNewPeersConfigurationApplied(peers)` - must be executed with the list of new peers when `RaftGroupService#changePeersAsync` has successfully done.
+- `RebalanceRaftGroupEventsListener.onReconfigurationError` - must be executed when any errors during `RaftGroupService#changePeersAndLearnersAsync` occurred. For more info about change peers process, and more specifically, catch up process, see `modules/raft/tech-notes/changePeersAndLearners.md` and `modules/raft/tech-notes/nodeCatchUp.md`
+- `RebalanceRaftGroupEventsListener.onNewPeersConfigurationApplied(peers)` - must be executed with the list of new peers when `RaftGroupService#changePeersAndLearnersAsync` has successfully done.
## Cleanup redundant raft nodes (3)
**Trigger**: Node receive update about partition stable assignments
@@ -110,14 +110,14 @@ Failover helpers
# Failover
We need to provide Failover thread, which can handle the following cases:
-- `RaftGroupService#changePeersAsync` can't start even catchup process, because of any new raft nodes wasn't started yet for instance.
-- `RaftGroupService#changePeersAsync` failed to complete catchup due to catchup timeout, for example. To check all possible error cases during catch up stage, check `modules/raft/tech-notes/nodeCatchUp.md`
+- `RaftGroupService#changePeersAndLearnersAsync` can't start even catchup process, because of any new raft nodes wasn't started yet for instance.
+- `RaftGroupService#changePeersAndLearnersAsync` failed to complete catchup due to catchup timeout, for example. To check all possible error cases during catch up stage, check `modules/raft/tech-notes/nodeCatchUp.md`
We have the following mechanisms for handling these cases:
- `RebalanceRaftGroupEventsListener.onReconfigurationError`, which schedules retries, if needed
- Separate special thread pool processes all needed retries on the current node
-- If a current node is not the leader of partition raft group anymore - it will request `RaftGroupService#changePeersAsync` with legacy term, receive appropriate answer from the leader and stop retries for this partition.
-- If a leader has been changed, new node receives `RebalanceRaftGroupEventsListener.onLeaderElected` invoke and start needed `RaftGroupService#changePeersAsync` from the pending key.
+- If a current node is not the leader of partition raft group anymore - it will request `RaftGroupService#changePeersAndLearnersAsync` with legacy term, receive appropriate answer from the leader and stop retries for this partition.
+- If a leader has been changed, new node receives `RebalanceRaftGroupEventsListener.onLeaderElected` invoke and start needed `RaftGroupService#changePeersAndLearnersAsync` from the pending key.
- If failover exhaust maximum number for query retries - it must notify node's failure handler about the issue global (details must be specified later)
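The failover rules above can be sketched as a retry loop over `CompletableFuture`. The loop retries on errors, stops once the leader answers that the term is stale, and escalates to a failure handler when retries run out. This is an illustration only. `FailoverRetrier`, `ChangeOutcome` and the attempt function are hypothetical. How a real node learns that its term is stale is left to the Raft layer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.LongFunction;

/** Hypothetical result of one changePeersAndLearners attempt, plus a terminal EXHAUSTED state. */
enum ChangeOutcome { DONE, RETRY, STALE_TERM, EXHAUSTED }

/** Hypothetical failover retry loop; not part of RebalanceRaftGroupEventsListener. */
final class FailoverRetrier {
    private final int maxAttempts;
    private final Consumer<String> failureHandler;

    FailoverRetrier(int maxAttempts, Consumer<String> failureHandler) {
        this.maxAttempts = maxAttempts;
        this.failureHandler = failureHandler;
    }

    /** Repeats {@code attempt} with the same term until it succeeds, the term is stale, or attempts run out. */
    CompletableFuture<ChangeOutcome> run(long term, LongFunction<CompletableFuture<ChangeOutcome>> attempt) {
        return runAttempt(term, attempt, 1);
    }

    private CompletableFuture<ChangeOutcome> runAttempt(
            long term, LongFunction<CompletableFuture<ChangeOutcome>> attempt, int attemptNo) {
        return attempt.apply(term)
                // Any exception (timeout, catch-up failure, ...) is treated as retryable here.
                .exceptionally(e -> ChangeOutcome.RETRY)
                .thenCompose(outcome -> {
                    if (outcome != ChangeOutcome.RETRY) {
                        // DONE: reconfiguration finished; STALE_TERM: another leader took over, stop retrying.
                        return CompletableFuture.completedFuture(outcome);
                    }
                    if (attemptNo >= maxAttempts) {
                        // Retry budget exhausted: escalate to the node's failure handler.
                        failureHandler.accept("changePeersAndLearners retries exhausted for term " + term);
                        return CompletableFuture.completedFuture(ChangeOutcome.EXHAUSTED);
                    }
                    return runAttempt(term, attempt, attemptNo + 1);
                });
    }
}
```

Treating every exception as retryable matches the `PartitionMoverTest` case above. In that test, a `RuntimeException` and an `IOException` are both followed by a successful third call. A production version would likely distinguish fatal errors.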
@@ -132,7 +132,7 @@ Also, failover mechanism above doesn't use metastore, but raft term and special
## Adjustable `changePeers`
Algorithm above seems working well, but it has one serious caveat. When the leader is busy by current `changePeers`, we can't start new one.
That's a big issue - because data rebalance process can be long enough, while all nodes sync raft logs with data. According to
-https://github.com/apache/ignite-3/blob/main/modules/raft/tech-notes/changePeers.md - we can relatively painless update the peers' list, if leader is in the STAGE_CATCHING_UP phase still. Alternatively, we can cancel current `changePeers`, if it is in the STAGE_CATCHING_UP and run new one
+https://github.com/apache/ignite-3/blob/main/modules/raft/tech-notes/changePeersAndLearners.md - we can relatively painless update the peers' list, if leader is in the STAGE_CATCHING_UP phase still. Alternatively, we can cancel current `changePeers`, if it is in the STAGE_CATCHING_UP and run new one
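Both options above depend on the leader's reconfiguration stage. The peers' list may be replaced, or the operation cancelled, only while the new peers are still catching up. The following is a minimal sketch of that rule. `ReconfigurationState` and its methods are hypothetical. `STAGE_NONE` and `STAGE_CATCHING_UP` come from these notes, while `STAGE_JOINT` and `STAGE_STABLE` are assumed from JRaft's configuration-change stages.

```java
import java.util.List;

/** Reconfiguration stages; the last two are assumed from JRaft, not taken from these notes. */
enum Stage { STAGE_NONE, STAGE_CATCHING_UP, STAGE_JOINT, STAGE_STABLE }

/** Hypothetical leader-side view of an in-flight changePeers (not an Ignite or JRaft class). */
final class ReconfigurationState {
    private Stage stage = Stage.STAGE_NONE;
    private List<String> targetPeers = List.of();

    /** Starts a new changePeers: new peers begin catching up with the leader's log. */
    void start(List<String> peers) {
        targetPeers = List.copyOf(peers);
        stage = Stage.STAGE_CATCHING_UP;
    }

    /** Moves to a later stage, e.g. once catch-up completes. */
    void advance(Stage next) {
        stage = next;
    }

    /** Option 1: replace the target peers; allowed only during catch-up. */
    boolean tryUpdatePeers(List<String> peers) {
        if (stage != Stage.STAGE_CATCHING_UP) {
            return false;
        }
        targetPeers = List.copyOf(peers);
        return true;
    }

    /** Option 2: true if there was nothing to cancel or the catch-up was cancelled. */
    boolean tryCancel() {
        switch (stage) {
            case STAGE_NONE:
                return true;
            case STAGE_CATCHING_UP:
                stage = Stage.STAGE_NONE;
                targetPeers = List.of();
                return true;
            default:
                // Past catch-up the new configuration is already being applied, so it is not cancelled.
                return false;
        }
    }

    Stage stage() {
        return stage;
    }

    List<String> targetPeers() {
        return targetPeers;
    }
}
```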
### Approach 1. Update the peers' list of current `changePeers`
This approach can be addressed with different implemetation details, but let's describe the simplest one.
@@ -183,9 +183,9 @@ If the listener return false - we should to await the new peer list, process it
Instead of updating current `changePeers` with new peers' list - we can cancel it and start the new one.
For this dish we will need:
-- New raft service's method `cancelChangePeers()`. This method should cancel current `changePeers` if and only if it is in the STAGE_CATCHING_UP phase. Method must return:
- - true: if no changePeers to cancel or successful cancel occurred.
+- New raft service's method `cancelChangePeersAndLearners()`. This method should cancel current `changePeers` if and only if it is in the STAGE_CATCHING_UP phase. Method must return:
+ - true: if no changePeersAndLearners to cancel or successful cancel occurred.
- false: if `changePeers` in progress and can't be cancelled (like in approach 1 - if the leader is not in STAGE_CATCHING_UP/STAGE_NONE)
- Listen the `partition.assignments.planned` key and on update:
- - Execute `cancelChangePeers()` on the node with the partition leader. If it returns `false` - do nothing.
+ - Execute `cancelChangePeersAndLearners()` on the node with the partition leader. If it returns `false` - do nothing.
- If it returns `true` - move planned peers to pending in metastore