Repository: incubator-ratis Updated Branches: refs/heads/master f1716ac43 -> a25143eed
RATIS-38. RaftClient should not retry on StateMachineException. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a25143ee Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a25143ee Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a25143ee Branch: refs/heads/master Commit: a25143eedb84a3da5ffba5d475314eca41a7ae48 Parents: f1716ac Author: Jing Zhao <[email protected]> Authored: Thu Mar 16 13:09:48 2017 -0700 Committer: Jing Zhao <[email protected]> Committed: Thu Mar 16 13:09:48 2017 -0700 ---------------------------------------------------------------------- .../ratis/client/impl/ClientProtoUtils.java | 69 ++++++++++++++++---- .../ratis/client/impl/RaftClientImpl.java | 22 +++++-- .../apache/ratis/protocol/RaftClientReply.java | 32 ++++++--- .../ratis/protocol/StateMachineException.java | 2 +- .../java/org/apache/ratis/util/RaftUtils.java | 11 ++++ .../java/org/apache/ratis/util/StringUtils.java | 10 +++ .../TestRaftStateMachineException.java | 2 +- .../org/apache/ratis/grpc/RaftGrpcUtil.java | 24 +------ .../TestRaftReconfigurationWithHadoopRpc.java | 9 +++ ratis-proto-shaded/src/main/proto/Raft.proto | 20 ++++-- .../apache/ratis/server/impl/LeaderState.java | 5 +- .../apache/ratis/server/impl/LogAppender.java | 2 +- .../ratis/server/impl/PendingRequests.java | 11 +++- .../ratis/server/impl/RaftServerImpl.java | 9 +-- .../ratis/server/impl/StateMachineUpdater.java | 4 +- 15 files changed, 164 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 218d761..ddecad6 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -21,9 +21,13 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.protocol.*; import org.apache.ratis.util.ProtoUtils; +import org.apache.ratis.util.RaftUtils; import java.util.Arrays; +import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION; +import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION; + public class ClientProtoUtils { public static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder( byte[] requestorId, byte[] replyId, long callId, boolean success) { @@ -83,14 +87,25 @@ public class ClientProtoUtils { b.setMessage(toClientMessageEntryProto(reply.getMessage())); } if (reply.isNotLeader()) { - b.setIsNotLeader(true); - final RaftPeer suggestedLeader = reply.getNotLeaderException() - .getSuggestedLeader(); + NotLeaderException nle = reply.getNotLeaderException(); + NotLeaderExceptionProto.Builder nleBuilder = + NotLeaderExceptionProto.newBuilder(); + final RaftPeer suggestedLeader = nle.getSuggestedLeader(); if (suggestedLeader != null) { - b.setSuggestedLeader(ProtoUtils.toRaftPeerProto(suggestedLeader)); + nleBuilder.setSuggestedLeader(ProtoUtils.toRaftPeerProto(suggestedLeader)); } - b.addAllPeersInConf(ProtoUtils.toRaftPeerProtos( - Arrays.asList(reply.getNotLeaderException().getPeers()))); + nleBuilder.addAllPeersInConf( + ProtoUtils.toRaftPeerProtos(Arrays.asList(nle.getPeers()))); + b.setNotLeaderException(nleBuilder.build()); + } else if (reply.hasStateMachineException()) { + StateMachineException sme = reply.getStateMachineException(); + StateMachineExceptionProto.Builder smeBuilder = + StateMachineExceptionProto.newBuilder(); + final Throwable t = sme.getCause() != null ? sme.getCause() : sme; + smeBuilder.setExceptionClassName(t.getClass().getName()) + .setErrorMsg(t.getMessage()) + .setStacktrace(ProtoUtils.toByteString(t.getStackTrace())); + b.setStateMachineException(smeBuilder.build()); } } return b.build(); @@ -99,25 +114,53 @@ public class ClientProtoUtils { public static RaftClientReply toRaftClientReply( RaftClientReplyProto replyProto) { final RaftRpcReplyProto rp = replyProto.getRpcReply(); - NotLeaderException e = null; - if (replyProto.getIsNotLeader()) { - final RaftPeer suggestedLeader = replyProto.hasSuggestedLeader() ? - ProtoUtils.toRaftPeer(replyProto.getSuggestedLeader()) : null; + RaftException e = null; + if (replyProto.getExceptionDetailsCase().equals(NOTLEADEREXCEPTION)) { + NotLeaderExceptionProto nleProto = replyProto.getNotLeaderException(); + final RaftPeer suggestedLeader = nleProto.hasSuggestedLeader() ? + ProtoUtils.toRaftPeer(nleProto.getSuggestedLeader()) : null; final RaftPeer[] peers = ProtoUtils.toRaftPeerArray( - replyProto.getPeersInConfList()); + nleProto.getPeersInConfList()); e = new NotLeaderException(new RaftPeerId(rp.getReplyId()), suggestedLeader, peers); + } else if (replyProto.getExceptionDetailsCase().equals(STATEMACHINEEXCEPTION)) { + StateMachineExceptionProto smeProto = replyProto.getStateMachineException(); + e = wrapStateMachineException(rp.getReplyId().toStringUtf8(), + smeProto.getExceptionClassName(), smeProto.getErrorMsg(), + smeProto.getStacktrace()); } return new RaftClientReply(new ClientId(rp.getRequestorId().toByteArray()), new RaftPeerId(rp.getReplyId()), rp.getCallId(), rp.getSuccess(), toMessage(replyProto.getMessage()), e); } - public static Message toMessage(final ClientMessageEntryProto p) { + private static StateMachineException wrapStateMachineException( + String serverId, String className, String errorMsg, + ByteString stackTraceBytes) { + StateMachineException sme; + if (className == null) { + sme = new StateMachineException(errorMsg); + } else { + try { + Class<?> clazz = Class.forName(className); + final Exception e = RaftUtils.instantiateException( + clazz.asSubclass(Exception.class), errorMsg, null); + sme = new StateMachineException(serverId, e); + } catch (Exception e) { + sme = new StateMachineException(className + ": " + errorMsg); + } + } + StackTraceElement[] stacktrace = + (StackTraceElement[]) ProtoUtils.toObject(stackTraceBytes); + sme.setStackTrace(stacktrace); + return sme; + } + + private static Message toMessage(final ClientMessageEntryProto p) { return p::getContent; } - public static ClientMessageEntryProto toClientMessageEntryProto(Message message) { + private static ClientMessageEntryProto toClientMessageEntryProto(Message message) { return ClientMessageEntryProto.newBuilder() .setContent(message.getContent()).build(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 4e5db47..eae42a5 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -25,10 +25,12 @@ import org.apache.ratis.util.RaftUtils; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import java.util.stream.Collectors; /** A client who sends requests to a raft service. */ final class RaftClientImpl implements RaftClient { @@ -82,6 +84,9 @@ final class RaftClientImpl implements RaftClient { public RaftClientReply setConfiguration(RaftPeer[] peersInNewConf) throws IOException { final long callId = nextCallId(); + // also refresh the rpc proxies for these peers + clientRpc.addServers(Arrays.stream(peersInNewConf).filter(peers::contains) + .collect(Collectors.toCollection(ArrayList::new))); return sendRequestWithRetry(() -> new SetConfigurationRequest( clientId, leaderId, callId, peersInNewConf)); } @@ -111,19 +116,21 @@ final class RaftClientImpl implements RaftClient { private RaftClientReply sendRequest(RaftClientRequest request) throws StateMachineException { + RaftClientReply reply = null; try { - RaftClientReply reply = clientRpc.sendRequest(request); + reply = clientRpc.sendRequest(request); + } catch (IOException ioe) { + handleIOException(request, ioe, null); + } + if (reply != null) { if (reply.isNotLeader()) { handleNotLeaderException(request, reply.getNotLeaderException()); return null; + } else if (reply.hasStateMachineException()) { + throw reply.getStateMachineException(); } else { return reply; } - } catch (StateMachineException e) { - throw e; - } catch (IOException ioe) { - // TODO different retry policies for different exceptions - handleIOException(request, ioe, null); } return null; } @@ -147,7 +154,8 @@ final class RaftClientImpl implements RaftClient { private void handleIOException(RaftClientRequest request, IOException ioe, RaftPeerId newLeader) { - LOG.debug("{}: Failed with {}", clientId, ioe); + LOG.debug("{}: suggested new leader: {}. Failed with {}", clientId, + newLeader, ioe); final RaftPeerId oldLeader = request.getServerId(); if (newLeader == null && oldLeader.equals(leaderId)) { newLeader = RaftUtils.next(oldLeader, RaftUtils.as(peers, RaftPeer::getId)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java index 4dd2943..7179505 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java @@ -24,23 +24,27 @@ public class RaftClientReply extends RaftClientMessage { private final boolean success; private final long callId; - /** non-null if the server is not leader */ - private final NotLeaderException notLeaderException; + /** + * We mainly track two types of exceptions here: + * 1. NotLeaderException if the server is not leader + * 2. StateMachineException if the server's state machine returns an exception + */ + private final RaftException exception; private final Message message; public RaftClientReply(ClientId clientId, RaftPeerId serverId, long callId, - boolean success, Message message, NotLeaderException notLeaderException) { + boolean success, Message message, RaftException exception) { super(clientId, serverId); this.success = success; this.callId = callId; this.message = message; - this.notLeaderException = notLeaderException; + this.exception = exception; } public RaftClientReply(RaftClientRequest request, - NotLeaderException notLeaderException) { + RaftException exception) { this(request.getClientId(), request.getServerId(), request.getCallId(), - false, null, notLeaderException); + false, null, exception); } public RaftClientReply(RaftClientRequest request, Message message) { @@ -71,11 +75,21 @@ public class RaftClientReply extends RaftClientMessage { return message; } + public boolean isNotLeader() { + return exception instanceof NotLeaderException; + } + public NotLeaderException getNotLeaderException() { - return notLeaderException; + assert isNotLeader(); + return (NotLeaderException) exception; } - public boolean isNotLeader() { - return notLeaderException != null; + public StateMachineException getStateMachineException() { + assert hasStateMachineException(); + return (StateMachineException) exception; + } + + public boolean hasStateMachineException() { + return exception instanceof StateMachineException; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java index 099133d..68d808b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java @@ -18,7 +18,7 @@ package org.apache.ratis.protocol; public class StateMachineException extends RaftException { - public StateMachineException(String serverId, Exception cause) { + public StateMachineException(String serverId, Throwable cause) { super(cause.getClass().getName() + " from Server " + serverId, cause); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java index 64c7a15..5cc93b3 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java @@ -380,4 +380,15 @@ public abstract class RaftUtils { throw new IllegalStateException(String.valueOf(message.get())); } } + + public static Exception instantiateException(Class<? extends Exception> cls, + String message, Exception from) throws Exception { + Constructor<? extends Exception> cn = cls.getConstructor(String.class); + cn.setAccessible(true); + Exception ex = cn.newInstance(message); + if (from != null) { + ex.initCause(from); + } + return ex; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java index f17ee93..46cbe2b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java @@ -21,6 +21,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Interner; import com.google.common.collect.Interners; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Locale; public class StringUtils { @@ -83,4 +85,12 @@ public class StringUtils { return defaultValue; } } + + public static String stringifyException(Throwable e) { + StringWriter stm = new StringWriter(); + PrintWriter wrt = new PrintWriter(stm); + e.printStackTrace(wrt); + wrt.close(); + return stm.toString(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java index e38e245..447f2ea 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java +++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java @@ -79,7 +79,7 @@ public class TestRaftStateMachineException { fail("Exception expected"); } catch (StateMachineException e) { e.printStackTrace(); - Assert.assertTrue(e.getMessage().contains("Fake Exception")); + Assert.assertTrue(e.getCause().getMessage().contains("Fake Exception")); } cluster.shutdown(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java index 52ed851..fb3cc11 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java @@ -21,29 +21,19 @@ import org.apache.ratis.shaded.io.grpc.Metadata; import org.apache.ratis.shaded.io.grpc.Status; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; import org.apache.ratis.util.RaftUtils; +import org.apache.ratis.util.StringUtils; import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.lang.reflect.Constructor; public class RaftGrpcUtil { public static final Metadata.Key<String> EXCEPTION_TYPE_KEY = Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER); - public static String stringifyException(Throwable e) { - StringWriter stm = new StringWriter(); - PrintWriter wrt = new PrintWriter(stm); - e.printStackTrace(wrt); - wrt.close(); - return stm.toString(); - } - public static StatusRuntimeException wrapException(Throwable t) { Metadata trailers = new Metadata(); trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName()); return new StatusRuntimeException( - Status.INTERNAL.withDescription(RaftGrpcUtil.stringifyException(t)), + Status.INTERNAL.withDescription(StringUtils.stringifyException(t)), trailers); } @@ -55,7 +45,7 @@ public class RaftGrpcUtil { if (className != null) { try { Class<?> clazz = Class.forName(className); - final Exception unwrapped = instantiateException( + final Exception unwrapped = RaftUtils.instantiateException( clazz.asSubclass(Exception.class), status.getDescription(), se); return RaftUtils.asIOException(unwrapped); } catch (Exception e) { @@ -76,12 +66,4 @@ public class RaftGrpcUtil { return e; } - private static Exception instantiateException(Class<? extends Exception> cls, - String message, Exception from) throws Exception { - Constructor<? extends Exception> cn = cls.getConstructor(String.class); - cn.setAccessible(true); - Exception ex = cn.newInstance(message); - ex.initCause(from); - return ex; - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java index 2d15cea..7a36fa1 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java @@ -17,9 +17,14 @@ */ package org.apache.ratis.hadooprpc; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Client; +import org.apache.log4j.Level; import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.client.RaftClient; import org.apache.ratis.server.impl.RaftReconfigurationBaseTest; +import org.apache.ratis.util.RaftUtils; import java.io.IOException; @@ -28,6 +33,10 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONN public class TestRaftReconfigurationWithHadoopRpc extends RaftReconfigurationBaseTest { + static { + ((Log4JLogger) Client.LOG).getLogger().setLevel(Level.ERROR); + } + @Override public MiniRaftCluster getCluster(int peerNum) throws IOException { final Configuration hadoopConf = new Configuration(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index 8c334dd..f8dcf62 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -154,13 +154,25 @@ message RaftClientRequestProto { bool readOnly = 3; } +message NotLeaderExceptionProto { + RaftPeerProto suggestedLeader = 1; + repeated RaftPeerProto peersInConf = 2; +} + +message StateMachineExceptionProto { + string exceptionClassName = 1; + string errorMsg = 2; + bytes stacktrace = 3; +} + message RaftClientReplyProto { RaftRpcReplyProto rpcReply = 1; ClientMessageEntryProto message = 2; - // the following 3 fields are used to indicate the server is not leader - bool isNotLeader = 3; - RaftPeerProto suggestedLeader = 4; - repeated RaftPeerProto peersInConf = 5; + + oneof ExceptionDetails { + NotLeaderExceptionProto notLeaderException = 3; + StateMachineExceptionProto stateMachineException = 4; + } } // setConfiguration request http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index c6b15e6..f6c9ade 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -516,8 +516,9 @@ public class LeaderState { return pending; } - void replyPendingRequest(long logIndex, CompletableFuture<Message> message) { - pendingRequests.replyPendingRequest(logIndex, message); + void replyPendingRequest(long logIndex, + CompletableFuture<Message> stateMachineFuture) { + pendingRequests.replyPendingRequest(logIndex, stateMachineFuture); } TransactionContext getTransactionContext(long index) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index b5bb4b9..356ed4e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -206,7 +206,7 @@ public class LogAppender extends Daemon { } catch (InterruptedIOException iioe) { throw iioe; } catch (IOException ioe) { - LOG.debug(this + ": failed to send appendEntries; retry " + retry++, ioe); + LOG.trace(this + ": failed to send appendEntries; retry " + retry++, ioe); } if (isAppenderRunning()) { leaderState.getSyncInterval().sleep(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index ff407e4..d4b74f2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -88,16 +88,21 @@ class PendingRequests { return pendingRequest != null ? pendingRequest.getEntry() : null; } - void replyPendingRequest(long index, CompletableFuture<Message> messageFuture) { + void replyPendingRequest(long index, + CompletableFuture<Message> stateMachineFuture) { final PendingRequest pending = pendingRequests.get(index); if (pending != null) { RaftUtils.assertTrue(pending.getIndex() == index); - messageFuture.whenComplete((reply, exception) -> { + stateMachineFuture.whenComplete((reply, exception) -> { if (exception == null) { pending.setSuccessReply(reply); } else { - pending.setException(exception); + // the exception is coming from the state machine. wrap it into the + // reply as a StateMachineException + final StateMachineException e = new StateMachineException( + server.getId().toString(), exception); + pending.setReply(new RaftClientReply(pending.getRequest(), e)); } }); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 897b2a8..ee74dc8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -434,8 +434,9 @@ public class RaftServerImpl implements RaftServer { if (cause == null) { throw new IOException(e); } - if (cause instanceof NotLeaderException) { - return new RaftClientReply(request, (NotLeaderException)cause); + if (cause instanceof NotLeaderException || + cause instanceof StateMachineException) { + return new RaftClientReply(request, (RaftException) cause); } else { throw RaftUtils.asIOException(cause); } @@ -797,9 +798,9 @@ public class RaftServerImpl implements RaftServer { } synchronized void replyPendingRequest(long logIndex, - CompletableFuture<Message> message) { + CompletableFuture<Message> stateMachineFuture) { if (isLeader() && leaderState != null) { // is leader and is running - leaderState.replyPendingRequest(logIndex, message); + leaderState.replyPendingRequest(logIndex, stateMachineFuture); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 50aeae8..b4fc705 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -162,9 +162,9 @@ class StateMachineUpdater implements Runnable { trx = stateMachine.applyTransactionSerial(trx); // TODO: This step can be parallelized - CompletableFuture<Message> messageFuture = + CompletableFuture<Message> stateMachineFuture = stateMachine.applyTransaction(trx); - server.replyPendingRequest(next.getIndex(), messageFuture); + server.replyPendingRequest(next.getIndex(), stateMachineFuture); } lastAppliedIndex++; } else {
