Repository: incubator-ratis
Updated Branches:
  refs/heads/master 7a5c3ea12 -> 96c470d88
RATIS-116. In PendingRequests, the requests are never removed from the map. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/96c470d8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/96c470d8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/96c470d8 Branch: refs/heads/master Commit: 96c470d884423f0997ac06073b53a541be856d11 Parents: 7a5c3ea Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Wed Oct 11 11:05:31 2017 +0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Wed Oct 11 11:05:31 2017 +0800 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 2 +- .../ratis/server/impl/PendingRequest.java | 10 ++-- .../ratis/server/impl/PendingRequests.java | 57 +++++++++++++++----- 3 files changed, 51 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96c470d8/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index c682350..f1fbeb0 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -82,7 +82,7 @@ public interface RaftClient extends Closeable { retryInterval = RaftClientConfigKeys.Rpc.timeout(properties); } return ClientImplUtils.newRaftClient(clientId, - Objects.requireNonNull(group, "The 'servers' field is not initialized."), + Objects.requireNonNull(group, "The 'group' field is not initialized."), leaderId, Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."), retryInterval); 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96c470d8/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java index f1909d4..95731c5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java @@ -17,10 +17,7 @@ */ package org.apache.ratis.server.impl; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.protocol.*; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.Preconditions; @@ -73,6 +70,11 @@ public class PendingRequest implements Comparable<PendingRequest> { future.complete(r); } + TransactionContext setNotLeaderException(NotLeaderException nle) { + setReply(new RaftClientReply(getRequest(), nle)); + return getEntry(); + } + void setSuccessReply(Message message) { setReply(new RaftClientReply(getRequest(), message)); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96c470d8/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index b7b8a9e..ab8ad2c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -31,13 +31,52 @@ import java.util.stream.Collectors; class PendingRequests { 
private static final Logger LOG = RaftServerImpl.LOG; + private static class RequestMap { + private final Object name; + private final ConcurrentMap<Long, PendingRequest> map = new ConcurrentHashMap<>(); + + RequestMap(Object name) { + this.name = name; + } + + void put(long index, PendingRequest p) { + LOG.debug("{}: PendingRequests.put {} -> {}", name, index, p); + final PendingRequest previous = map.put(index, p); + Preconditions.assertTrue(previous == null); + } + + PendingRequest get(long index) { + final PendingRequest r = map.get(index); + LOG.debug("{}: PendingRequests.get {} returns {}", name, index, r); + return r; + } + + PendingRequest remove(long index) { + final PendingRequest r = map.remove(index); + LOG.debug("{}: PendingRequests.remove{} returns {}", name, index, r); + return r; + } + + Collection<TransactionContext> setNotLeaderException(NotLeaderException nle) { + LOG.debug("{}: PendingRequests.setNotLeaderException", name); + try { + return map.values().stream() + .map(p -> p.setNotLeaderException(nle)) + .collect(Collectors.toList()); + } finally { + map.clear(); + } + } + } + private PendingRequest pendingSetConf; private final RaftServerImpl server; - private final ConcurrentMap<Long, PendingRequest> pendingRequests = new ConcurrentHashMap<>(); + private final RequestMap pendingRequests; private PendingRequest last = null; PendingRequests(RaftServerImpl server) { this.server = server; + this.pendingRequests = new RequestMap(server.getId()); } PendingRequest addPendingRequest(long index, RaftClientRequest request, @@ -92,7 +131,7 @@ class PendingRequests { } void replyPendingRequest(long index, RaftClientReply reply) { - final PendingRequest pending = pendingRequests.get(index); + final PendingRequest pending = pendingRequests.remove(index); if (pending != null) { Preconditions.assertTrue(pending.getIndex() == index); pending.setReply(reply); @@ -107,19 +146,11 @@ class PendingRequests { LOG.info("{} sends responses before shutting down 
PendingRequestsHandler", server.getId()); - Collection<TransactionContext> pendingEntries = pendingRequests.values().stream() - .map(PendingRequest::getEntry).collect(Collectors.toList()); // notify the state machine about stepping down - server.getStateMachine().notifyNotLeader(pendingEntries); - pendingRequests.values().forEach(this::setNotLeaderException); + final NotLeaderException nle = server.generateNotLeaderException(); + server.getStateMachine().notifyNotLeader(pendingRequests.setNotLeaderException(nle)); if (pendingSetConf != null) { - setNotLeaderException(pendingSetConf); + pendingSetConf.setNotLeaderException(nle); } } - - private void setNotLeaderException(PendingRequest pending) { - RaftClientReply reply = new RaftClientReply(pending.getRequest(), - server.generateNotLeaderException()); - pending.setReply(reply); - } }
