This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new a8075e3 RATIS-1095. Move the related methods from RaftClientImpl to
the corresponding impls. (#222)
a8075e3 is described below
commit a8075e3d6ae8610ce5e0c731cdca303f73d25839
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Oct 13 18:56:08 2020 +0800
RATIS-1095. Move the related methods from RaftClientImpl to the
corresponding impls. (#222)
* RATIS-1095. Move the related methods from RaftClientImpl to the
corresponding impls.
* Revise the javadoc in BlockingApi.
* Revised the javadoc again.
* Fixed checkstyle.
---
.../java/org/apache/ratis/client/api/AsyncApi.java | 33 ++++++-
.../org/apache/ratis/client/api/BlockingApi.java | 33 ++++++-
.../org/apache/ratis/client/impl/AsyncImpl.java | 11 ++-
.../org/apache/ratis/client/impl/BlockingImpl.java | 91 ++++++++++++++++-
.../ratis/client/impl/GroupManagementImpl.java | 8 +-
.../ratis/client/impl/MessageStreamImpl.java | 10 +-
.../apache/ratis/client/impl/RaftClientImpl.java | 109 ++++-----------------
7 files changed, 181 insertions(+), 114 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
index de8320e..fd000ad 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -24,11 +24,14 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
/**
- * APIs to support asynchronous operations such as send message, send
(stale)read message and watch request.
+ * Asynchronous API to support operations
+ * such as sending message, read-message, stale-read-message and watch-request.
+ *
+ * Note that this API and {@link BlockingApi} support the same set of
operations.
*/
public interface AsyncApi {
/**
- * Async call to send the given message to the raft service.
+ * Send the given message asynchronously to the raft service.
* The message may change the state of the service.
* For readonly messages, use {@link #sendReadOnly(Message)} instead.
*
@@ -37,12 +40,32 @@ public interface AsyncApi {
*/
CompletableFuture<RaftClientReply> send(Message message);
- /** Async call to send the given readonly message to the raft service. */
+ /**
+ * Send the given readonly message asynchronously to the raft service.
+ *
+ * @param message The request message.
+ * @return a future of the reply.
+ */
CompletableFuture<RaftClientReply> sendReadOnly(Message message);
- /** Async call to send the given stale-read message to the given server (not
the raft service). */
+ /**
+ * Send the given stale-read message asynchronously to the given server (not
the raft service).
+ * If the server commit index is larger than or equal to the given
min-index, the request will be processed.
+ * Otherwise, the server returns a {@link
org.apache.ratis.protocol.exceptions.StaleReadException}.
+ *
+ * @param message The request message.
+ * @param minIndex The minimum log index that the server log must have
already committed.
+ * @param server The target server
+ * @return a future of the reply.
+ */
CompletableFuture<RaftClientReply> sendStaleRead(Message message, long
minIndex, RaftPeerId server);
- /** Async call to watch the given index to satisfy the given replication
level. */
+ /**
+ * Watch the given index asynchronously to satisfy the given replication
level.
+ *
+ * @param index The log index to be watched.
+ * @param replication The replication level required.
+ * @return a future of the reply.
+ */
CompletableFuture<RaftClientReply> watch(long index, ReplicationLevel
replication);
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
index dc14cd5..e1679b9 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -24,7 +24,10 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
/**
- * APIs to support blocking operations such as send message, send (stale)read
message and watch request.
+ * Blocking API to support operations
+ * such as sending message, read-message, stale-read-message and watch-request.
+ *
+ * Note that this API and {@link AsyncApi} support the same set of operations.
*/
public interface BlockingApi {
/**
@@ -37,12 +40,32 @@ public interface BlockingApi {
*/
RaftClientReply send(Message message) throws IOException;
- /** Send the given readonly message to the raft service. */
+ /**
+ * Send the given readonly message to the raft service.
+ *
+ * @param message The request message.
+ * @return the reply.
+ */
RaftClientReply sendReadOnly(Message message) throws IOException;
- /** Send the given stale-read message to the given server (not the raft
service). */
+ /**
+ * Send the given stale-read message to the given server (not the raft
service).
+ * If the server commit index is larger than or equal to the given
min-index, the request will be processed.
+ * Otherwise, the server throws a {@link
org.apache.ratis.protocol.exceptions.StaleReadException}.
+ *
+ * @param message The request message.
+ * @param minIndex The minimum log index that the server log must have
already committed.
+ * @param server The target server
+ * @return the reply.
+ */
RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId
server) throws IOException;
- /** Watch the given index to satisfy the given replication level. */
+ /**
+ * Watch the given index to satisfy the given replication level.
+ *
+ * @param index The log index to be watched.
+ * @param replication The replication level required.
+ * @return the reply.
+ */
RaftClientReply watch(long index, ReplicationLevel replication) throws
IOException;
-}
+}
\ No newline at end of file
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
index 09c3ae5..8537d35 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -34,19 +34,24 @@ class AsyncImpl implements AsyncApi {
this.client = Objects.requireNonNull(client, "client == null");
}
+ CompletableFuture<RaftClientReply> send(
+ RaftClientRequest.Type type, Message message, RaftPeerId server) {
+ return client.getOrderedAsync().send(type, message, server);
+ }
+
@Override
public CompletableFuture<RaftClientReply> send(Message message) {
- return client.sendAsync(RaftClientRequest.writeRequestType(), message,
null);
+ return send(RaftClientRequest.writeRequestType(), message, null);
}
@Override
public CompletableFuture<RaftClientReply> sendReadOnly(Message message) {
- return client.sendAsync(RaftClientRequest.readRequestType(), message,
null);
+ return send(RaftClientRequest.readRequestType(), message, null);
}
@Override
public CompletableFuture<RaftClientReply> sendStaleRead(Message message,
long minIndex, RaftPeerId server) {
- return client.sendAsync(RaftClientRequest.staleReadRequestType(minIndex),
message, server);
+ return send(RaftClientRequest.staleReadRequestType(minIndex), message,
server);
}
@Override
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index d73c227..4b85b11 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,16 +18,30 @@
package org.apache.ratis.client.impl;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.Objects;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
import org.apache.ratis.client.api.BlockingApi;
+import org.apache.ratis.client.retry.ClientRetryEvent;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
+import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Blocking api implementations. */
class BlockingImpl implements BlockingApi {
+ static final Logger LOG = LoggerFactory.getLogger(BlockingImpl.class);
+
private final RaftClientImpl client;
BlockingImpl(RaftClientImpl client) {
@@ -36,22 +50,89 @@ class BlockingImpl implements BlockingApi {
@Override
public RaftClientReply send(Message message) throws IOException {
- return client.send(RaftClientRequest.writeRequestType(), message, null);
+ return send(RaftClientRequest.writeRequestType(), message, null);
}
@Override
public RaftClientReply sendReadOnly(Message message) throws IOException {
- return client.send(RaftClientRequest.readRequestType(), message, null);
+ return send(RaftClientRequest.readRequestType(), message, null);
}
@Override
public RaftClientReply sendStaleRead(Message message, long minIndex,
RaftPeerId server)
throws IOException {
- return client.send(RaftClientRequest.staleReadRequestType(minIndex),
message, server);
+ return send(RaftClientRequest.staleReadRequestType(minIndex), message,
server);
}
@Override
public RaftClientReply watch(long index, ReplicationLevel replication)
throws IOException {
- return client.send(RaftClientRequest.watchRequestType(index, replication),
null, null);
+ return send(RaftClientRequest.watchRequestType(index, replication), null,
null);
+ }
+
+ private RaftClientReply send(RaftClientRequest.Type type, Message message,
RaftPeerId server)
+ throws IOException {
+ if (!type.is(TypeCase.WATCH)) {
+ Objects.requireNonNull(message, "message == null");
+ }
+
+ final long callId = RaftClientImpl.nextCallId();
+ return sendRequestWithRetry(() -> client.newRaftClientRequest(server,
callId, message, type, null));
+ }
+
+ RaftClientReply sendRequestWithRetry(Supplier<RaftClientRequest> supplier)
throws IOException {
+ RaftClientImpl.PendingClientRequest pending = new
RaftClientImpl.PendingClientRequest() {
+ @Override
+ public RaftClientRequest newRequestImpl() {
+ return supplier.get();
+ }
+ };
+ while (true) {
+ final RaftClientRequest request = pending.newRequest();
+ IOException ioe = null;
+ try {
+ final RaftClientReply reply = sendRequest(request);
+
+ if (reply != null) {
+ return reply;
+ }
+ } catch (GroupMismatchException | StateMachineException e) {
+ throw e;
+ } catch (IOException e) {
+ ioe = e;
+ }
+
+ pending.incrementExceptionCount(ioe);
+ ClientRetryEvent event = new ClientRetryEvent(request, ioe, pending);
+ final RetryPolicy retryPolicy = client.getRetryPolicy();
+ final RetryPolicy.Action action =
retryPolicy.handleAttemptFailure(event);
+ TimeDuration sleepTime = client.getEffectiveSleepTime(ioe,
action.getSleepTime());
+
+ if (!action.shouldRetry()) {
+ throw (IOException)client.noMoreRetries(event);
+ }
+
+ try {
+ sleepTime.sleep();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("retry policy=" + retryPolicy);
+ }
+ }
+ }
+
+ RaftClientReply sendRequest(RaftClientRequest request) throws IOException {
+ LOG.debug("{}: send {}", client.getId(), request);
+ RaftClientReply reply;
+ try {
+ reply = client.getClientRpc().sendRequest(request);
+ } catch (GroupMismatchException gme) {
+ throw gme;
+ } catch (IOException ioe) {
+ client.handleIOException(request, ioe);
+ throw ioe;
+ }
+ LOG.debug("{}: receive {}", client.getId(), reply);
+ reply = client.handleLeaderException(request, reply);
+ reply = RaftClientImpl.handleRaftException(reply, Function.identity());
+ return reply;
}
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
index 7ed407e..fab45b3 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
@@ -47,7 +47,7 @@ class GroupManagementImpl implements GroupManagementApi {
final long callId = RaftClientImpl.nextCallId();
client.addServers(newGroup.getPeers().stream());
- return client.sendRequest(GroupManagementRequest.newAdd(client.getId(),
server, callId, newGroup));
+ return
client.io().sendRequest(GroupManagementRequest.newAdd(client.getId(), server,
callId, newGroup));
}
@Override
@@ -56,14 +56,14 @@ class GroupManagementImpl implements GroupManagementApi {
Objects.requireNonNull(groupId, "groupId == null");
final long callId = RaftClientImpl.nextCallId();
- return client.sendRequest(GroupManagementRequest.newRemove(client.getId(),
server,
+ return
client.io().sendRequest(GroupManagementRequest.newRemove(client.getId(), server,
callId, groupId, deleteDirectory, renameDirectory));
}
@Override
public GroupListReply list() throws IOException {
final long callId = RaftClientImpl.nextCallId();
- final RaftClientReply reply = client.sendRequest(
+ final RaftClientReply reply = client.io().sendRequest(
new GroupListRequest(client.getId(), server, client.getGroupId(),
callId));
Preconditions.assertTrue(reply instanceof GroupListReply, () ->
"Unexpected reply: " + reply);
return (GroupListReply)reply;
@@ -75,7 +75,7 @@ class GroupManagementImpl implements GroupManagementApi {
groupId = client.getGroupId();
}
final long callId = RaftClientImpl.nextCallId();
- final RaftClientReply reply = client.sendRequest(
+ final RaftClientReply reply = client.io().sendRequest(
new GroupInfoRequest(client.getId(), server, groupId, callId));
Preconditions.assertTrue(reply instanceof GroupInfoReply, () ->
"Unexpected reply: " + reply);
return (GroupInfoReply)reply;
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
index 77a8932..e2971d6 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
@@ -23,6 +23,8 @@ import org.apache.ratis.client.api.MessageStreamApi;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftClientRequest.Type;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
@@ -48,14 +50,18 @@ public final class MessageStreamImpl implements
MessageStreamApi {
this.id = id;
}
+ private Type getStreamRequestType(boolean endOfRequest) {
+ return RaftClientRequest.streamRequestType(id,
messageId.getAndIncrement(), endOfRequest);
+ }
+
@Override
public CompletableFuture<RaftClientReply> sendAsync(Message message,
boolean endOfRequest) {
- return client.streamAsync(id, messageId.getAndIncrement(), message,
endOfRequest);
+ return client.async().send(getStreamRequestType(endOfRequest), message,
null);
}
@Override
public CompletableFuture<RaftClientReply> closeAsync() {
- return client.streamCloseAsync(id, messageId.getAndIncrement());
+ return client.async().send(getStreamRequestType(true), null, null);
}
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 9d096ce..7105657 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -17,23 +17,27 @@
*/
package org.apache.ratis.client.impl;
-import org.apache.ratis.client.api.AsyncApi;
-import org.apache.ratis.client.api.GroupManagementApi;
-import org.apache.ratis.client.retry.ClientRetryEvent;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.client.api.GroupManagementApi;
import org.apache.ratis.client.api.MessageStreamApi;
+import org.apache.ratis.client.retry.ClientRetryEvent;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.protocol.exceptions.GroupMismatchException;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
-import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
@@ -42,7 +46,6 @@ import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
@@ -116,6 +119,8 @@ public final class RaftClientImpl implements RaftClient {
private final Supplier<OrderedAsync> orderedAsync;
private final Supplier<MessageStreamApi> streamApi;
+ private final Supplier<AsyncImpl> asyncApi;
+ private final Supplier<BlockingImpl> blockingApi;
RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy
retryPolicy) {
@@ -132,6 +137,8 @@ public final class RaftClientImpl implements RaftClient {
this.orderedAsync = JavaUtils.memoize(() -> OrderedAsync.newInstance(this,
properties));
this.streamApi = JavaUtils.memoize(() ->
MessageStreamImpl.newInstance(this, properties));
+ this.asyncApi = JavaUtils.memoize(() -> new AsyncImpl(this));
+ this.blockingApi = JavaUtils.memoize(() -> new BlockingImpl(this));
}
public RaftPeerId getLeaderId() {
@@ -186,19 +193,6 @@ public final class RaftClientImpl implements RaftClient {
return streamApi.get();
}
- CompletableFuture<RaftClientReply> streamAsync(long streamId, long
messageId, Message message, boolean endOfRequest) {
- return sendAsync(RaftClientRequest.streamRequestType(streamId, messageId,
endOfRequest), message, null);
- }
-
- CompletableFuture<RaftClientReply> streamCloseAsync(long streamId, long
messageId) {
- return sendAsync(RaftClientRequest.streamRequestType(streamId, messageId,
true), null, null);
- }
-
- CompletableFuture<RaftClientReply> sendAsync(
- RaftClientRequest.Type type, Message message, RaftPeerId server) {
- return getOrderedAsync().send(type, message, server);
- }
-
RaftClientRequest newRaftClientRequest(
RaftPeerId server, long callId, Message message, RaftClientRequest.Type
type,
SlidingWindowEntry slidingWindowEntry) {
@@ -206,15 +200,6 @@ public final class RaftClientImpl implements RaftClient {
callId, message, type, slidingWindowEntry);
}
- RaftClientReply send(RaftClientRequest.Type type, Message message,
RaftPeerId server)
- throws IOException {
- if (!type.is(TypeCase.WATCH)) {
- Objects.requireNonNull(message, "message == null");
- }
-
- final long callId = nextCallId();
- return sendRequestWithRetry(() -> newRaftClientRequest(server, callId,
message, type, null));
- }
// TODO: change peersInNewConf to List<RaftPeer>
@Override
@@ -225,13 +210,13 @@ public final class RaftClientImpl implements RaftClient {
final long callId = nextCallId();
// also refresh the rpc proxies for these peers
addServers(Arrays.stream(peersInNewConf));
- return sendRequestWithRetry(() -> new SetConfigurationRequest(
+ return io().sendRequestWithRetry(() -> new SetConfigurationRequest(
clientId, leaderId, groupId, callId, Arrays.asList(peersInNewConf)));
}
@Override
- public AsyncApi async() {
- return new AsyncImpl(this);
+ public AsyncImpl async() {
+ return asyncApi.get();
}
@Override
@@ -241,7 +226,7 @@ public final class RaftClientImpl implements RaftClient {
@Override
public BlockingImpl io() {
- return new BlockingImpl(this);
+ return blockingApi.get();
}
void addServers(Stream<RaftPeer> peersInNewConf) {
@@ -249,45 +234,6 @@ public final class RaftClientImpl implements RaftClient {
peersInNewConf.filter(p -> !peers.contains(p))::iterator);
}
- private RaftClientReply sendRequestWithRetry(Supplier<RaftClientRequest>
supplier) throws IOException {
- PendingClientRequest pending = new PendingClientRequest() {
- @Override
- public RaftClientRequest newRequestImpl() {
- return supplier.get();
- }
- };
- while (true) {
- final RaftClientRequest request = pending.newRequest();
- IOException ioe = null;
- try {
- final RaftClientReply reply = sendRequest(request);
-
- if (reply != null) {
- return reply;
- }
- } catch (GroupMismatchException | StateMachineException e) {
- throw e;
- } catch (IOException e) {
- ioe = e;
- }
-
- pending.incrementExceptionCount(ioe);
- ClientRetryEvent event = new ClientRetryEvent(request, ioe, pending);
- final RetryPolicy.Action action =
retryPolicy.handleAttemptFailure(event);
- TimeDuration sleepTime = getEffectiveSleepTime(ioe,
action.getSleepTime());
-
- if (!action.shouldRetry()) {
- throw (IOException)noMoreRetries(event);
- }
-
- try {
- sleepTime.sleep();
- } catch (InterruptedException e) {
- throw new InterruptedIOException("retry policy=" + retryPolicy);
- }
- }
- }
-
Throwable noMoreRetries(ClientRetryEvent event) {
final int attemptCount = event.getAttemptCount();
final Throwable throwable = event.getCause();
@@ -297,23 +243,6 @@ public final class RaftClientImpl implements RaftClient {
return new RaftRetryFailureException(event.getRequest(), attemptCount,
retryPolicy, throwable);
}
- RaftClientReply sendRequest(RaftClientRequest request) throws IOException {
- LOG.debug("{}: send {}", clientId, request);
- RaftClientReply reply;
- try {
- reply = clientRpc.sendRequest(request);
- } catch (GroupMismatchException gme) {
- throw gme;
- } catch (IOException ioe) {
- handleIOException(request, ioe);
- throw ioe;
- }
- LOG.debug("{}: receive {}", clientId, reply);
- reply = handleLeaderException(request, reply);
- reply = handleRaftException(reply, Function.identity());
- return reply;
- }
-
static <E extends Throwable> RaftClientReply handleRaftException(
RaftClientReply reply, Function<RaftException, E> converter) throws E {
if (reply != null) {