This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch fix_ratisclient_client_null_issue in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 13045d38e38a3fc705919f2a9f8e0014648c7717 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Fri Dec 27 22:48:22 2024 +0800 finish Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../iotdb/consensus/ratis/RatisConsensus.java | 23 +++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 9ae8dcd6a26..384730a9742 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -275,7 +275,8 @@ class RatisConsensus implements IConsensus { } /** Launch a consensus write with retry mechanism */ - private RaftClientReply writeWithRetry(CheckedSupplier<RaftClientReply, IOException> caller) + private RaftClientReply writeWithRetry( + CheckedSupplier<RaftClientReply, IOException> caller, RaftGroupId groupId) throws IOException { RaftClientReply reply = null; try { @@ -287,6 +288,9 @@ class RatisConsensus implements IConsensus { if (reply == null) { return RaftClientReply.newBuilder() + .setClientId(ClientId.emptyClientId()) + .setServerId(server.get().getId()) + .setGroupId(groupId) .setSuccess(false) .setException( new RaftException("null reply received in writeWithRetry for request " + caller)) @@ -295,13 +299,14 @@ class RatisConsensus implements IConsensus { return reply; } - private RaftClientReply writeLocallyWithRetry(RaftClientRequest request) throws IOException { - return writeWithRetry(() -> server.get().submitClientRequest(request)); + private RaftClientReply writeLocallyWithRetry(RaftClientRequest request, RaftGroupId groupId) + throws IOException { + return writeWithRetry(() -> server.get().submitClientRequest(request), groupId); } - private RaftClientReply writeRemotelyWithRetry(RatisClient client, Message message) - throws IOException { - return writeWithRetry(() -> client.getRaftClient().io().send(message)); + private RaftClientReply writeRemotelyWithRetry( + RatisClient client, Message message, RaftGroupId groupId) throws IOException { + return writeWithRetry(() -> client.getRaftClient().io().send(message), groupId); } /** @@ -323,7 +328,7 @@ class RatisConsensus implements IConsensus { try { forceStepDownLeader(raftGroup); } catch (Exception e) { - logger.warn("leader {} read only, force step down failed due to {}", myself, e); + logger.warn("leader {} read only, force step down failed due to, ", myself, e); } return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY); } @@ -344,7 +349,7 @@ class RatisConsensus implements IConsensus { && waitUntilLeaderReady(raftGroupId)) { try (AutoCloseable ignored = RatisMetricsManager.getInstance().startWriteLocallyTimer(consensusGroupType)) { - RaftClientReply localServerReply = writeLocallyWithRetry(clientRequest); + RaftClientReply localServerReply = writeLocallyWithRetry(clientRequest, raftGroupId); if (localServerReply.isSuccess()) { ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage(); return (TSStatus) responseMessage.getContentHolder(); @@ -365,7 +370,7 @@ class RatisConsensus implements IConsensus { try (AutoCloseable ignored = RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType); RatisClient client = getRaftClient(raftGroup)) { - RaftClientReply reply = writeRemotelyWithRetry(client, message); + RaftClientReply reply = writeRemotelyWithRetry(client, message, raftGroupId); if (!reply.isSuccess()) { throw new RatisRequestFailedException(reply.getException()); }
