This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch ratis_create_peer
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ratis_create_peer by this push:
new 0e037e9c26 [IOTDB-5008] Fix SHUTDOWN errors in ratis create peer (#8076)
0e037e9c26 is described below
commit 0e037e9c26aa38c0710950894a8c7a489e12ee3d
Author: William Song <[email protected]>
AuthorDate: Tue Nov 22 14:14:19 2022 +0800
[IOTDB-5008] Fix SHUTDOWN errors in ratis create peer (#8076)
---
.../iotdb/consensus/ratis/RatisConsensus.java | 43 +++-------------------
.../iotdb/consensus/ratis/RatisConsensusTest.java | 23 ++----------
2 files changed, 9 insertions(+), 57 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index ccefcb684e..5e9f60c6f9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -329,11 +329,6 @@ class RatisConsensus implements IConsensus {
@Override
public ConsensusGenericResponse createPeer(ConsensusGroupId groupId,
List<Peer> peers) {
RaftGroup group = buildRaftGroup(groupId, peers);
- // pre-conditions: myself in this new group
- if (!group.getPeers().contains(myself)) {
- return failed(new ConsensusGroupNotExistException(groupId));
- }
-
// add RaftPeer myself to this RaftGroup
ConsensusGenericResponse reply = addNewGroupToServer(group, myself);
@@ -344,7 +339,11 @@ class RatisConsensus implements IConsensus {
RaftClientReply reply;
RatisClient client = null;
try {
- client = getRaftClient(group);
+ if (group.getPeers().size() == 0) {
+ client = getRaftClient(RaftGroup.valueOf(group.getGroupId(), server));
+ } else {
+ client = getRaftClient(group);
+ }
reply =
client.getRaftClient().getGroupManagementApi(server.getId()).add(group);
if (!reply.isSuccess()) {
return failed(new RatisRequestFailedException(reply.getException()));
@@ -488,38 +487,6 @@ class RatisConsensus implements IConsensus {
return
ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
}
- @Override
- public ConsensusGenericResponse addNewNodeToExistedGroup(
- ConsensusGroupId groupId, Peer newNode, List<Peer> originalGroup) {
-
- CompletableFuture<ConsensusGenericResponse> addResp =
- CompletableFuture.supplyAsync(() -> addPeer(groupId, newNode),
addExecutor);
-
- try {
- TimeUnit.MILLISECONDS.sleep(500);
- } catch (InterruptedException i) {
- logger.debug("{}: interrupted when wait to create new peer with
exception {}", this, i);
- Thread.currentThread().interrupt();
- }
-
- if (!originalGroup.contains(newNode)) {
- originalGroup.add(newNode);
- }
-
- RaftGroup group = buildRaftGroup(groupId, originalGroup);
- RaftPeer newPeer = Utils.fromPeerAndPriorityToRaftPeer(newNode,
DEFAULT_PRIORITY);
- ConsensusGenericResponse createResp = addNewGroupToServer(group, newPeer);
- if (!createResp.isSuccess()) {
- return createResp;
- }
-
- ConsensusGenericResponse addResult = addResp.join();
- if (!addResult.isSuccess()) {
- return addResult;
- }
- return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
- }
-
/**
* NOTICE: transferLeader *does not guarantee* the leader be transferred to
newLeader.
* transferLeader is implemented by 1. modify peer priority 2. ask current
leader to step down
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 877f867585..fd65d1ee71 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -42,6 +42,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -139,29 +140,16 @@ public class RatisConsensusTest {
doConsensus(servers.get(0), group.getGroupId(), 10, 10);
// add 2 members
- servers.get(1).createPeer(group.getGroupId(), peers);
+ servers.get(1).createPeer(group.getGroupId(), Collections.emptyList());
servers.get(0).addPeer(group.getGroupId(), peers.get(1));
- servers.get(2).createPeer(group.getGroupId(), peers);
+ servers.get(2).createPeer(group.getGroupId(), Collections.emptyList());
servers.get(0).changePeer(group.getGroupId(), peers);
Assert.assertEquals(stateMachines.get(0).getConfiguration().size(), 3);
doConsensus(servers.get(0), group.getGroupId(), 10, 20);
}
- @Test
- public void createAndAddMemberToGroup() throws Exception {
- List<Peer> original = peers.subList(0, 1);
- servers.get(0).createPeer(gid, original);
- doConsensus(servers.get(0), gid, 10, 10);
-
- ConsensusGenericResponse resp =
- servers.get(0).addNewNodeToExistedGroup(gid, peers.get(1), original);
- Assert.assertTrue(resp.isSuccess());
-
- doConsensus(servers.get(0), gid, 10, 20);
- }
-
@Test
public void removeMemberFromGroup() throws Exception {
servers.get(0).createPeer(group.getGroupId(), group.getPeers());
@@ -193,7 +181,7 @@ public class RatisConsensusTest {
servers.get(0).createPeer(group.getGroupId(), peers.subList(0, 1));
doConsensus(servers.get(0), group.getGroupId(), 10, 10);
- servers.get(1).createPeer(group.getGroupId(), peers.subList(0, 2));
+ servers.get(1).createPeer(group.getGroupId(), Collections.emptyList());
servers.get(0).addPeer(group.getGroupId(), peers.get(1));
servers.get(1).transferLeader(group.getGroupId(), peers.get(1));
servers.get(previousRemove ? 0 : 1).removePeer(group.getGroupId(),
peers.get(0));
@@ -215,9 +203,6 @@ public class RatisConsensusTest {
servers.clear();
makeServers();
- servers.get(0).createPeer(group.getGroupId(), group.getPeers());
- servers.get(1).createPeer(group.getGroupId(), group.getPeers());
- servers.get(2).createPeer(group.getGroupId(), group.getPeers());
doConsensus(servers.get(0), gid, 10, 210);
}