This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch issue_3090 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c3da6cc90b468268d07acf425ee1350a6b4fadef Author: LebronAl <[email protected]> AuthorDate: Tue May 3 18:11:29 2022 +0800 fix --- .../iotdb/consensus/ratis/RatisConsensus.java | 75 ++++++++++++++++------ 1 file changed, 54 insertions(+), 21 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index f8d65fa0b0..96b3b766e3 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -94,7 +94,7 @@ class RatisConsensus implements IConsensus { new IClientManager.Factory<RaftGroup, RatisClient>() .createClientManager(new RatisClientPoolFactory()); - private Map<RaftGroupId, RaftGroup> lastSeen; + private final Map<RaftGroupId, RaftGroup> lastSeen = new ConcurrentHashMap<>(); private final ClientId localFakeId = ClientId.randomId(); private final AtomicLong localFakeCallId = new AtomicLong(0); @@ -108,9 +108,6 @@ class RatisConsensus implements IConsensus { */ public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, IStateMachine.Registry registry) throws IOException { - lastSeen = new ConcurrentHashMap<>(); - - // create a RaftPeer as endpoint of comm String address = Utils.IPAddress(endpoint); myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY); @@ -174,7 +171,6 @@ class RatisConsensus implements IConsensus { TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder(); return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build(); } - NotLeaderException ex = localServerReply.getNotLeaderException(); if (ex != null) { // local server is not leader suggestedLeader = ex.getSuggestedLeader(); @@ -185,13 +181,20 @@ class RatisConsensus implements IConsensus { // 2. try raft client TSStatus writeResult; + RatisClient client = null; try { - RatisClient client = getRaftClient(raftGroup); + client = getRaftClient(raftGroup); RaftClientReply reply = client.getRaftClient().io().send(message); + if (!reply.isSuccess()) { + return failedWrite(new RatisRequestFailedException(reply.getException())); + } writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer()); - client.returnSelf(); } catch (IOException | TException e) { return failedWrite(new RatisRequestFailedException(e)); + } finally { + if (client != null) { + client.returnSelf(); + } } if (suggestedLeader != null) { @@ -214,11 +217,12 @@ class RatisConsensus implements IConsensus { RaftClientReply reply; try { RequestMessage message = new RequestMessage(IConsensusRequest); - RaftClientRequest clientRequest = - buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(0)); - + buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(-1)); reply = server.submitClientRequest(clientRequest); + if (!reply.isSuccess()) { + return failedRead(new RatisRequestFailedException(reply.getException())); + } } catch (IOException e) { return failedRead(new RatisRequestFailedException(e)); } @@ -247,12 +251,19 @@ class RatisConsensus implements IConsensus { // add RaftPeer myself to this RaftGroup RaftClientReply reply; + RatisClient client = null; try { - RatisClient client = getRaftClient(group); + client = getRaftClient(group); reply = client.getRaftClient().getGroupManagementApi(myself.getId()).add(group); - client.returnSelf(); + if (!reply.isSuccess()) { + return failed(new RatisRequestFailedException(reply.getException())); + } } catch (IOException e) { return failed(new RatisRequestFailedException(e)); + } finally { + if (client != null) { + client.returnSelf(); + } } return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build(); @@ -277,16 +288,23 @@ class RatisConsensus implements IConsensus { // send remove group to myself RaftClientReply reply; + RatisClient client = null; try { - RatisClient client = getRaftClient(raftGroup); + client = getRaftClient(raftGroup); reply = client .getRaftClient() .getGroupManagementApi(myself.getId()) .remove(raftGroupId, false, false); - client.returnSelf(); + if (!reply.isSuccess()) { + return failed(new RatisRequestFailedException(reply.getException())); + } } catch (IOException e) { return failed(new RatisRequestFailedException(e)); + } finally { + if (client != null) { + client.returnSelf(); + } } return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build(); @@ -414,19 +432,27 @@ class RatisConsensus implements IConsensus { } } - RaftClientReply reply = null; + RaftClientReply reply; + RatisClient client = null; try { - RatisClient client = getRaftClient(raftGroup); + client = getRaftClient(raftGroup); RaftClientReply configChangeReply = client.getRaftClient().admin().setConfiguration(newConfiguration); if (!configChangeReply.isSuccess()) { return failed(new RatisRequestFailedException(configChangeReply.getException())); } // TODO tuning for timeoutMs - reply = client.getRaftClient().admin().transferLeadership(newRaftLeader.getId(), 2000); - client.returnSelf(); + reply = client.getRaftClient().admin().transferLeadership(newRaftLeader.getId(), 5000); + if (!reply.isSuccess()) { + return failed(new RatisRequestFailedException(reply.getException())); + } + } catch (IOException e) { return failed(new RatisRequestFailedException(e)); + } finally { + if (client != null) { + client.returnSelf(); + } } return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build(); } @@ -453,7 +479,7 @@ class RatisConsensus implements IConsensus { } RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId); - RaftClient client = null; + RaftClient client; try { client = server.getDivision(raftGroupId).getRaftClient(); } catch (IOException e) { @@ -531,13 +557,20 @@ class RatisConsensus implements IConsensus { throws RatisRequestFailedException { // notify the group leader of configuration change RaftClientReply reply; + RatisClient client = null; try { - RatisClient client = getRaftClient(newGroupConf); + client = getRaftClient(newGroupConf); reply = client.getRaftClient().admin().setConfiguration(new ArrayList<>(newGroupConf.getPeers())); - client.returnSelf(); + if (!reply.isSuccess()) { + throw new RatisRequestFailedException(reply.getException()); + } } catch (IOException e) { throw new RatisRequestFailedException(e); + } finally { + if (client != null) { + client.returnSelf(); + } } return reply; }
