Repository: incubator-ratis Updated Branches: refs/heads/master 46116d412 -> 6adf89d48
RATIS-15. Add call ID to identify a client request and its retry. Contributed by Jing Zhao Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/6adf89d4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/6adf89d4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/6adf89d4 Branch: refs/heads/master Commit: 6adf89d481cc424716b12aa120e89b2a0fe921fb Parents: 46116d4 Author: Mingliang Liu <[email protected]> Authored: Fri Mar 3 18:58:11 2017 -0800 Committer: Mingliang Liu <[email protected]> Committed: Fri Mar 3 18:58:11 2017 -0800 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 1 - .../ratis/client/impl/ClientProtoUtils.java | 24 ++++++++++---------- .../ratis/client/impl/RaftClientImpl.java | 13 +++++++++-- .../apache/ratis/protocol/RaftClientReply.java | 16 ++++++------- .../ratis/protocol/RaftClientRequest.java | 16 ++++++------- .../ratis/protocol/SetConfigurationRequest.java | 4 ++-- .../java/org/apache/ratis/util/ProtoUtils.java | 4 ++-- .../ratis/grpc/client/AppendStreamer.java | 6 ++--- .../grpc/client/RaftClientProtocolService.java | 22 +++++++++--------- .../ratis/grpc/client/RaftOutputStream.java | 6 +++-- .../org/apache/ratis/netty/NettyRpcProxy.java | 14 ++++++------ .../ratis/netty/server/NettyRpcService.java | 2 +- ratis-proto-shaded/src/main/proto/Raft.proto | 4 ++-- .../ratis/server/impl/RaftServerConstants.java | 4 +--- .../ratis/server/impl/RaftServerImpl.java | 2 +- .../ratis/server/impl/ServerProtoUtils.java | 15 ++++++------ .../ratis/RaftNotLeaderExceptionBaseTest.java | 6 ++--- .../impl/RaftReconfigurationBaseTest.java | 14 ++++++------ .../statemachine/RaftSnapshotBaseTest.java | 4 ++-- 19 files changed, 92 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 8fbffd3..bf85386 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -31,7 +31,6 @@ import java.util.Objects; /** A client who sends requests to a raft service. */ public interface RaftClient extends Closeable { Logger LOG = LoggerFactory.getLogger(RaftClient.class); - long DEFAULT_SEQNUM = 0; /** @return the id of this client. */ ClientId getId(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index b0e1d41..218d761 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -26,20 +26,20 @@ import java.util.Arrays; public class ClientProtoUtils { public static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder( - byte[] requestorId, byte[] replyId, long seqNum, boolean success) { + byte[] requestorId, byte[] replyId, long callId, boolean success) { return RaftRpcReplyProto.newBuilder() .setRequestorId(ProtoUtils.toByteString(requestorId)) .setReplyId(ProtoUtils.toByteString(replyId)) - .setSeqNum(seqNum) + .setCallId(callId) .setSuccess(success); } public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder( - byte[] requesterId, byte[] replyId, long seqNum) { + byte[] requesterId, byte[] replyId, long callId) { return RaftRpcRequestProto.newBuilder() .setRequestorId(ProtoUtils.toByteString(requesterId)) .setReplyId(ProtoUtils.toByteString(replyId)) - .setSeqNum(seqNum); + .setCallId(callId); } public static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) { @@ -48,7 +48,7 @@ public class ClientProtoUtils { RaftPeerId serverId = new RaftPeerId( p.getRpcRequest().getReplyId()); return new RaftClientRequest(clientId, serverId, - p.getRpcRequest().getSeqNum(), + p.getRpcRequest().getCallId(), toMessage(p.getMessage()), p.getReadOnly()); } @@ -56,18 +56,18 @@ public class ClientProtoUtils { RaftClientRequest request) { return RaftClientRequestProto.newBuilder() .setRpcRequest(toRaftRpcRequestProtoBuilder(request.getClientId().toBytes(), - request.getServerId().toBytes(), request.getSeqNum())) + request.getServerId().toBytes(), request.getCallId())) .setMessage(toClientMessageEntryProto(request.getMessage())) .setReadOnly(request.isReadOnly()) .build(); } public static RaftClientRequestProto genRaftClientRequestProto( - ClientId clientId, RaftPeerId serverId, long seqNum, ByteString content, + ClientId clientId, RaftPeerId serverId, long callId, ByteString content, boolean readOnly) { return RaftClientRequestProto.newBuilder() .setRpcRequest(toRaftRpcRequestProtoBuilder(clientId.toBytes(), - serverId.toBytes(), seqNum)) + serverId.toBytes(), callId)) .setMessage(ClientMessageEntryProto.newBuilder().setContent(content)) .setReadOnly(readOnly) .build(); @@ -78,7 +78,7 @@ public class ClientProtoUtils { final RaftClientReplyProto.Builder b = RaftClientReplyProto.newBuilder(); if (reply != null) { b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toBytes(), - reply.getServerId().toBytes(), reply.getSeqNum(), reply.isSuccess())); + reply.getServerId().toBytes(), reply.getCallId(), reply.isSuccess())); if (reply.getMessage() != null) { b.setMessage(toClientMessageEntryProto(reply.getMessage())); } @@ -110,7 +110,7 @@ public class ClientProtoUtils { } return new RaftClientReply(new ClientId(rp.getRequestorId().toByteArray()), new RaftPeerId(rp.getReplyId()), - rp.getSeqNum(), rp.getSuccess(), toMessage(replyProto.getMessage()), e); + rp.getCallId(), rp.getSuccess(), toMessage(replyProto.getMessage()), e); } public static Message toMessage(final ClientMessageEntryProto p) { @@ -129,7 +129,7 @@ public class ClientProtoUtils { return new SetConfigurationRequest( new ClientId(m.getRequestorId().toByteArray()), new RaftPeerId(m.getReplyId()), - p.getRpcRequest().getSeqNum(), peers); + p.getRpcRequest().getCallId(), peers); } public static SetConfigurationRequestProto toSetConfigurationRequestProto( @@ -138,7 +138,7 @@ public class ClientProtoUtils { .setRpcRequest(toRaftRpcRequestProtoBuilder( request.getClientId().toBytes(), request.getServerId().toBytes(), - request.getSeqNum())) + request.getCallId())) .addAllPeers(ProtoUtils.toRaftPeerProtos( Arrays.asList(request.getPeersInNewConf()))) .build(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 3a6fd58..be22305 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -26,10 +26,17 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; /** A client who sends requests to a raft service. */ final class RaftClientImpl implements RaftClient { + private static final AtomicLong callIdCounter = new AtomicLong(); + + private static long nextCallId() { + return callIdCounter.getAndIncrement() & Long.MAX_VALUE; + } + private final ClientId clientId; private final RaftClientRpc clientRpc; private final Collection<RaftPeer> peers; @@ -65,15 +72,17 @@ final class RaftClientImpl implements RaftClient { } private RaftClientReply send(Message message, boolean readOnly) throws IOException { + final long callId = nextCallId(); return sendRequestWithRetry(() -> new RaftClientRequest( - clientId, leaderId, DEFAULT_SEQNUM, message, readOnly)); + clientId, leaderId, callId, message, readOnly)); } @Override public RaftClientReply setConfiguration(RaftPeer[] peersInNewConf) throws IOException { + final long callId = nextCallId(); return sendRequestWithRetry(() -> new SetConfigurationRequest( - clientId, leaderId, DEFAULT_SEQNUM, peersInNewConf)); + clientId, leaderId, callId, peersInNewConf)); } private RaftClientReply sendRequestWithRetry( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java index 459e0f4..4dd2943 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java @@ -22,29 +22,29 @@ package org.apache.ratis.protocol; */ public class RaftClientReply extends RaftClientMessage { private final boolean success; - private final long seqNum; + private final long callId; /** non-null if the server is not leader */ private final NotLeaderException notLeaderException; private final Message message; - public RaftClientReply(ClientId clientId, RaftPeerId serverId, long seqNum, + public RaftClientReply(ClientId clientId, RaftPeerId serverId, long callId, boolean success, Message message, NotLeaderException notLeaderException) { super(clientId, serverId); this.success = success; - this.seqNum = seqNum; + this.callId = callId; this.message = message; this.notLeaderException = notLeaderException; } public RaftClientReply(RaftClientRequest request, NotLeaderException notLeaderException) { - this(request.getClientId(), request.getServerId(), request.getSeqNum(), + this(request.getClientId(), request.getServerId(), request.getCallId(), false, null, notLeaderException); } public RaftClientReply(RaftClientRequest request, Message message) { - this(request.getClientId(), request.getServerId(), request.getSeqNum(), + this(request.getClientId(), request.getServerId(), request.getCallId(), true, message, null); } @@ -53,13 +53,13 @@ public class RaftClientReply extends RaftClientMessage { return false; } - public long getSeqNum() { - return seqNum; + public long getCallId() { + return callId; } @Override public String toString() { - return super.toString() + ", seqNum: " + getSeqNum() + return super.toString() + ", callId: " + getCallId() + ", success: " + isSuccess(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java index 9b649c9..898c166 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -21,19 +21,19 @@ package org.apache.ratis.protocol; * Request from client to server */ public class RaftClientRequest extends RaftClientMessage { - private final long seqNum; + private final long callId; private final Message message; private final boolean readOnly; public RaftClientRequest(ClientId clientId, RaftPeerId serverId, - long seqNum, Message message) { - this(clientId, serverId, seqNum, message, false); + long callId, Message message) { + this(clientId, serverId, callId, message, false); } public RaftClientRequest(ClientId clientId, RaftPeerId serverId, - long seqNum, Message message, boolean readOnly) { + long callId, Message message, boolean readOnly) { super(clientId, serverId); - this.seqNum = seqNum; + this.callId = callId; this.message = message; this.readOnly = readOnly; } @@ -43,8 +43,8 @@ public class RaftClientRequest extends RaftClientMessage { return true; } - public long getSeqNum() { - return seqNum; + public long getCallId() { + return callId; } public Message getMessage() { @@ -57,7 +57,7 @@ public class RaftClientRequest extends RaftClientMessage { @Override public String toString() { - return super.toString() + ", seqNum: " + seqNum + ", " + return super.toString() + ", callId: " + callId + ", " + (isReadOnly()? "RO": "RW"); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java index 6bc34f4..77d545c 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java @@ -23,8 +23,8 @@ public class SetConfigurationRequest extends RaftClientRequest { private final RaftPeer[] peers; public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId, - long seqNum, RaftPeer[] peers) { - super(clientId, serverId, seqNum, null); + long callId, RaftPeer[] peers) { + super(clientId, serverId, callId, null); this.peers = peers; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index 2613342..ea74d09 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -132,12 +132,12 @@ public class ProtoUtils { public static String toString(RaftRpcRequestProto proto) { return proto.getRequestorId() + "->" + proto.getReplyId() - + "#" + proto.getSeqNum(); + + "#" + proto.getCallId(); } public static String toString(RaftRpcReplyProto proto) { return proto.getRequestorId() + "<-" + proto.getReplyId() - + "#" + proto.getSeqNum() + ":" + + "#" + proto.getCallId() + ":" + (proto.getSuccess()? "OK": "FAIL"); } public static String toString(RequestVoteReplyProto proto) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java index 4f96c06..6d7b207 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -272,8 +272,8 @@ public class AppendStreamer implements Closeable { RaftClientRequestProto pending = Preconditions.checkNotNull( ackQueue.peek()); if (reply.getRpcReply().getSuccess()) { - Preconditions.checkState(pending.getRpcRequest().getSeqNum() == - reply.getRpcReply().getSeqNum()); + Preconditions.checkState(pending.getRpcRequest().getCallId() == + reply.getRpcReply().getCallId()); ackQueue.poll(); LOG.trace("{} received success ack for request {}", this, pending.getRpcRequest()); @@ -375,7 +375,7 @@ public class AppendStreamer implements Closeable { .setMessage(oldRequest.getMessage()) .setReadOnly(oldRequest.getReadOnly()) .setRpcRequest(toRaftRpcRequestProtoBuilder( - clientId.toBytes(), newLeader.toBytes(), r.getSeqNum())) + clientId.toBytes(), newLeader.toBytes(), r.getCallId())) .build(); dataQueue.offerFirst(newRequest); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java index 99d8778..d550963 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java @@ -39,11 +39,11 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class); private static class PendingAppend implements Comparable<PendingAppend> { - private final long seqNum; + private final long callId; private volatile RaftClientReply reply; - PendingAppend(long seqNum) { - this.seqNum = seqNum; + PendingAppend(long callId) { + this.callId = callId; } boolean isReady() { @@ -56,12 +56,12 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase @Override public int compareTo(PendingAppend p) { - return seqNum == p.seqNum ? 0 : (seqNum < p.seqNum ? -1 : 1); + return callId == p.callId ? 0 : (callId < p.callId ? -1 : 1); } @Override public String toString() { - return seqNum + ", reply:" + (reply == null ? "null" : reply.toString()); + return callId + ", reply:" + (reply == null ? "null" : reply.toString()); } } private static final PendingAppend COMPLETED = new PendingAppend(Long.MAX_VALUE); @@ -111,7 +111,7 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase @Override public void onNext(RaftClientRequestProto request) { try { - PendingAppend p = new PendingAppend(request.getRpcRequest().getSeqNum()); + PendingAppend p = new PendingAppend(request.getRpcRequest().getCallId()); synchronized (pendingList) { pendingList.add(p); } @@ -125,17 +125,17 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase // exception from the state machine. responseObserver.onError(RaftGrpcUtil.wrapException(exception)); } else { - final long replySeq = reply.getSeqNum(); + final long replySeq = reply.getCallId(); synchronized (pendingList) { Preconditions.checkState(!pendingList.isEmpty(), - "PendingList is empty when handling onNext for seqNum %s", + "PendingList is empty when handling onNext for callId %s", replySeq); - final long headSeqNum = pendingList.get(0).seqNum; - // we assume the seqNum is consecutive for a stream RPC call + final long headSeqNum = pendingList.get(0).callId; + // we assume the callId is consecutive for a stream RPC call final PendingAppend pendingForReply = pendingList.get( (int) (replySeq - headSeqNum)); Preconditions.checkState(pendingForReply != null && - pendingForReply.seqNum == replySeq, + pendingForReply.callId == replySeq, "pending for reply is: %s, the pending list: %s", pendingForReply, pendingList); pendingForReply.setReply(reply); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java index 8f0e183..73e56b8 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java @@ -23,6 +23,7 @@ import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_ import java.io.IOException; import java.io.OutputStream; import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; @@ -34,7 +35,7 @@ public class RaftOutputStream extends OutputStream { /** internal buffer */ private final byte buf[]; private int count; - private long seqNum = 0; + private final AtomicLong seqNum = new AtomicLong(); private final ClientId clientId; private final AppendStreamer streamer; @@ -82,7 +83,8 @@ public class RaftOutputStream extends OutputStream { private void flushToStreamer() throws IOException { if (count > 0) { - streamer.write(ProtoUtils.toByteString(buf, 0, count), seqNum++); + streamer.write(ProtoUtils.toByteString(buf, 0, count), + seqNum.getAndIncrement()); count = 0; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java index 9b8553b..5b7efc8 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java @@ -69,18 +69,18 @@ public class NettyRpcProxy implements Closeable { } } - public static long getSeqNum(RaftNettyServerReplyProto proto) { + public static long getCallId(RaftNettyServerReplyProto proto) { switch (proto.getRaftNettyServerReplyCase()) { case REQUESTVOTEREPLY: - return proto.getRequestVoteReply().getServerReply().getSeqNum(); + return proto.getRequestVoteReply().getServerReply().getCallId(); case APPENDENTRIESREPLY: - return proto.getAppendEntriesReply().getServerReply().getSeqNum(); + return proto.getAppendEntriesReply().getServerReply().getCallId(); case INSTALLSNAPSHOTREPLY: - return proto.getInstallSnapshotReply().getServerReply().getSeqNum(); + return proto.getInstallSnapshotReply().getServerReply().getCallId(); case RAFTCLIENTREPLY: - return proto.getRaftClientReply().getRpcReply().getSeqNum(); + return proto.getRaftClientReply().getRpcReply().getCallId(); case EXCEPTIONREPLY: - return proto.getExceptionReply().getRpcReply().getSeqNum(); + return proto.getExceptionReply().getRpcReply().getCallId(); case RAFTNETTYSERVERREPLY_NOT_SET: throw new IllegalArgumentException("Reply case not set in proto: " + proto.getRaftNettyServerReplyCase()); @@ -104,7 +104,7 @@ public class NettyRpcProxy implements Closeable { RaftNettyServerReplyProto proto) { final CompletableFuture<RaftNettyServerReplyProto> future = pollReply(); if (future == null) { - throw new IllegalStateException("Request #" + getSeqNum(proto) + throw new IllegalStateException("Request #" + getCallId(proto) + " not found"); } if (proto.getRaftNettyServerReplyCase() == EXCEPTIONREPLY) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index a1486b6..2d927db 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -218,7 +218,7 @@ public final class NettyRpcService implements RaftServerRpc { final RaftRpcReplyProto.Builder rpcReply = RaftRpcReplyProto.newBuilder() .setRequestorId(request.getRequestorId()) .setReplyId(request.getReplyId()) - .setSeqNum(request.getSeqNum()) + .setCallId(request.getCallId()) .setSuccess(false); final RaftNettyExceptionReplyProto.Builder ioe = RaftNettyExceptionReplyProto.newBuilder() .setRpcReply(rpcReply) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index 14901f6..8c334dd 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -65,13 +65,13 @@ message TermIndexProto { message RaftRpcRequestProto { bytes requestorId = 1; bytes replyId = 2; - uint64 seqNum = 3; + uint64 callId = 3; } message RaftRpcReplyProto { bytes requestorId = 1; bytes replyId = 2; - uint64 seqNum = 3; + uint64 callId = 3; bool success = 4; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java index caf9c4d..b4db46e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java @@ -17,12 +17,10 @@ */ package org.apache.ratis.server.impl; -import org.apache.ratis.client.RaftClient; - public interface RaftServerConstants { long INVALID_LOG_INDEX = -1; byte LOG_TERMINATE_BYTE = 0; - long DEFAULT_SEQNUM = RaftClient.DEFAULT_SEQNUM; + long DEFAULT_CALLID = 0; enum StartupOption { FORMAT("format"), http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index b9f063e..3c7c0e1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -381,7 +381,7 @@ public class RaftServerImpl implements RaftServer { final long entryIndex; try { entryIndex = state.applyLog(entry, request.getClientId(), - request.getSeqNum()); + request.getCallId()); } catch (IOException e) { throw new RaftException(e); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 846fbe2..945be8d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -17,7 +17,7 @@ */ package org.apache.ratis.server.impl; -import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS; import java.util.Arrays; @@ -112,7 +112,7 @@ public class ServerProtoUtils { boolean shouldShutdown) { final RequestVoteReplyProto.Builder b = RequestVoteReplyProto.newBuilder(); b.setServerReply(ClientProtoUtils.toRaftRpcReplyProtoBuilder( - requestorId.toBytes(), replyId.toBytes(), DEFAULT_SEQNUM, success)) + requestorId.toBytes(), replyId.toBytes(), DEFAULT_CALLID, success)) .setTerm(term) .setShouldShutdown(shouldShutdown); return b.build(); @@ -121,8 +121,7 @@ public class ServerProtoUtils { public static RequestVoteRequestProto toRequestVoteRequestProto( RaftPeerId requestorId, RaftPeerId replyId, long term, TermIndex lastEntry) { RaftProtos.RaftRpcRequestProto.Builder rpb = ClientProtoUtils - .toRaftRpcRequestProtoBuilder(requestorId.toBytes(), replyId.toBytes(), - DEFAULT_SEQNUM); + .toRaftRpcRequestProtoBuilder(requestorId.toBytes(), replyId.toBytes(), DEFAULT_CALLID); final RequestVoteRequestProto.Builder b = RequestVoteRequestProto.newBuilder() .setServerRequest(rpb) .setCandidateTerm(term); @@ -136,7 +135,7 @@ public class ServerProtoUtils { RaftPeerId requestorId, RaftPeerId replyId, long term, int requestIndex, InstallSnapshotResult result) { final RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(), - replyId.toBytes(), DEFAULT_SEQNUM, result == InstallSnapshotResult.SUCCESS); + replyId.toBytes(), DEFAULT_CALLID, result == InstallSnapshotResult.SUCCESS); final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto .newBuilder().setServerReply(rb).setTerm(term).setResult(result) .setRequestIndex(requestIndex); @@ -150,7 +149,7 @@ public class ServerProtoUtils { return InstallSnapshotRequestProto.newBuilder() .setServerRequest( ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(), - replyId.toBytes(), DEFAULT_SEQNUM)) + replyId.toBytes(), DEFAULT_CALLID)) .setRequestId(requestId) .setRequestIndex(requestIndex) // .setRaftConfiguration() TODO: save and pass RaftConfiguration @@ -165,7 +164,7 @@ public class ServerProtoUtils { RaftPeerId requestorId, RaftPeerId replyId, long term, long nextIndex, AppendEntriesReplyProto.AppendResult appendResult) { RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(), - replyId.toBytes(), DEFAULT_SEQNUM, appendResult == SUCCESS); + replyId.toBytes(), DEFAULT_CALLID, appendResult == SUCCESS); final AppendEntriesReplyProto.Builder b = AppendEntriesReplyProto.newBuilder(); b.setServerReply(rb).setTerm(term).setNextIndex(nextIndex) .setResult(appendResult); @@ -180,7 +179,7 @@ public class ServerProtoUtils { .newBuilder() .setServerRequest( ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(), - replyId.toBytes(), DEFAULT_SEQNUM)) + replyId.toBytes(), DEFAULT_CALLID)) .setLeaderTerm(leaderTerm) .setLeaderCommit(leaderCommit) .setInitializing(initializing); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java index a5d1127..54cfa4d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java @@ -40,7 +40,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; -import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; public abstract class RaftNotLeaderExceptionBaseTest { static { @@ -92,7 +92,7 @@ public abstract class RaftNotLeaderExceptionBaseTest { for (int i = 0; reply == null && i < 10; i++) { try { reply = rpc.sendRequest( - new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_SEQNUM, + new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_CALLID, new SimpleMessage("m2"))); } catch (IOException ignored) { Thread.sleep(1000); @@ -139,7 +139,7 @@ public abstract class RaftNotLeaderExceptionBaseTest { for (int i = 0; reply == null && i < 10; i++) { try { reply = rpc.sendRequest( - new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_SEQNUM, + new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_CALLID, new SimpleMessage("m1"))); } catch (IOException ignored) { Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index e3db854..90cbef5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicReference; import static java.util.Arrays.asList; import static org.apache.ratis.MiniRaftCluster.leaderPlaceHolderDelay; import static org.apache.ratis.MiniRaftCluster.logSyncDelay; -import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf; import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; @@ -93,7 +93,7 @@ public abstract class RaftReconfigurationBaseTest { // trigger setConfiguration SetConfigurationRequest request = new SetConfigurationRequest(clientId, - cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers); + cluster.getLeader().getId(), DEFAULT_CALLID, allPeers); LOG.info("Start changing the configuration: {}", request); cluster.getLeader().setConfiguration(request); @@ -121,7 +121,7 @@ public abstract class RaftReconfigurationBaseTest { // trigger setConfiguration SetConfigurationRequest request = new SetConfigurationRequest(clientId, - cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers); + cluster.getLeader().getId(), DEFAULT_CALLID, allPeers); LOG.info("Start changing the configuration: {}", request); cluster.getLeader().setConfiguration(request); @@ -159,7 +159,7 @@ public abstract class RaftReconfigurationBaseTest { // trigger setConfiguration SetConfigurationRequest request = new SetConfigurationRequest(clientId, - cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers); + cluster.getLeader().getId(), DEFAULT_CALLID, allPeers); LOG.info("Start changing the configuration: {}", request); cluster.getLeader().setConfiguration(request); @@ -255,7 +255,7 @@ public abstract class RaftReconfigurationBaseTest { final RaftClientRpc sender = client.getClientRpc(); final SetConfigurationRequest request = new SetConfigurationRequest( - client.getId(), leaderId, DEFAULT_SEQNUM, c1.allPeersInNewConf); + client.getId(), leaderId, DEFAULT_CALLID, c1.allPeersInNewConf); try { sender.sendRequest(request); Assert.fail("did not get expected exception"); @@ -472,7 +472,7 @@ public abstract class RaftReconfigurationBaseTest { LOG.info("client2 starts to change conf"); final RaftClientRpc sender2 = client2.getClientRpc(); sender2.sendRequest(new SetConfigurationRequest( - client2.getId(), leaderId, DEFAULT_SEQNUM, peersInRequest2)); + client2.getId(), leaderId, DEFAULT_CALLID, peersInRequest2)); } catch (ReconfigurationInProgressException e) { caughtException.set(true); } catch (Exception e) { @@ -536,7 +536,7 @@ public abstract class RaftReconfigurationBaseTest { LOG.info("client starts to change conf"); final RaftClientRpc sender = client.getClientRpc(); RaftClientReply reply = sender.sendRequest(new SetConfigurationRequest( - client.getId(), leaderId, DEFAULT_SEQNUM, change.allPeersInNewConf)); + client.getId(), leaderId, DEFAULT_CALLID, change.allPeersInNewConf)); if (reply.isNotLeader()) { gotNotLeader.set(true); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java index cb4412a..217d5ae 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java @@ -19,7 +19,7 @@ package org.apache.ratis.statemachine; import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY; import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY; -import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; import java.io.File; import java.util.List; @@ -200,7 +200,7 @@ public abstract class RaftSnapshotBaseTest { new String[]{"s3", "s4"}, true); // trigger setConfiguration SetConfigurationRequest request = new SetConfigurationRequest(ClientId.createId(), - cluster.getLeader().getId(), DEFAULT_SEQNUM, change.allPeersInNewConf); + cluster.getLeader().getId(), DEFAULT_CALLID, change.allPeersInNewConf); LOG.info("Start changing the configuration: {}", request); cluster.getLeader().setConfiguration(request);
