This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 2e7211f RATIS-497. IllegalStateException: ILLEGAL TRANSITION: In
LogAppender(s4 -> s1), CLOSING -> RUNNING. Contributed by Tsz Wo Nicholas Sze.
2e7211f is described below
commit 2e7211f5c0c97c90744689d9f64e5ea81cfcdd93
Author: Lokesh Jain <[email protected]>
AuthorDate: Mon May 6 15:46:58 2019 +0530
RATIS-497. IllegalStateException: ILLEGAL TRANSITION: In LogAppender(s4 ->
s1), CLOSING -> RUNNING. Contributed by Tsz Wo Nicholas Sze.
---
.../apache/ratis/client/impl/ClientProtoUtils.java | 15 +--
.../apache/ratis/client/impl/RaftClientImpl.java | 5 +-
.../apache/ratis/protocol/NotLeaderException.java | 17 ++--
.../ratis/protocol/SetConfigurationRequest.java | 16 +--
.../java/org/apache/ratis/util/Preconditions.java | 17 ++++
.../java/org/apache/ratis/util/ProtoUtils.java | 10 +-
.../ratis/grpc/client/GrpcClientStreamer.java | 8 +-
.../apache/ratis/grpc/server/GrpcLogAppender.java | 42 ++++----
.../org/apache/ratis/server/impl/FollowerInfo.java | 12 ++-
.../org/apache/ratis/server/impl/LeaderState.java | 65 +++++++-----
.../org/apache/ratis/server/impl/LogAppender.java | 113 +++++++++++++--------
.../ratis/server/impl/RaftConfiguration.java | 27 ++---
.../apache/ratis/server/impl/RaftServerImpl.java | 7 +-
.../apache/ratis/server/impl/ServerProtoUtils.java | 4 +-
.../java/org/apache/ratis/MiniRaftCluster.java | 3 +-
.../org/apache/ratis/RaftExceptionBaseTest.java | 4 +-
.../ratis/server/impl/RaftServerTestUtil.java | 8 +-
.../org/apache/ratis/util/TestPreconditions.java | 53 ++++++++++
18 files changed, 261 insertions(+), 165 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 4cbebb0..d7aca72 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -23,7 +23,6 @@ import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.ReflectionUtils;
-import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -166,8 +165,7 @@ public interface ClientProtoUtils {
if (suggestedLeader != null) {
nleBuilder.setSuggestedLeader(suggestedLeader.getRaftPeerProto());
}
- nleBuilder.addAllPeersInConf(
- ProtoUtils.toRaftPeerProtos(Arrays.asList(nle.getPeers())));
+ nleBuilder.addAllPeersInConf(ProtoUtils.toRaftPeerProtos(nle.getPeers()));
b.setNotLeaderException(nleBuilder.build());
} else if ((sme = reply.getStateMachineException()) != null) {
StateMachineExceptionProto.Builder smeBuilder =
@@ -231,10 +229,8 @@ public interface ClientProtoUtils {
NotLeaderExceptionProto nleProto = replyProto.getNotLeaderException();
final RaftPeer suggestedLeader = nleProto.hasSuggestedLeader() ?
ProtoUtils.toRaftPeer(nleProto.getSuggestedLeader()) : null;
- final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(
- nleProto.getPeersInConfList());
- e = new NotLeaderException(RaftPeerId.valueOf(rp.getReplyId()),
- suggestedLeader, peers);
+ final List<RaftPeer> peers = ProtoUtils.toRaftPeers(nleProto.getPeersInConfList());
+ e = new NotLeaderException(RaftPeerId.valueOf(rp.getReplyId()), suggestedLeader, peers);
} else if (replyProto.getExceptionDetailsCase() == NOTREPLICATEDEXCEPTION)
{
final NotReplicatedExceptionProto nre =
replyProto.getNotReplicatedException();
e = new NotReplicatedException(nre.getCallId(), nre.getReplication(),
nre.getLogIndex());
@@ -316,7 +312,7 @@ public interface ClientProtoUtils {
static SetConfigurationRequest toSetConfigurationRequest(
SetConfigurationRequestProto p) {
final RaftRpcRequestProto m = p.getRpcRequest();
- final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(p.getPeersList());
+ final List<RaftPeer> peers = ProtoUtils.toRaftPeers(p.getPeersList());
return new SetConfigurationRequest(
ClientId.valueOf(m.getRequestorId()),
RaftPeerId.valueOf(m.getReplyId()),
@@ -328,8 +324,7 @@ public interface ClientProtoUtils {
SetConfigurationRequest request) {
return SetConfigurationRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(request))
- .addAllPeers(ProtoUtils.toRaftPeerProtos(
- Arrays.asList(request.getPeersInNewConf())))
+ .addAllPeers(ProtoUtils.toRaftPeerProtos(request.getPeersInNewConf()))
.build();
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index f73e541..c463b9c 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -248,6 +248,7 @@ final class RaftClientImpl implements RaftClient {
return sendRequestWithRetry(() -> newRaftClientRequest(server, callId,
message, type, null));
}
+ // TODO: change peersInNewConf to List<RaftPeer>
@Override
public RaftClientReply setConfiguration(RaftPeer[] peersInNewConf)
throws IOException {
@@ -257,7 +258,7 @@ final class RaftClientImpl implements RaftClient {
// also refresh the rpc proxies for these peers
addServers(Arrays.stream(peersInNewConf));
return sendRequestWithRetry(() -> new SetConfigurationRequest(
- clientId, leaderId, groupId, callId, peersInNewConf));
+ clientId, leaderId, groupId, callId, Arrays.asList(peersInNewConf)));
}
@Override
@@ -439,7 +440,7 @@ final class RaftClientImpl implements RaftClient {
RaftClientReply handleNotLeaderException(RaftClientRequest request,
NotLeaderException nle,
boolean resetSlidingWindow) {
- refreshPeers(Arrays.asList(nle.getPeers()));
+ refreshPeers(nle.getPeers());
final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null
: nle.getSuggestedLeader().getId();
handleIOException(request, nle, newLeader, resetSlidingWindow);
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
index 453965e..a854d8c 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,24 +17,29 @@
*/
package org.apache.ratis.protocol;
+import org.apache.ratis.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+
public class NotLeaderException extends RaftException {
private final RaftPeer suggestedLeader;
/** the client may need to update its RaftPeer list */
- private final RaftPeer[] peers;
+ private final Collection<RaftPeer> peers;
- public NotLeaderException(RaftPeerId id, RaftPeer suggestedLeader,
- RaftPeer[] peers) {
+ public NotLeaderException(RaftPeerId id, RaftPeer suggestedLeader, Collection<RaftPeer> peers) {
super("Server " + id + " is not the leader (" + suggestedLeader
+ "). Request must be sent to leader.");
this.suggestedLeader = suggestedLeader;
- this.peers = peers == null ? RaftPeer.emptyArray(): peers;
+ this.peers = peers != null? Collections.unmodifiableCollection(peers): Collections.emptyList();
+ Preconditions.assertUnique(this.peers);
}
public RaftPeer getSuggestedLeader() {
return suggestedLeader;
}
- public RaftPeer[] getPeers() {
+ public Collection<RaftPeer> getPeers() {
return peers;
}
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
index 1e45046..ae8219b 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -17,23 +17,27 @@
*/
package org.apache.ratis.protocol;
-import java.util.Arrays;
+import org.apache.ratis.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
public class SetConfigurationRequest extends RaftClientRequest {
- private final RaftPeer[] peers;
+ private final List<RaftPeer> peers;
public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId,
- RaftGroupId groupId, long callId, RaftPeer[] peers) {
+ RaftGroupId groupId, long callId, List<RaftPeer> peers) {
super(clientId, serverId, groupId, callId, writeRequestType());
- this.peers = peers;
+ this.peers = peers != null? Collections.unmodifiableList(peers): Collections.emptyList();
+ Preconditions.assertUnique(this.peers);
}
- public RaftPeer[] getPeersInNewConf() {
+ public List<RaftPeer> getPeersInNewConf() {
return peers;
}
@Override
public String toString() {
- return super.toString() + ", peers:" + Arrays.asList(getPeersInNewConf());
+ return super.toString() + ", peers:" + getPeersInNewConf();
}
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
index 6ebfdc7..eac5855 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -20,6 +20,9 @@
package org.apache.ratis.util;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import java.util.function.Supplier;
public interface Preconditions {
@@ -81,4 +84,18 @@ public interface Preconditions {
assertNull(object, () -> name + " is expected to be null but "
+ name + " = " + object + " != null, class = " + object.getClass());
}
+
+ static <T> void assertUnique(Iterable<T> first) {
+ assertUnique(first, Collections.emptyList());
+ }
+
+ static <T> void assertUnique(Iterable<T> original, Iterable<T> toBeAdded) {
+ final Set<T> set = new HashSet<>();
+ for(T t : original) {
+ assertTrue(set.add(t), () -> "Found duplicated element " + t + " in " + original);
+ }
+ for(T t : toBeAdded) {
+ assertTrue(set.add(t), () -> "Found duplicated element " + t + " when adding " + toBeAdded + " to " + original);
+ }
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 60800fb..2b0b5aa 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -73,12 +73,8 @@ public interface ProtoUtils {
return new RaftPeer(RaftPeerId.valueOf(p.getId()), p.getAddress());
}
- static RaftPeer[] toRaftPeerArray(List<RaftPeerProto> protos) {
- final RaftPeer[] peers = new RaftPeer[protos.size()];
- for (int i = 0; i < peers.length; i++) {
- peers[i] = toRaftPeer(protos.get(i));
- }
- return peers;
+ static List<RaftPeer> toRaftPeers(List<RaftPeerProto> protos) {
+ return protos.stream().map(ProtoUtils::toRaftPeer).collect(Collectors.toList());
}
static Iterable<RaftPeerProto> toRaftPeerProtos(
@@ -107,7 +103,7 @@ public interface ProtoUtils {
}
static RaftGroup toRaftGroup(RaftGroupProto proto) {
- return RaftGroup.valueOf(toRaftGroupId(proto.getGroupId()), toRaftPeerArray(proto.getPeersList()));
+ return RaftGroup.valueOf(toRaftGroupId(proto.getGroupId()), toRaftPeers(proto.getPeersList()));
}
static RaftGroupProto.Builder toRaftGroupProtoBuilder(RaftGroup group) {
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
index 1b33392..a93a622 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -376,11 +376,11 @@ public class GrpcClientStreamer implements Closeable {
}
}
- private void refreshPeers(RaftPeer[] newPeers) {
- if (newPeers != null && newPeers.length > 0) {
+ private void refreshPeers(Collection<RaftPeer> newPeers) {
+ if (newPeers != null && newPeers.size() > 0) {
// we only add new peers, we do not remove any peer even if it no longer
// belongs to the current raft conf
- Arrays.stream(newPeers).forEach(peer -> {
+ newPeers.forEach(peer -> {
peers.putIfAbsent(peer.getId(), peer);
proxyMap.computeIfAbsent(peer);
});
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 7d9e018..b8926f1 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -74,11 +74,11 @@ public class GrpcLogAppender extends LogAppender {
}
private GrpcServerProtocolClient getClient() throws IOException {
- return rpcService.getProxies().getProxy(follower.getPeer().getId());
+ return rpcService.getProxies().getProxy(getFollowerId());
}
private synchronized void resetClient(AppendEntriesRequestProto request) {
- rpcService.getProxies().resetProxy(follower.getPeer().getId());
+ rpcService.getProxies().resetProxy(getFollowerId());
appendLogRequestObserver = null;
firstResponseReceived = false;
@@ -223,8 +223,8 @@ public class GrpcLogAppender extends LogAppender {
@Override
public void onNext(AppendEntriesReplyProto reply) {
if (LOG.isDebugEnabled()) {
- LOG.debug("{}<-{}: received {} reply {} ", server.getId(),
follower.getPeer(),
- (!firstResponseReceived? "the first": "a"),
ServerProtoUtils.toString(reply));
+ LOG.debug("{}: received {} reply {} ", getFollower().getName(),
+ firstResponseReceived? "a": "the first",
ServerProtoUtils.toString(reply));
}
try {
@@ -266,7 +266,7 @@ public class GrpcLogAppender extends LogAppender {
LOG.info("{} is stopped", GrpcLogAppender.this);
return;
}
- GrpcUtil.warn(LOG, () -> server.getId() + ": Failed appendEntries to " + follower.getPeer(), t);
+ GrpcUtil.warn(LOG, () -> getFollower().getName() + ": Failed appendEntries", t);
long callId = GrpcUtil.getCallId(t);
resetClient(pendingRequests.get(callId));
@@ -274,7 +274,7 @@ public class GrpcLogAppender extends LogAppender {
@Override
public void onCompleted() {
- LOG.info("{}: follower {} response Completed", server.getId(), follower);
+ LOG.info("{}: follower responses appendEntries COMPLETED", getFollower().getName());
resetClient(null);
}
}
@@ -284,11 +284,11 @@ public class GrpcLogAppender extends LogAppender {
follower.decreaseNextIndex(newNextIndex);
}
- protected synchronized void onSuccess(AppendEntriesReplyProto reply) {
+ private synchronized void onSuccess(AppendEntriesReplyProto reply) {
AppendEntriesRequestProto request =
pendingRequests.remove(reply.getServerReply().getCallId());
if (request == null) {
// If reply comes after timeout, the reply is ignored.
- LOG.warn("{}: Request not found, ignoring reply: {}", this,
ServerProtoUtils.toString(reply));
+ LOG.warn("{}: Request not found, ignoring SUCCESS reply: {}", this,
ServerProtoUtils.toString(reply));
return;
}
updateCommitIndex(reply.getFollowerCommit());
@@ -327,7 +327,7 @@ public class GrpcLogAppender extends LogAppender {
AppendEntriesRequestProto request =
pendingRequests.remove(reply.getServerReply().getCallId());
if (request == null) {
// If reply comes after timeout, the reply is ignored.
- LOG.warn("{}: Ignoring {}", server.getId(), reply);
+ LOG.warn("{}: Request not found, ignoring INCONSISTENCY reply: {}",
this, ServerProtoUtils.toString(reply));
return;
}
Preconditions.assertTrue(request.hasPreviousLog());
@@ -351,7 +351,8 @@ public class GrpcLogAppender extends LogAppender {
}
synchronized void removePending(InstallSnapshotReplyProto reply) {
- int index = pending.poll();
+ final Integer index = pending.poll();
+ Objects.requireNonNull(index, "index == null");
Preconditions.assertTrue(index == reply.getRequestIndex());
}
@@ -370,9 +371,7 @@ public class GrpcLogAppender extends LogAppender {
@Override
public void onNext(InstallSnapshotReplyProto reply) {
- LOG.debug("{} received {} response from {}", server.getId(),
- (!firstResponseReceived ? "the first" : "a"),
- follower.getPeer());
+ LOG.debug("{}: received {} response", getFollower().getName(),
firstResponseReceived? "a": "the first");
// update the last rpc time
follower.updateLastRpcResponseTime();
@@ -411,15 +410,14 @@ public class GrpcLogAppender extends LogAppender {
LOG.info("{} is stopped", GrpcLogAppender.this);
return;
}
- LOG.info("{} got error when installing snapshot to {}, exception: {}",
- server.getId(), follower.getPeer(), t);
+ LOG.info("{}: got error when installing snapshot: {}",
getFollower().getName(), t);
resetClient(null);
close();
}
@Override
public void onCompleted() {
- LOG.info("Snapshot(s) sent from {} to follower {}", server.getId(),
follower);
+ LOG.info("{}: follower responses installSnapshot Completed",
getFollower().getName());
close();
}
}
@@ -429,10 +427,8 @@ public class GrpcLogAppender extends LogAppender {
* @param snapshot the snapshot to be sent to Follower
*/
private void installSnapshot(SnapshotInfo snapshot) {
- LOG.info("{}: follower {}'s next index is {}," +
- " log's start index is {}, need to install snapshot",
- server.getId(), follower.getPeer(), follower.getNextIndex(),
- raftLog.getStartIndex());
+ LOG.info("{}: follower's next index is {}, log's start index is {}, will install snapshot",
+ getFollower().getName(), follower.getNextIndex(), raftLog.getStartIndex());
final InstallSnapshotResponseHandler responseHandler = new
InstallSnapshotResponseHandler();
StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
@@ -451,8 +447,7 @@ public class GrpcLogAppender extends LogAppender {
}
snapshotRequestObserver.onCompleted();
} catch (Exception e) {
- LOG.warn("{} failed to install snapshot {}. Exception: {}", this,
- snapshot.getFiles(), e);
+ LOG.warn("{}: failed to install snapshot {}: {}", this,
snapshot.getFiles(), e);
if (snapshotRequestObserver != null) {
snapshotRequestObserver.onError(e);
}
@@ -470,8 +465,7 @@ public class GrpcLogAppender extends LogAppender {
if (responseHandler.hasAllResponse()) {
follower.setSnapshotIndex(snapshot.getTermIndex().getIndex());
- LOG.info("{}: install snapshot-{} successfully on follower {}",
- server.getId(), snapshot.getTermIndex().getIndex(),
follower.getPeer());
+ LOG.info("{}: install snapshot-{} successfully",
getFollower().getName(), snapshot.getTermIndex().getIndex());
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index 5206528..6d52b7b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -96,6 +96,10 @@ public class FollowerInfo {
nextIndex.setUnconditionally(snapshotIndex + 1, infoIndexChange);
}
+ public String getName() {
+ return name;
+ }
+
@Override
public String toString() {
return name + "(c" + getCommitIndex() + ",m" + getMatchIndex() + ",n" +
getNextIndex()
@@ -108,7 +112,7 @@ public class FollowerInfo {
attendVote = true;
}
- public boolean isAttendingVote() {
+ boolean isAttendingVote() {
return attendVote;
}
@@ -121,7 +125,7 @@ public class FollowerInfo {
lastRpcResponseTime.set(Timestamp.currentTime());
}
- public Timestamp getLastRpcResponseTime() {
+ Timestamp getLastRpcResponseTime() {
return lastRpcResponseTime.get();
}
@@ -130,11 +134,11 @@ public class FollowerInfo {
lastRpcSendTime.set(Timestamp.currentTime());
}
- public Timestamp getLastRpcTime() {
+ Timestamp getLastRpcTime() {
return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get());
}
- public boolean isSlow() {
+ boolean isSlow() {
return lastRpcResponseTime.get().elapsedTimeMs() > rpcSlownessTimeoutMs;
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index e828fd8..78b56db 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -138,12 +138,8 @@ public class LeaderState {
static class SenderList {
private final List<LogAppender> senders;
- SenderList(LogAppender[] senders) {
- this.senders = new CopyOnWriteArrayList<>(senders);
- }
-
- int size() {
- return senders.size();
+ SenderList() {
+ this.senders = new CopyOnWriteArrayList<>();
}
Stream<LogAppender> stream() {
@@ -154,8 +150,17 @@ public class LeaderState {
senders.forEach(action);
}
- boolean addAll(Collection<LogAppender> c) {
- return senders.addAll(c);
+ void addAll(Collection<LogAppender> newSenders) {
+ if (newSenders.isEmpty()) {
+ return;
+ }
+
+ Preconditions.assertUnique(
+ CollectionUtils.as(senders, LogAppender::getFollowerId),
+ CollectionUtils.as(newSenders, LogAppender::getFollowerId));
+
+ final boolean changed = senders.addAll(newSenders);
+ Preconditions.assertTrue(changed);
}
boolean removeAll(Collection<LogAppender> c) {
@@ -204,13 +209,10 @@ public class LeaderState {
final RaftConfiguration conf = server.getRaftConf();
Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId());
- final Timestamp t =
Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
placeHolderIndex = raftLog.getNextIndex();
- senders = new SenderList(others.stream().map(
- p -> server.newLogAppender(this, p, t, placeHolderIndex, true))
- .toArray(LogAppender[]::new));
-
+ senders = new SenderList();
+ addSenders(others, placeHolderIndex, true);
voterLists = divideFollowers(conf);
}
@@ -271,15 +273,14 @@ public class LeaderState {
PendingRequest startSetConfiguration(SetConfigurationRequest request) {
Preconditions.assertTrue(running && !inStagingState());
- RaftPeer[] peersInNewConf = request.getPeersInNewConf();
- Collection<RaftPeer> peersToBootStrap = RaftConfiguration
- .computeNewPeers(peersInNewConf, server.getRaftConf());
+ final List<RaftPeer> peersInNewConf = request.getPeersInNewConf();
+ final Collection<RaftPeer> peersToBootStrap =
server.getRaftConf().filterNotContainedInConf(peersInNewConf);
// add the request to the pending queue
final PendingRequest pending = pendingRequests.addConfRequest(request);
ConfigurationStagingState stagingState = new ConfigurationStagingState(
- peersToBootStrap, new
PeerConfiguration(Arrays.asList(peersInNewConf)));
+ peersToBootStrap, new PeerConfiguration(peersInNewConf));
Collection<RaftPeer> newPeers = stagingState.getNewPeers();
// set the staging state
this.stagingState = stagingState;
@@ -288,7 +289,7 @@ public class LeaderState {
applyOldNewConf();
} else {
// update the LeaderState's sender list
- addSenders(newPeers);
+ addAndStartSenders(newPeers);
}
return pending;
}
@@ -361,18 +362,19 @@ public class LeaderState {
}
/**
- * After receiving a setConfiguration request, the leader should update its
- * RpcSender list.
+ * Update sender list for setConfiguration request
*/
- void addSenders(Collection<RaftPeer> newMembers) {
- final Timestamp t =
Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
- final long nextIndex = raftLog.getNextIndex();
+ void addAndStartSenders(Collection<RaftPeer> newPeers) {
+ addSenders(newPeers, raftLog.getNextIndex(), false).forEach(LogAppender::startAppender);
+ }
- senders.addAll(newMembers.stream().map(peer -> {
- LogAppender sender = server.newLogAppender(this, peer, t, nextIndex,
false);
- sender.startAppender();
- return sender;
- }).collect(Collectors.toList()));
+ Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
+ final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
+ final List<LogAppender> newAppenders = newPeers.stream()
+ .map(peer -> server.newLogAppender(this, peer, t, nextIndex, attendVote))
+ .collect(Collectors.toList());
+ senders.addAll(newAppenders);
+ return newAppenders;
}
void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
@@ -381,6 +383,13 @@ public class LeaderState {
senders.removeAll(toStop);
}
+ void restartSender(LogAppender sender) {
+ final FollowerInfo follower = sender.getFollower();
+ LOG.info("{}: Restarting {} for {}", server.getId(), sender.getClass().getSimpleName(), follower.getName());
+ senders.removeAll(Collections.singleton(sender));
+ addAndStartSenders(Collections.singleton(follower.getPeer()));
+ }
+
/**
* Update the RpcSender list based on the current configuration
*/
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 82c942d..fed52bc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -54,6 +54,69 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING;
public class LogAppender {
public static final Logger LOG = LoggerFactory.getLogger(LogAppender.class);
+ class AppenderDaemon {
+ private final LifeCycle lifeCycle;
+ private final Daemon daemon = new Daemon(this::run);
+
+ AppenderDaemon(Object name) {
+ this.lifeCycle = new LifeCycle(name);
+ }
+
+ void start() {
+ // The life cycle state could be already closed due to server shutdown.
+ if (lifeCycle.compareAndTransition(NEW, STARTING)) {
+ daemon.start();
+ }
+ }
+
+ void run() {
+ synchronized (lifeCycle) {
+ if (!isRunning()) {
+ return;
+ }
+ lifeCycle.transition(RUNNING);
+ }
+ try {
+ runAppenderImpl();
+ } catch (InterruptedException | InterruptedIOException e) {
+ LOG.info(this + " was interrupted: " + e);
+ } catch (RaftLogIOException e) {
+ LOG.error(this + " failed RaftLog", e);
+ lifeCycle.transition(EXCEPTION);
+ } catch (IOException e) {
+ LOG.error(this + " failed IOException", e);
+ lifeCycle.transition(EXCEPTION);
+ } catch (Throwable e) {
+ LOG.error(this + " unexpected exception", e);
+ lifeCycle.transition(EXCEPTION);
+ } finally {
+ if (!lifeCycle.compareAndTransition(CLOSING, CLOSED)) {
+ lifeCycle.transitionIfNotEqual(EXCEPTION);
+ }
+ if (lifeCycle.getCurrentState() == EXCEPTION) {
+ leaderState.restartSender(LogAppender.this);
+ }
+ }
+ }
+
+ boolean isRunning() {
+ return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED, EXCEPTION);
+ }
+
+ void stop() {
+ synchronized (lifeCycle) {
+ if (!isRunning()) {
+ return;
+ }
+ if (lifeCycle.compareAndTransition(NEW, CLOSED)) {
+ return;
+ }
+ lifeCycle.transition(CLOSING);
+ }
+ daemon.interrupt();
+ }
+ }
+
protected final RaftServerImpl server;
private final LeaderState leaderState;
protected final RaftLog raftLog;
@@ -63,8 +126,7 @@ public class LogAppender {
private final int snapshotChunkMaxSize;
protected final long halfMinTimeoutMs;
- private final LifeCycle lifeCycle;
- private final Daemon daemon = new Daemon(this::runAppender);
+ private final AppenderDaemon daemon;
public LogAppender(RaftServerImpl server, LeaderState leaderState,
FollowerInfo f) {
this.follower = f;
@@ -79,51 +141,24 @@ public class LogAppender {
final SizeInBytes bufferByteLimit =
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
final int bufferElementLimit =
RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit,
EntryWithData::getSerializedSize);
- this.lifeCycle = new LifeCycle(this);
+ this.daemon = new AppenderDaemon(this);
}
@Override
public String toString() {
- return getClass().getSimpleName() + "(" + server.getId() + " -> " +
- follower.getPeer().getId() + ")";
+ return getClass().getSimpleName() + "(" + follower.getName() + ")";
}
void startAppender() {
- // The life cycle state could be already closed due to server shutdown.
- if (lifeCycle.compareAndTransition(NEW, STARTING)) {
- daemon.start();
- }
+ daemon.start();
}
- private void runAppender() {
- lifeCycle.transition(RUNNING);
- try {
- runAppenderImpl();
- } catch (InterruptedException | InterruptedIOException e) {
- LOG.info(this + " was interrupted: " + e);
- } catch (IOException e) {
- LOG.error(this + " hit IOException while loading raft log", e);
- lifeCycle.transition(EXCEPTION);
- } catch (Throwable e) {
- LOG.error(this + " unexpected exception", e);
- lifeCycle.transition(EXCEPTION);
- } finally {
- if (!lifeCycle.compareAndTransition(CLOSING, CLOSED)) {
- lifeCycle.transitionIfNotEqual(EXCEPTION);
- }
- }
+ public boolean isAppenderRunning() {
+ return daemon.isRunning();
}
- protected boolean isAppenderRunning() {
- return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED, EXCEPTION);
- }
-
- public void stopAppender() {
- if (lifeCycle.compareAndTransition(NEW, CLOSED)) {
- return;
- }
- lifeCycle.transition(CLOSING);
- daemon.interrupt();
+ void stopAppender() {
+ daemon.stop();
}
public FollowerInfo getFollower() {
@@ -388,10 +423,8 @@ public class LogAppender {
if (shouldSendRequest()) {
SnapshotInfo snapshot = shouldInstallSnapshot();
if (snapshot != null) {
- LOG.info("{}:{} follower {}'s next index is {}," +
- " log's start index is {}, need to install snapshot",
- server.getId(), server.getGroupId(), follower.getPeer(),
follower.getNextIndex(),
- raftLog.getStartIndex());
+ LOG.info("{}:{} follower's next index is {}, log's start index is {}, will install snapshot",
+ follower.getName(), server.getGroupId(), follower.getNextIndex(), raftLog.getStartIndex());
final InstallSnapshotReplyProto r = installSnapshot(snapshot);
if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
index e3d16ff..13c9afd 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -23,6 +23,7 @@ import org.apache.ratis.util.Preconditions;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
/**
* The configuration of the raft cluster.
@@ -60,10 +61,6 @@ public class RaftConfiguration {
return setConf(new PeerConfiguration(peers));
}
- public Builder setConf(RaftPeer[] peers) {
- return setConf(Arrays.asList(peers));
- }
-
Builder setConf(RaftConfiguration transitionalConf) {
Objects.requireNonNull(transitionalConf);
Preconditions.assertTrue(transitionalConf.isTransitional());
@@ -85,10 +82,6 @@ public class RaftConfiguration {
return setOldConf(new PeerConfiguration(oldPeers));
}
- public Builder setOldConf(RaftPeer[] oldPeers) {
- return setOldConf(Arrays.asList(oldPeers));
- }
-
Builder setOldConf(RaftConfiguration stableConf) {
Objects.requireNonNull(stableConf);
Preconditions.assertTrue(stableConf.isStable());
@@ -218,8 +211,8 @@ public class RaftConfiguration {
return logEntryIndex + ": " + conf + ", old=" + oldConf;
}
- boolean hasNoChange(RaftPeer[] newMembers) {
- if (!isStable() || conf.size() != newMembers.length) {
+ boolean hasNoChange(Collection<RaftPeer> newMembers) {
+ if (!isStable() || conf.size() != newMembers.size()) {
return false;
}
for (RaftPeer peer : newMembers) {
@@ -234,15 +227,9 @@ public class RaftConfiguration {
return logEntryIndex;
}
- static Collection<RaftPeer> computeNewPeers(RaftPeer[] newMembers,
- RaftConfiguration old) {
- List<RaftPeer> peers = new ArrayList<>();
- for (RaftPeer p : newMembers) {
- if (!old.containsInConf(p.getId())) {
- peers.add(p);
- }
- }
- return peers;
+ /** @return the peers which are not contained in conf. */
+ Collection<RaftPeer> filterNotContainedInConf(List<RaftPeer> peers) {
+ return peers.stream().filter(p -> !containsInConf(p.getId())).collect(Collectors.toList());
}
RaftPeer getRandomPeer(RaftPeerId exclusiveId) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 2ccc5a9..f58ba9d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -450,8 +450,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
RaftConfiguration conf = getRaftConf();
Collection<RaftPeer> peers = conf.getPeers();
- return new NotLeaderException(getId(), conf.getPeer(leaderId),
- peers.toArray(new RaftPeer[peers.size()]));
+ return new NotLeaderException(getId(), conf.getPeer(leaderId), peers);
}
private LifeCycle.State assertLifeCycleState(LifeCycle.State... expected)
throws ServerNotReadyException {
@@ -648,7 +647,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return reply;
}
- final RaftPeer[] peersInNewConf = request.getPeersInNewConf();
+ final List<RaftPeer> peersInNewConf = request.getPeersInNewConf();
final PendingRequest pending;
synchronized (this) {
reply = checkLeaderState(request, null);
@@ -672,7 +671,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
// add new peers into the rpc service
- getServerRpc().addPeers(Arrays.asList(peersInNewConf));
+ getServerRpc().addPeers(peersInNewConf);
// add staging state into the leaderState
pending = leaderState.startSetConfiguration(request);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index c7c6355..ed5c728 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -125,10 +125,10 @@ public interface ServerProtoUtils {
Preconditions.assertTrue(entry.hasConfigurationEntry());
final RaftConfigurationProto proto = entry.getConfigurationEntry();
final RaftConfiguration.Builder b = RaftConfiguration.newBuilder()
- .setConf(ProtoUtils.toRaftPeerArray(proto.getPeersList()))
+ .setConf(ProtoUtils.toRaftPeers(proto.getPeersList()))
.setLogEntryIndex(entry.getIndex());
if (proto.getOldPeersCount() > 0) {
- b.setOldConf(ProtoUtils.toRaftPeerArray(proto.getOldPeersList()));
+ b.setOldConf(ProtoUtils.toRaftPeers(proto.getOldPeersList()));
}
return b.build();
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 74c19a1..52481d6 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -689,8 +689,7 @@ public abstract class MiniRaftCluster implements Closeable {
public SetConfigurationRequest newSetConfigurationRequest(
ClientId clientId, RaftPeerId leaderId,
RaftPeer... peers) {
- return new SetConfigurationRequest(clientId, leaderId, getGroupId(),
- DEFAULT_CALLID, peers);
+ return new SetConfigurationRequest(clientId, leaderId, getGroupId(), DEFAULT_CALLID, Arrays.asList(peers));
}
public void setConfiguration(RaftPeer... peers) throws IOException {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 9cf0d9b..35747a6 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -133,8 +133,8 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
10, ONE_SECOND, "assertNotLeaderException", LOG);
final Collection<RaftPeer> peers = cluster.getPeers();
- final RaftPeer[] peersFromReply = reply.getNotLeaderException().getPeers();
- Assert.assertEquals(peers.size(), peersFromReply.length);
+ final Collection<RaftPeer> peersFromReply = reply.getNotLeaderException().getPeers();
+ Assert.assertEquals(peers.size(), peersFromReply.size());
for (RaftPeer p : peersFromReply) {
Assert.assertTrue(peers.contains(p));
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 676713f..4c3f06d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -47,12 +47,12 @@ public class RaftServerTestUtil {
RaftPeer[] peers, int numOfRemovedPeers, Collection<RaftPeerId> deadPeers)
throws Exception {
final TimeDuration sleepTime = cluster.getTimeoutMax().apply(n -> n * (numOfRemovedPeers + 2));
- JavaUtils.attempt(() -> waitAndCheckNewConf(cluster, peers, deadPeers),
+ JavaUtils.attempt(() -> waitAndCheckNewConf(cluster, Arrays.asList(peers), deadPeers),
10, sleepTime, "waitAndCheckNewConf", LOG);
}
private static void waitAndCheckNewConf(MiniRaftCluster cluster,
- RaftPeer[] peers, Collection<RaftPeerId> deadPeers) {
- LOG.info("waitAndCheckNewConf: peers={}, deadPeers={}, {}", Arrays.asList(peers), deadPeers, cluster.printServers());
+ Collection<RaftPeer> peers, Collection<RaftPeerId> deadPeers) {
+ LOG.info("waitAndCheckNewConf: peers={}, deadPeers={}, {}", peers, deadPeers, cluster.printServers());
Assert.assertNotNull(cluster.getLeader());
int numIncluded = 0;
@@ -78,7 +78,7 @@ public class RaftServerTestUtil {
Assert.assertFalse(server.getRaftConf().containsInConf(server.getId()));
}
}
- Assert.assertEquals(peers.length, numIncluded + deadIncluded);
+ Assert.assertEquals(peers.size(), numIncluded + deadIncluded);
}
public static long getRetryCacheSize(RaftServerImpl server) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestPreconditions.java b/ratis-test/src/test/java/org/apache/ratis/util/TestPreconditions.java
new file mode 100644
index 0000000..7ea3cf7
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestPreconditions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.BaseTest;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class TestPreconditions extends BaseTest {
+ @Test(timeout = 1000)
+ public void testAssertUnique() {
+ final Set<Integer> empty = Collections.emptySet();
+ Preconditions.assertUnique(empty);
+ Preconditions.assertUnique(empty, empty);
+
+ final Set<Integer> one = Collections.singleton(1);
+ Preconditions.assertUnique(one);
+ Preconditions.assertUnique(empty, one);
+ testFailureCase("add [1] to [1]", () -> Preconditions.assertUnique(one, one), IllegalStateException.class);
+
+ final List<Integer> three = Arrays.asList(1, 2, 3);
+ testFailureCase("add [1, 2, 3] to [1]", () -> Preconditions.assertUnique(three, one), IllegalStateException.class);
+ testFailureCase("add [1] to [1, 2, 3]", () -> Preconditions.assertUnique(one, three), IllegalStateException.class);
+
+ final List<Integer> duplicated = Arrays.asList(3, 2, 3);
+ testFailureCase("check [3, 2, 3]", () -> Preconditions.assertUnique(duplicated), IllegalStateException.class);
+ testFailureCase("add [1] to [3, 2, 3]", () -> Preconditions.assertUnique(duplicated, one),
+ IllegalStateException.class);
+ testFailureCase("add [3, 2, 3] to [1]", () -> Preconditions.assertUnique(one, duplicated),
+ IllegalStateException.class);
+
+ Preconditions.assertUnique(three, Arrays.asList(4, 5, 6));
+ }
+}