This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch jira3167 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6a4febe4c58d06a789ddf2a37fcf870fd69c8cec Author: LebronAl <[email protected]> AuthorDate: Wed May 11 20:23:59 2022 +0800 finish --- confignode/src/assembly/resources/conf/logback.xml | 2 +- .../iotdb/consensus/ratis/RatisConsensus.java | 92 ++++++++++++++++------ .../commons/client/ClientFactoryProperty.java | 2 +- 3 files changed, 70 insertions(+), 26 deletions(-) diff --git a/confignode/src/assembly/resources/conf/logback.xml b/confignode/src/assembly/resources/conf/logback.xml index e5181c56dd..9c538adb4d 100644 --- a/confignode/src/assembly/resources/conf/logback.xml +++ b/confignode/src/assembly/resources/conf/logback.xml @@ -136,5 +136,5 @@ <appender-ref ref="stdout"/> </root> <logger level="info" name="org.apache.iotdb.confignode"/> - <logger level="warn" name="org.apache.ratis"/> + <logger level="info" name="org.apache.ratis"/> </configuration> diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 8d644bbc81..f714b2bf65 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -58,6 +58,7 @@ import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.exceptions.NotLeaderException; +import org.apache.ratis.server.DivisionInfo; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.NetUtils; @@ -72,6 +73,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -103,6 +105,9 @@ class RatisConsensus implements IConsensus { private static final int DEFAULT_PRIORITY = 0; private static final int LEADER_PRIORITY = 1; + // TODO make it configurable + private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20); + /** * @param ratisStorageDir different groups of RatisConsensus Peer all share ratisStorageDir as * root dir @@ -149,13 +154,13 @@ class RatisConsensus implements IConsensus { */ @Override public ConsensusWriteResponse write( - ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) { + ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) { // pre-condition: group exists and myself server serves this group - RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); + RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId); RaftGroup raftGroup = getGroupInfo(raftGroupId); if (raftGroup == null || !raftGroup.getPeers().contains(myself)) { - return failedWrite(new ConsensusGroupNotExistException(groupId)); + return failedWrite(new ConsensusGroupNotExistException(consensusGroupId)); } // serialize request into Message @@ -163,22 +168,24 @@ class RatisConsensus implements IConsensus { // 1. first try the local server RaftClientRequest clientRequest = - buildRawRequest(groupId, message, RaftClientRequest.writeRequestType()); + buildRawRequest(raftGroupId, message, RaftClientRequest.writeRequestType()); RaftClientReply localServerReply; RaftPeer suggestedLeader = null; - try { - localServerReply = server.submitClientRequest(clientRequest); - if (localServerReply.isSuccess()) { - ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage(); - TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder(); - return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build(); + if (isLeader(consensusGroupId) && waitUntilLeaderReady(raftGroupId)) { + try { + localServerReply = server.submitClientRequest(clientRequest); + if (localServerReply.isSuccess()) { + ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage(); + TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder(); + return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build(); + } + NotLeaderException ex = localServerReply.getNotLeaderException(); + if (ex != null) { // local server is not leader + suggestedLeader = ex.getSuggestedLeader(); + } + } catch (IOException e) { + return failedWrite(new RatisRequestFailedException(e)); } - NotLeaderException ex = localServerReply.getNotLeaderException(); - if (ex != null) { // local server is not leader - suggestedLeader = ex.getSuggestedLeader(); - } - } catch (IOException e) { - return failedWrite(new RatisRequestFailedException(e)); } // 2. try raft client @@ -209,18 +216,20 @@ class RatisConsensus implements IConsensus { /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */ @Override - public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) { - - RaftGroup group = getGroupInfo(Utils.fromConsensusGroupIdToRaftGroupId(groupId)); + public ConsensusReadResponse read( + ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) { + RaftGroupId groupId = Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId); + RaftGroup group = getGroupInfo(groupId); if (group == null || !group.getPeers().contains(myself)) { - return failedRead(new ConsensusGroupNotExistException(groupId)); + return failedRead(new ConsensusGroupNotExistException(consensusGroupId)); } RaftClientReply reply; try { RequestMessage message = new RequestMessage(IConsensusRequest); RaftClientRequest clientRequest = - buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(-1)); + buildRawRequest( + groupId, message, RaftClientRequest.staleReadRequestType(getCommitIndex(groupId))); reply = server.submitClientRequest(clientRequest); if (!reply.isSuccess()) { return failedRead(new RatisRequestFailedException(reply.getException())); @@ -476,6 +485,32 @@ class RatisConsensus implements IConsensus { return isLeader; } + private boolean waitUntilLeaderReady(RaftGroupId groupId) { + DivisionInfo divisionInfo; + try { + divisionInfo = server.getDivision(groupId).getInfo(); + } catch (IOException e) { + // if the query fails, simply return not leader + logger.info("isLeaderReady checking failed with exception: ", e); + return false; + } + long startTime = System.currentTimeMillis(); + try { + while (divisionInfo.isLeader() && !divisionInfo.isLeaderReady()) { + Thread.sleep(100); + long consumedTime = System.currentTimeMillis() - startTime; + if (consumedTime >= DEFAULT_WAIT_LEADER_READY_TIMEOUT) { + logger.warn("{}: leader is still not ready after {}ms", groupId, consumedTime); + return false; + } + } + } catch (InterruptedException e) { + logger.warn("Unexpected interruption", e); + return false; + } + return divisionInfo.isLeader(); + } + @Override public Peer getLeader(ConsensusGroupId groupId) { if (isLeader(groupId)) { @@ -512,12 +547,12 @@ class RatisConsensus implements IConsensus { } private RaftClientRequest buildRawRequest( - ConsensusGroupId groupId, Message message, RaftClientRequest.Type type) { + RaftGroupId groupId, Message message, RaftClientRequest.Type type) { return RaftClientRequest.newBuilder() .setServerId(server.getId()) .setClientId(localFakeId) .setCallId(localFakeCallId.incrementAndGet()) - .setGroupId(Utils.fromConsensusGroupIdToRaftGroupId(groupId)) + .setGroupId(groupId) .setType(type) .setMessage(message) .build(); @@ -534,11 +569,20 @@ class RatisConsensus implements IConsensus { lastSeen.put(raftGroupId, raftGroup); } } catch (IOException e) { - logger.debug("get group failed ", e); + logger.debug("get group {} failed ", raftGroupId, e); } return raftGroup; } + private long getCommitIndex(RaftGroupId raftGroupId) { + try { + return server.getDivision(raftGroupId).getRaftLog().getLastCommittedIndex(); + } catch (IOException e) { + logger.debug("get group {} failed ", raftGroupId, e); + } + return -1; + } + private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) { return RaftGroup.valueOf( Utils.fromConsensusGroupIdToRaftGroupId(groupId), diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java index f1801628e4..2ff20065e8 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java @@ -89,7 +89,7 @@ public class ClientFactoryProperty { private DefaultProperty() {} public static final boolean RPC_THRIFT_COMPRESSED_ENABLED = false; - public static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(20);; + public static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(20); public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER = 1; } }
