This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch CIBugTest in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 07c0dbf4a45de599b9709d127e8af60025f4e47d Author: JackieTien97 <[email protected]> AuthorDate: Thu Jun 9 20:58:41 2022 +0800 init --- .../org/apache/iotdb/consensus/config/RatisConfig.java | 4 ++-- .../org/apache/iotdb/consensus/ratis/RatisConsensus.java | 14 ++++++++++++++ integration-test/pom.xml | 2 +- .../iotdb/db/service/thrift/impl/InternalServiceImpl.java | 12 +++++++++--- 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java index 048fd21d76..9e566fe3af 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java @@ -173,8 +173,8 @@ public class RatisConfig { } public static class Builder { - private TimeDuration timeoutMin = TimeDuration.valueOf(2, TimeUnit.SECONDS); - private TimeDuration timeoutMax = TimeDuration.valueOf(8, TimeUnit.SECONDS); + private TimeDuration timeoutMin = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); + private TimeDuration timeoutMax = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS); private TimeDuration requestTimeout = TimeDuration.valueOf(20, TimeUnit.SECONDS); private TimeDuration sleepTime = TimeDuration.valueOf(1, TimeUnit.SECONDS); private TimeDuration slownessTimeout = TimeDuration.valueOf(10, TimeUnit.MINUTES); diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index ff1a207403..22442199bb 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -168,8 +168,10 @@ class RatisConsensus implements IConsensus { buildRawRequest(raftGroupId, message, RaftClientRequest.writeRequestType()); RaftClientReply localServerReply; RaftPeer suggestedLeader = null; + logger.info("trying to judge whether current node is leader and whether it is ready"); if (isLeader(consensusGroupId) && waitUntilLeaderReady(raftGroupId)) { try { + logger.info("submit locally!"); localServerReply = server.submitClientRequest(clientRequest); if (localServerReply.isSuccess()) { ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage(); @@ -185,17 +187,25 @@ class RatisConsensus implements IConsensus { } } + logger.info("submit using raft client!"); // 2. try raft client TSStatus writeResult; RatisClient client = null; try { + logger.info("trying to get raft client!"); client = getRaftClient(raftGroup); + logger.info("trying to send message using raft client"); RaftClientReply reply = client.getRaftClient().io().send(message); + logger.info("get result from raft client"); if (!reply.isSuccess()) { + logger.info("result is not success", reply.getException()); return failedWrite(new RatisRequestFailedException(reply.getException())); } + logger.info("result is success"); writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer()); + logger.info("status is {}", writeResult); } catch (IOException | TException e) { + logger.info("exception found!", e); return failedWrite(new RatisRequestFailedException(e)); } finally { if (client != null) { @@ -208,6 +218,7 @@ class RatisConsensus implements IConsensus { writeResult.setRedirectNode(new TEndPoint(leaderEndPoint.getIp(), leaderEndPoint.getPort())); } + logger.info("successfully return"); return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build(); } @@ -477,6 +488,7 @@ class RatisConsensus implements IConsensus { boolean isLeader; try { + logger.info("trying to judge whether current node is leader from server"); isLeader = server.getDivision(raftGroupId).getInfo().isLeader(); } catch (IOException exception) { // if the query fails, simply return not leader @@ -489,6 +501,7 @@ class RatisConsensus implements IConsensus { private boolean waitUntilLeaderReady(RaftGroupId groupId) { DivisionInfo divisionInfo; try { + logger.info("trying to get DivisionInfo from server"); divisionInfo = server.getDivision(groupId).getInfo(); } catch (IOException e) { // if the query fails, simply return not leader @@ -500,6 +513,7 @@ class RatisConsensus implements IConsensus { while (divisionInfo.isLeader() && !divisionInfo.isLeaderReady()) { Thread.sleep(10); long consumedTime = System.currentTimeMillis() - startTime; + logger.info("trying to get DivisionInfo from server in while"); if (consumedTime >= DEFAULT_WAIT_LEADER_READY_TIMEOUT) { logger.warn("{}: leader is still not ready after {}ms", groupId, consumedTime); return false; diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 51ac9c39cf..d80aa543ad 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -263,7 +263,7 @@ <useSystemClassLoader>false</useSystemClassLoader> <parallel>classes</parallel> <useUnlimitedThreads>true</useUnlimitedThreads> - <forkCount>8</forkCount> + <forkCount>3</forkCount> <reuseForks>false</reuseForks> <systemPropertyVariables> <TestEnv>Cluster1</TestEnv> diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java index ab75ed7da9..e0c5e7419a 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java @@ -158,12 +158,18 @@ public class InternalServiceImpl implements InternalService.Iface { if (groupId instanceof DataRegionId) { writeResponse = DataRegionConsensusImpl.getInstance().write(groupId, fragmentInstance); } else { + LOGGER.info("call consensus layer API"); writeResponse = SchemaRegionConsensusImpl.getInstance().write(groupId, fragmentInstance); } // TODO need consider more status - response.setAccepted( - TSStatusCode.SUCCESS_STATUS.getStatusCode() == writeResponse.getStatus().getCode()); - response.setMessage(writeResponse.getStatus().message); + if (writeResponse.getStatus() == null) { + LOGGER.error("error happened. ", writeResponse.getException()); + response.setAccepted(false); + } else { + response.setAccepted( + TSStatusCode.SUCCESS_STATUS.getStatusCode() == writeResponse.getStatus().getCode()); + response.setMessage(writeResponse.getStatus().message); + } return response; } return null;
