Repository: incubator-ratis Updated Branches: refs/heads/master 1a74e13ea -> 8f0a3ed50
RATIS-162. Clean up reply exception handling code for RATIS-140. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8f0a3ed5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8f0a3ed5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8f0a3ed5 Branch: refs/heads/master Commit: 8f0a3ed50aa7a347558551ab5ce3a8bc2e8e92a0 Parents: 1a74e13 Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Tue Dec 5 08:55:56 2017 +0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Tue Dec 5 08:55:56 2017 +0800 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 8 +- .../org/apache/ratis/client/RaftClientRpc.java | 2 +- .../ratis/client/impl/ClientImplUtils.java | 5 +- .../ratis/client/impl/ClientProtoUtils.java | 9 ++- .../ratis/client/impl/RaftClientImpl.java | 79 ++++++++++++-------- .../client/impl/RaftClientRpcWithProxy.java | 4 +- .../ratis/protocol/LeaderNotReadyException.java | 4 +- .../apache/ratis/protocol/RaftClientReply.java | 40 +++++----- .../java/org/apache/ratis/protocol/RaftId.java | 5 +- .../java/org/apache/ratis/util/JavaUtils.java | 32 ++++++++ .../java/org/apache/ratis/util/LogUtils.java | 10 ++- .../ratis/util/UncheckedAutoCloseable.java | 27 +++++++ .../examples/filestore/FileStoreClient.java | 6 +- .../TestRaftStateMachineException.java | 5 +- .../org/apache/ratis/grpc/RaftGrpcUtil.java | 12 +-- .../ratis/grpc/client/AppendStreamer.java | 7 +- .../apache/ratis/grpc/client/GrpcClientRpc.java | 45 +++++------ .../grpc/client/RaftClientProtocolClient.java | 22 ++++-- .../grpc/client/RaftClientProtocolProxy.java | 6 +- .../ratis/server/impl/PendingRequests.java | 3 +- .../ratis/server/impl/RaftServerImpl.java | 6 +- .../apache/ratis/server/impl/RetryCache.java | 6 +- .../ratis/server/impl/ServerProtoUtils.java | 2 +- .../apache/ratis/server/impl/ServerState.java | 21 ++++-- .../java/org/apache/ratis/MiniRaftCluster.java | 9 +++ .../org/apache/ratis/RaftExceptionBaseTest.java | 13 ++-- .../impl/RaftReconfigurationBaseTest.java | 2 +- .../server/simulation/SimulatedClientRpc.java | 2 +- 28 files changed, 242 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index d2c5c1a..89fb8f4 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -21,9 +21,8 @@ import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.client.impl.ClientImplUtils; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.rpc.RpcType; -import org.apache.ratis.util.TimeDuration; import org.apache.ratis.protocol.*; +import org.apache.ratis.rpc.RpcType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,7 +82,6 @@ public interface RaftClient extends Closeable { private RaftGroup group; private RaftPeerId leaderId; private RaftProperties properties; - private TimeDuration retryInterval = RaftClientConfigKeys.Rpc.TIMEOUT_DEFAULT; private Parameters parameters; private Builder() {} @@ -94,8 +92,6 @@ public interface RaftClient extends Closeable { clientId = ClientId.randomId(); } if (properties != null) { - retryInterval = RaftClientConfigKeys.Rpc.timeout(properties); - if (clientRpc == null) { final RpcType rpcType = RaftConfigKeys.Rpc.type(properties); final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(parameters)); @@ -106,7 +102,7 @@ public interface RaftClient extends Closeable { Objects.requireNonNull(group, "The 'group' field is not initialized."), leaderId, Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."), - retryInterval, properties); + properties); } /** Set {@link RaftClient} ID. */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java index 51f4430..c6c2d61 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java @@ -40,5 +40,5 @@ public interface RaftClientRpc extends Closeable { void addServers(Iterable<RaftPeer> servers); /** Handle the given exception. For example, try reconnecting. */ - void handleException(RaftPeerId serverId, Exception e); + void handleException(RaftPeerId serverId, Exception e, boolean shouldClose); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java index e7c89f3..d813650 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java @@ -28,8 +28,7 @@ import org.apache.ratis.protocol.RaftPeerId; /** Client utilities for internal use. */ public class ClientImplUtils { public static RaftClient newRaftClient(ClientId clientId, RaftGroup group, - RaftPeerId leaderId, RaftClientRpc clientRpc, TimeDuration retryInterval, - RaftProperties properties) { - return new RaftClientImpl(clientId, group, leaderId, clientRpc, retryInterval, properties); + RaftPeerId leaderId, RaftClientRpc clientRpc, RaftProperties properties) { + return new RaftClientImpl(clientId, group, leaderId, clientRpc, properties); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 2d7c13e..43c06de 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -109,8 +109,10 @@ public class ClientProtoUtils { if (reply.getMessage() != null) { b.setMessage(toClientMessageEntryProtoBuilder(reply.getMessage())); } - if (reply.isNotLeader()) { - NotLeaderException nle = reply.getNotLeaderException(); + + final NotLeaderException nle = reply.getNotLeaderException(); + final StateMachineException sme; + if (nle != null) { NotLeaderExceptionProto.Builder nleBuilder = NotLeaderExceptionProto.newBuilder(); final RaftPeer suggestedLeader = nle.getSuggestedLeader(); @@ -120,8 +122,7 @@ public class ClientProtoUtils { nleBuilder.addAllPeersInConf( ProtoUtils.toRaftPeerProtos(Arrays.asList(nle.getPeers()))); b.setNotLeaderException(nleBuilder.build()); - } else if (reply.hasStateMachineException()) { - StateMachineException sme = reply.getStateMachineException(); + } else if ((sme = reply.getStateMachineException()) != null) { StateMachineExceptionProto.Builder smeBuilder = StateMachineExceptionProto.newBuilder(); final Throwable t = sme.getCause() != null ? sme.getCause() : sme; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 9c66a9f..ba1a107 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -32,6 +32,7 @@ import java.io.InterruptedIOException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Stream; @@ -56,14 +57,15 @@ final class RaftClientImpl implements RaftClient { private final Semaphore asyncRequestSemaphore; RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, - RaftClientRpc clientRpc, TimeDuration retryInterval, RaftProperties properties) { + RaftClientRpc clientRpc, RaftProperties properties) { this.clientId = clientId; this.clientRpc = clientRpc; this.peers = new ConcurrentLinkedQueue<>(group.getPeers()); this.groupId = group.getGroupId(); this.leaderId = leaderId != null? leaderId : !peers.isEmpty()? peers.iterator().next().getId(): null; - this.retryInterval = retryInterval; + this.retryInterval = RaftClientConfigKeys.Rpc.timeout(properties); + asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties)); scheduler = Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties)); clientRpc.addServers(peers); @@ -101,12 +103,8 @@ final class RaftClientImpl implements RaftClient { final long seqNum = nextSeqNum(); return sendRequestWithRetryAsync( () -> new RaftClientRequest(clientId, leaderId, groupId, callId, seqNum, message, readOnly) - ).thenApply(reply -> { - if (reply.hasStateMachineException() || reply.hasGroupMismatchException()) { - throw new CompletionException(reply.getException()); - } - return reply; - }).whenComplete((r, e) -> asyncRequestSemaphore.release()); + ).thenApply(reply -> handleStateMachineException(reply, CompletionException::new) + ).whenComplete((r, e) -> asyncRequestSemaphore.release()); } @Override @@ -203,15 +201,12 @@ final class RaftClientImpl implements RaftClient { private CompletableFuture<RaftClientReply> sendRequestAsync( RaftClientRequest request) { - LOG.debug("{}: sendAsync {}", clientId, request); + LOG.debug("{}: send* {}", clientId, request); return clientRpc.sendRequestAsync(request).thenApply(reply -> { - LOG.debug("{}: receive {}", clientId, reply); - if (reply != null && reply.isNotLeader()) { - handleNotLeaderException(request, reply.getNotLeaderException()); - return null; - } - return reply; + LOG.debug("{}: receive* {}", clientId, reply); + return handleNotLeaderException(request, reply); }).exceptionally(e -> { + LOG.debug("{}: Failed {} with {}", clientId, request, e); final Throwable cause = e.getCause(); if (cause instanceof GroupMismatchException) { return new RaftClientReply(request, (RaftException) cause); @@ -233,26 +228,40 @@ final class RaftClientImpl implements RaftClient { } catch (IOException ioe) { handleIOException(request, ioe, null); } + LOG.debug("{}: receive {}", clientId, reply); + reply = handleNotLeaderException(request, reply); + reply = handleStateMachineException(reply, Function.identity()); + return reply; + } + + static <E extends Throwable> RaftClientReply handleStateMachineException( + RaftClientReply reply, Function<StateMachineException, E> converter) throws E { if (reply != null) { - LOG.debug("{}: receive {}", clientId, reply); - if (reply.isNotLeader()) { - handleNotLeaderException(request, reply.getNotLeaderException()); - return null; - } else if (reply.hasStateMachineException()) { - throw reply.getStateMachineException(); - } else { - return reply; + final StateMachineException sme = reply.getStateMachineException(); + if (sme != null) { + throw converter.apply(sme); } } - return null; + return reply; } - private void handleNotLeaderException(RaftClientRequest request, - NotLeaderException nle) { + /** + * @return null if the reply is null or it has {@link NotLeaderException}; + * otherwise return the same reply. + */ + private RaftClientReply handleNotLeaderException(RaftClientRequest request, RaftClientReply reply) { + if (reply == null) { + return null; + } + final NotLeaderException nle = reply.getNotLeaderException(); + if (nle == null) { + return reply; + } refreshPeers(Arrays.asList(nle.getPeers())); final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null : nle.getSuggestedLeader().getId(); handleIOException(request, nle, newLeader); + return null; } private void refreshPeers(Collection<RaftPeer> newPeers) { @@ -266,23 +275,29 @@ final class RaftClientImpl implements RaftClient { private void handleIOException(RaftClientRequest request, IOException ioe, RaftPeerId newLeader) { - LOG.debug("{}: suggested new leader: {}. Failed with {}", clientId, - newLeader, ioe); + LOG.debug("{}: suggested new leader: {}. Failed {} with {}", + clientId, newLeader, request, ioe); if (LOG.isTraceEnabled()) { LOG.trace("Stack trace", new Throwable("TRACE")); } - final RaftPeerId oldLeader = request.getServerId(); - clientRpc.handleException(oldLeader, ioe); + if (ioe instanceof LeaderNotReadyException) { + return; + } - if (newLeader == null && oldLeader.equals(leaderId)) { + final RaftPeerId oldLeader = request.getServerId(); + final boolean stillLeader = oldLeader.equals(leaderId); + if (newLeader == null && stillLeader) { newLeader = CollectionUtils.random(oldLeader, CollectionUtils.as(peers, RaftPeer::getId)); } - if (newLeader != null && oldLeader.equals(leaderId)) { + + final boolean changeLeader = newLeader != null && stillLeader; + if (changeLeader) { LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader, newLeader); this.leaderId = newLeader; } + clientRpc.handleException(oldLeader, ioe, changeLeader); } void assertAsyncRequestSemaphore(int expectedAvailablePermits, int expectedQueueLength) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java index 835a876..6f4f37f 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java @@ -48,8 +48,8 @@ public abstract class RaftClientRpcWithProxy<PROXY extends Closeable> } @Override - public void handleException(RaftPeerId serverId, Exception e) { - if (ReflectionUtils.isInstance(e, + public void handleException(RaftPeerId serverId, Exception e, boolean shouldClose) { + if (shouldClose || ReflectionUtils.isInstance(e, SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class)) { proxies.resetProxy(serverId); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java index 33f6a4d..55af3f4 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java @@ -24,8 +24,8 @@ package org.apache.ratis.protocol; * it cannot determine whether a request is just a retry. */ public class LeaderNotReadyException extends RaftException { - public LeaderNotReadyException() { - this("The leader is not ready yet"); + public LeaderNotReadyException(RaftPeerId id) { + this(id + " is in LEADER state but not ready yet."); } public LeaderNotReadyException(String msg) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java index ea59352..77a987d 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java @@ -17,6 +17,10 @@ */ package org.apache.ratis.protocol; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.ReflectionUtils; + /** * Reply from server to client */ @@ -40,6 +44,14 @@ public class RaftClientReply extends RaftClientMessage { this.callId = callId; this.message = message; this.exception = exception; + + if (exception != null) { + Preconditions.assertTrue(!success, + () -> "Inconsistent parameters: success && exception != null: " + this); + Preconditions.assertTrue( + ReflectionUtils.isInstance(exception, NotLeaderException.class, StateMachineException.class), + () -> "Unexpected exception class: " + this); + } } public RaftClientReply(RaftClientRequest request, @@ -64,8 +76,8 @@ public class RaftClientReply extends RaftClientMessage { @Override public String toString() { - return super.toString() + ", callId: " + getCallId() - + ", success: " + isSuccess(); + return super.toString() + ", cid=" + getCallId() + + ", success? " + isSuccess() + ", exception=" + exception; } public boolean isSuccess() { @@ -76,29 +88,13 @@ public class RaftClientReply extends RaftClientMessage { return message; } - public boolean isNotLeader() { - return exception instanceof NotLeaderException; - } - + /** If this reply has {@link NotLeaderException}, return it; otherwise return null. */ public NotLeaderException getNotLeaderException() { - assert isNotLeader(); - return (NotLeaderException) exception; + return JavaUtils.cast(exception, NotLeaderException.class); } + /** If this reply has {@link StateMachineException}, return it; otherwise return null. */ public StateMachineException getStateMachineException() { - assert hasStateMachineException(); - return (StateMachineException) exception; - } - - public boolean hasStateMachineException() { - return exception instanceof StateMachineException; - } - - public boolean hasGroupMismatchException(){ - return exception instanceof GroupMismatchException; - } - - public RaftException getException(){ - return exception; + return JavaUtils.cast(exception, StateMachineException.class); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java index 0846856..4b45765 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java @@ -67,8 +67,11 @@ public abstract class RaftId { this(toUuid(uuidBytes), () -> uuidBytes); } + /** @return the last 12 hex digits. */ String createUuidString(UUID uuid) { - return uuid.toString().toUpperCase(); + final String s = uuid.toString().toUpperCase(); + final int i = s.lastIndexOf('-'); + return s.substring(i + 1); } public ByteString toByteString() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java index 89407eb..e910f28 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java @@ -27,8 +27,12 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.Objects; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Supplier; @@ -40,6 +44,15 @@ public interface JavaUtils { Logger LOG = LoggerFactory.getLogger(JavaUtils.class); /** + * The same as {@link Class#cast(Object)} except that + * this method returns null (but not throw {@link ClassCastException}) + * if the given object is not an instance of the given class. + */ + static <T> T cast(Object obj, Class<T> clazz) { + return clazz.isInstance(obj)? clazz.cast(obj): null; + } + + /** * Invoke {@link Callable#call()} and, if there any, * wrap the checked exception by {@link RuntimeException}. */ @@ -171,6 +184,20 @@ public interface JavaUtils { } } + Supplier<Timer> TIMER = memoize(() -> new Timer(true)); + + static UncheckedAutoCloseable runRepeatedly(Runnable runnable, long delay, long period, TimeUnit unit) { + final Timer timer = TIMER.get(); + timer.schedule(new TimerTask() { + @Override + public void run() { + runnable.run(); + } + }, unit.toMillis(delay), unit.toMillis(period)); + + return timer::cancel; + } + static void dumpAllThreads(Consumer<String> println) { final ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean(); for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) { @@ -183,4 +210,9 @@ public interface JavaUtils { future.completeExceptionally(t); return future; } + + static Throwable unwrapCompletionException(Throwable t) { + Objects.requireNonNull(t, "t == null"); + return t instanceof CompletionException && t.getCause() != null? t.getCause(): t; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java index 6a4d833..e75b89b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java @@ -23,6 +23,7 @@ package org.apache.ratis.util; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -32,9 +33,16 @@ import java.util.function.Supplier; * Logging (as in log4j) related utility methods. */ public interface LogUtils { + Logger LOG = LoggerFactory.getLogger(LogUtils.class); static void setLogLevel(Logger logger, Level level) { - LogManager.getLogger(logger.getName()).setLevel(level); + final String name = logger.getName(); + if (LOG.isTraceEnabled()) { + LOG.trace("", new Throwable("Set " + name + " log level to " + level)); + } else { + LOG.info("Set {} log level to {}", name, level); + } + LogManager.getLogger(name).setLevel(level); } static <THROWABLE extends Throwable> void runAndLog( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/util/UncheckedAutoCloseable.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/UncheckedAutoCloseable.java b/ratis-common/src/main/java/org/apache/ratis/util/UncheckedAutoCloseable.java new file mode 100644 index 0000000..cc73159 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/UncheckedAutoCloseable.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +/** + * The same as {@link AutoCloseable} + * except that the close method does not throw {@link Exception}. + */ +public interface UncheckedAutoCloseable extends AutoCloseable { + @Override + void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java index 59d5079..380f315 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java @@ -22,6 +22,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.ExamplesProtos.*; import org.apache.ratis.util.CheckedFunction; @@ -57,8 +58,9 @@ public class FileStoreClient implements Closeable { ByteString request, CheckedFunction<Message, RaftClientReply, IOException> sendFunction) throws IOException { final RaftClientReply reply = sendFunction.apply(() -> request); - if (reply.hasStateMachineException()) { - throw new IOException("Failed to send request " + request, reply.getStateMachineException()); + final StateMachineException sme = reply.getStateMachineException(); + if (sme != null) { + throw new IOException("Failed to send request " + request, sme); } Preconditions.assertTrue(reply.isSuccess(), () -> "reply=" + reply); return reply.getMessage().getContent(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java index 158875d..b0e2b7c 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java +++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java @@ -35,6 +35,7 @@ import org.junit.runners.Parameterized; import java.io.IOException; import java.util.Collection; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import static org.junit.Assert.fail; @@ -146,7 +147,7 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest { RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, cluster.getGroupId(), callId, seqNum, new SimpleMessage("message")); RaftClientReply reply = rpc.sendRequest(r); - Assert.assertTrue(reply.hasStateMachineException()); + Objects.requireNonNull(reply.getStateMachineException()); RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry( cluster.getLeader(), client.getId(), callId); @@ -155,7 +156,7 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest { // retry reply = rpc.sendRequest(r); - Assert.assertTrue(reply.hasStateMachineException()); + Objects.requireNonNull(reply.getStateMachineException()); RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry( cluster.getLeader(), client.getId(), callId); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java index d373ddb..5499878 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java @@ -22,10 +22,7 @@ import org.apache.ratis.shaded.io.grpc.Metadata; import org.apache.ratis.shaded.io.grpc.Status; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.util.CheckedSupplier; -import org.apache.ratis.util.IOUtils; -import org.apache.ratis.util.ReflectionUtils; -import org.apache.ratis.util.StringUtils; +import org.apache.ratis.util.*; import java.io.IOException; import java.util.Objects; @@ -38,12 +35,7 @@ public interface RaftGrpcUtil { Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER); static StatusRuntimeException wrapException(Throwable t) { - Objects.requireNonNull(t, "t == null"); - if (t instanceof CompletionException) { - if (t.getCause() != null) { - t = t.getCause(); - } - } + t = JavaUtils.unwrapCompletionException(t); Metadata trailers = new Metadata(); trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java index 9c238f4..810121b 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -97,7 +97,7 @@ public class AppendStreamer implements Closeable { this.peers = group.getPeers().stream().collect( Collectors.toMap(RaftPeer::getId, Function.identity())); proxyMap = new PeerProxyMap<>(clientId.toString(), - raftPeer -> new RaftClientProtocolProxy(raftPeer, ResponseHandler::new)); + raftPeer -> new RaftClientProtocolProxy(clientId, raftPeer, ResponseHandler::new)); proxyMap.addPeers(group.getPeers()); refreshLeaderProxy(leaderId, null); @@ -277,10 +277,11 @@ public class AppendStreamer implements Closeable { } else { // this may be a NotLeaderException RaftClientReply r = ClientProtoUtils.toRaftClientReply(reply); - if (r.isNotLeader()) { + final NotLeaderException nle = r.getNotLeaderException(); + if (nle != null) { LOG.debug("{} received a NotLeaderException from {}", this, r.getServerId()); - handleNotLeader(r.getNotLeaderException(), targetId); + handleNotLeader(nle, targetId); } } AppendStreamer.this.notifyAll(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index 2b7de70..c5c188e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -23,13 +23,13 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.RaftGrpcUtil; import org.apache.ratis.protocol.*; -import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; import org.apache.ratis.util.IOUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.PeerProxyMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,10 +43,13 @@ import static org.apache.ratis.client.impl.ClientProtoUtils.*; public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClient> { public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class); + + private final ClientId clientId; private final int maxMessageSize; public GrpcClientRpc(ClientId clientId, RaftProperties properties) { - super(new PeerProxyMap<>(clientId.toString(), RaftClientProtocolClient::new)); + super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p))); + this.clientId = clientId; maxMessageSize = GrpcConfigKeys.messageSizeMax(properties).getSizeInt(); } @@ -57,10 +60,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie try { return sendRequestAsync(request, getProxies().getProxy(serverId)); } catch (IOException e) { - final CompletableFuture<RaftClientReply> replyFuture = - new CompletableFuture<>(); - replyFuture.completeExceptionally(e); - return replyFuture; + return JavaUtils.completeExceptionally(e); } } @@ -83,16 +83,10 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie return ClientProtoUtils.toServerInformationReply( proxy.serverInformation(proto)); } else { - RaftClientRequestProto requestProto = toRaftClientRequestProto(request); - if (requestProto.getSerializedSize() > maxMessageSize) { - throw new IOException("msg size:" + requestProto.getSerializedSize() + - " exceeds maximum:" + maxMessageSize); - } - final CompletableFuture<RaftClientReply> replyFuture = - sendRequestAsync(request, proxy); + final CompletableFuture<RaftClientReply> f = sendRequestAsync(request, proxy); // TODO: timeout support try { - return replyFuture.get(); + return f.get(); } catch (InterruptedException e) { throw new InterruptedIOException( "Interrupted while waiting for response of request " + request); @@ -103,7 +97,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie } private CompletableFuture<RaftClientReply> sendRequestAsync( - RaftClientRequest request, RaftClientProtocolClient proxy) { + RaftClientRequest request, RaftClientProtocolClient proxy) throws IOException { final RaftClientRequestProto requestProto = toRaftClientRequestProto(request); final CompletableFuture<RaftClientReplyProto> replyFuture = @@ -117,22 +111,14 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie @Override public void onError(Throwable t) { - // This implementation is used as RaftClientRpc. Retry - // logic on Exception is in RaftClient. - final IOException e; - if (t instanceof StatusRuntimeException) { - e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t); - } else { - e = IOUtils.asIOException(t); - } - replyFuture.completeExceptionally(e); + replyFuture.completeExceptionally(RaftGrpcUtil.unwrapIOException(t)); } @Override public void onCompleted() { if (!replyFuture.isDone()) { replyFuture.completeExceptionally( - new IOException("No reply for request " + request)); + new IOException(clientId + ": Stream completed but no reply for request " + request)); } } }); @@ -141,4 +127,13 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie return replyFuture.thenApply(replyProto -> toRaftClientReply(replyProto)); } + + RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request) throws IOException { + final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request); + if (proto.getSerializedSize() > maxMessageSize) { + throw new IOException(clientId + ": Message size:" + proto.getSerializedSize() + + " exceeds maximum:" + maxMessageSize); + } + return proto; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java index 12a3a5b..ace90f2 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java @@ -18,35 +18,39 @@ package org.apache.ratis.grpc.client; import org.apache.ratis.grpc.RaftGrpcUtil; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.shaded.io.grpc.ManagedChannel; import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc; import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub; import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc; import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub; import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub; import org.apache.ratis.util.CheckedSupplier; +import org.apache.ratis.util.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.util.function.Supplier; public class RaftClientProtocolClient implements Closeable { + public static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolClient.class); + + private final Supplier<String> name; private final RaftPeer target; private final ManagedChannel channel; private final RaftClientProtocolServiceBlockingStub blockingStub; private final RaftClientProtocolServiceStub asyncStub; private final AdminProtocolServiceBlockingStub adminBlockingStub; - public RaftClientProtocolClient(RaftPeer target) { + public RaftClientProtocolClient(ClientId id, RaftPeer target) { + this.name = JavaUtils.memoize(() -> id + "->" + target.getId()); this.target = target; channel = ManagedChannelBuilder.forTarget(target.getAddress()) .usePlaintext(true).build(); @@ -55,6 +59,10 @@ public class RaftClientProtocolClient implements Closeable { adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel); } + String getName() { + return name.get(); + } + @Override public void close() { channel.shutdownNow(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java index 6892c71..297fe26 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.grpc.client; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; @@ -31,9 +32,10 @@ public class RaftClientProtocolProxy implements Closeable { private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation; private RpcSession currentSession; - public RaftClientProtocolProxy(RaftPeer target, + public RaftClientProtocolProxy( + ClientId clientId, RaftPeer target, Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation) { - proxy = new RaftClientProtocolClient(target); + proxy = new RaftClientProtocolClient(clientId, target); this.responseHandlerCreation = responseHandlerCreation; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index f651230..1dead9d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -21,6 +21,7 @@ import org.apache.ratis.protocol.*; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; @@ -29,7 +30,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; class PendingRequests { - private static final Logger LOG = RaftServerImpl.LOG; + public static final Logger LOG = LoggerFactory.getLogger(PendingRequests.class); private static class RequestMap { private final Object name; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 122ff51..bc45b5f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -367,7 +367,7 @@ public class RaftServerImpl implements RaftServerProtocol, return RetryCache.failWithReply(reply, entry); } else { if (leaderState == null || !leaderState.isReady()) { - return RetryCache.failWithException(new LeaderNotReadyException(), entry); + return RetryCache.failWithException(new LeaderNotReadyException(getId()), entry); } } return null; @@ -732,8 +732,8 @@ public class RaftServerImpl implements RaftServerProtocol, final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( leaderId, getId(), groupId, currentTerm, nextIndex, NOT_LEADER); if (LOG.isDebugEnabled()) { - LOG.debug("{}: do not recognize leader. Reply: {}", - getId(), ProtoUtils.toString(reply)); + LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}", + getId(), leaderId, leaderTerm, state, ProtoUtils.toString(reply)); } return reply; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java index afe2a7e..5abfc1a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java @@ -27,6 +27,7 @@ import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.shaded.com.google.common.cache.Cache; import org.apache.ratis.shaded.com.google.common.cache.CacheBuilder; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -243,10 +244,7 @@ public class RetryCache implements Closeable { entry.failWithException(t); return entry.getReplyFuture(); } else { - final CompletableFuture<RaftClientReply> future = new CompletableFuture<>(); - future.completeExceptionally(t); - return future; + return JavaUtils.completeExceptionally(t); } } - } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 9f84e05..3e44c76 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -64,7 +64,7 @@ public class ServerProtoUtils { final ByteString clientId = entry.getClientId(); return toTermIndexString(entry) + entry.getLogEntryBodyCase() + ", " + (clientId.isEmpty()? "<empty clientId>": ClientId.valueOf(clientId)) - + ", callId=" + entry.getCallId(); + + ", cid=" + entry.getCallId(); } public static String toString(LogEntryProto... entries) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index e83a931..a51d4b9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -35,8 +35,10 @@ import org.apache.ratis.util.ProtoUtils; import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.util.Objects; import java.util.function.Consumer; +import static org.apache.ratis.server.impl.RaftServerImpl.LOG; import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; /** @@ -169,7 +171,7 @@ public class ServerState implements Closeable { if (newTerm > currentTerm) { currentTerm = newTerm; votedFor = null; - leaderId = null; + setLeader(null); return true; } return false; @@ -188,7 +190,7 @@ public class ServerState implements Closeable { */ long initElection() { votedFor = selfId; - leaderId = null; + setLeader(null); return ++currentTerm; } @@ -201,15 +203,18 @@ public class ServerState implements Closeable { */ void grantVote(RaftPeerId candidateId) { votedFor = candidateId; - leaderId = null; + setLeader(null); } - void setLeader(RaftPeerId leaderId) { - this.leaderId = leaderId; + void setLeader(RaftPeerId newLeaderId) { + if (!Objects.equals(leaderId, newLeaderId)) { + LOG.info("{}: change Leader from {} to {}", selfId, leaderId, newLeaderId); + leaderId = newLeaderId; + } } void becomeLeader() { - leaderId = selfId; + setLeader(selfId); } public RaftLog getLog() { @@ -280,7 +285,7 @@ public class ServerState implements Closeable { public void setRaftConf(long logIndex, RaftConfiguration conf) { configurationManager.addConfiguration(logIndex, conf); - RaftServerImpl.LOG.info("{}: successfully update the configuration {}", + LOG.info("{}: successfully update the configuration {}", getSelfId(), conf); } @@ -313,7 +318,7 @@ public class ServerState implements Closeable { @Override public void close() throws IOException { stateMachineUpdater.stop(); - RaftServerImpl.LOG.info("{} closes. The last applied log index is {}", + LOG.info("{} closes. The last applied log index is {}", getSelfId(), getLastAppliedIndex()); log.close(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 88fddf4..b50d847 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -145,6 +145,7 @@ public abstract class MiniRaftCluster { this.properties = new RaftProperties(properties); this.parameters = parameters; + JavaUtils.runRepeatedly(() -> LOG.info("TIMED-PRINT" + printServers()), 10, 10, TimeUnit.SECONDS); ExitUtils.disableSystemExit(); } @@ -470,6 +471,14 @@ public abstract class MiniRaftCluster { return createClient(null, g); } + public RaftClient createClientWithLeader() { + return createClient(getLeader().getId(), group); + } + + public RaftClient createClientWithFollower() { + return createClient(getFollowers().get(0).getId(), group); + } + public RaftClient createClient(RaftPeerId leaderId) { return createClient(leaderId, group); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java index 4672b9d..7ad5bd2 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java @@ -34,6 +34,7 @@ import org.junit.Test; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Objects; public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster> extends BaseTest @@ -103,9 +104,9 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster> } Assert.assertNotNull(reply); Assert.assertFalse(reply.isSuccess()); - Assert.assertTrue(reply.isNotLeader()); - Assert.assertEquals(newLeader, - reply.getNotLeaderException().getSuggestedLeader().getId()); + final NotLeaderException nle = reply.getNotLeaderException(); + Objects.requireNonNull(nle); + Assert.assertEquals(newLeader, nle.getSuggestedLeader().getId()); reply = client.send(new SimpleMessage("m3")); Assert.assertTrue(reply.isSuccess()); @@ -148,9 +149,9 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster> } Assert.assertNotNull(reply); Assert.assertFalse(reply.isSuccess()); - Assert.assertTrue(reply.isNotLeader()); - Assert.assertEquals(newLeader, - reply.getNotLeaderException().getSuggestedLeader().getId()); + final NotLeaderException nle = reply.getNotLeaderException(); + Objects.requireNonNull(nle); + Assert.assertEquals(newLeader, nle.getSuggestedLeader().getId()); Collection<RaftPeer> peers = cluster.getPeers(); RaftPeer[] peersFromReply = reply.getNotLeaderException().getPeers(); Assert.assertEquals(peers.size(), peersFromReply.length); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 1fccfc4..9b142fd 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -532,7 +532,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { final RaftClientRpc sender = client.getClientRpc(); RaftClientReply reply = sender.sendRequest(cluster.newSetConfigurationRequest( client.getId(), leaderId, change.allPeersInNewConf)); - if (reply.isNotLeader()) { + if (reply.getNotLeaderException() != null) { gotNotLeader.set(true); } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java index 386d2e7..a141789 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java @@ -36,7 +36,7 @@ class SimulatedClientRpc } @Override - public void handleException(RaftPeerId serverId, Exception e) { + public void handleException(RaftPeerId serverId, Exception e, boolean shouldClose) { // do nothing }
