Change RaftServerRpc to extend RaftServerProtocol and Closeable.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/673a2827 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/673a2827 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/673a2827 Branch: refs/heads/master Commit: 673a28278346fb641f7c1e81c20bc37168be33f0 Parents: 56e9b71 Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Authored: Mon Jan 2 23:59:14 2017 +0800 Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Committed: Mon Jan 2 23:59:14 2017 +0800 ---------------------------------------------------------------------- .../org/apache/raft/grpc/RaftGRpcService.java | 10 ++++----- .../raft/hadooprpc/server/HadoopRpcService.java | 8 +++---- .../raft/netty/server/NettyRpcService.java | 8 +++---- .../org/apache/raft/server/RaftServerRpc.java | 23 ++++++++------------ .../apache/raft/server/impl/LeaderElection.java | 2 +- .../apache/raft/server/impl/LogAppender.java | 4 ++-- .../apache/raft/server/impl/RaftServerImpl.java | 11 ++++------ .../apache/raft/server/impl/ServerState.java | 2 +- .../server/simulation/SimulatedServerRpc.java | 8 +++---- 9 files changed, 34 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java index f3d894a..d0c98c3 100644 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java +++ b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java @@ -90,14 +90,14 @@ public class RaftGRpcService implements RaftServerRpc { @Override public void run() { System.err.println("*** shutting down gRPC server since 
JVM is shutting down"); - RaftGRpcService.this.shutdown(); + RaftGRpcService.this.close(); System.err.println("*** server shut down"); } }); } @Override - public void shutdown() { + public void close() { if (server != null) { server.shutdown(); } @@ -110,21 +110,21 @@ public class RaftGRpcService implements RaftServerRpc { } @Override - public AppendEntriesReplyProto sendAppendEntries( + public AppendEntriesReplyProto appendEntries( AppendEntriesRequestProto request) throws IOException { throw new UnsupportedOperationException( "Blocking AppendEntries call is not supported"); } @Override - public InstallSnapshotReplyProto sendInstallSnapshot( + public InstallSnapshotReplyProto installSnapshot( InstallSnapshotRequestProto request) throws IOException { throw new UnsupportedOperationException( "Blocking InstallSnapshot call is not supported"); } @Override - public RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request) + public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException { CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, selfId, null, request); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java index 3330d78..b73deca 100644 --- a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java +++ b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java @@ -122,12 +122,12 @@ public class HadoopRpcService implements RaftServerRpc { } @Override - public void shutdown() { + public void close() { ipcServer.stop(); } @Override - public AppendEntriesReplyProto sendAppendEntries( + public AppendEntriesReplyProto appendEntries( 
AppendEntriesRequestProto request) throws IOException { Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); @@ -142,7 +142,7 @@ public class HadoopRpcService implements RaftServerRpc { } @Override - public InstallSnapshotReplyProto sendInstallSnapshot( + public InstallSnapshotReplyProto installSnapshot( InstallSnapshotRequestProto request) throws IOException { Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); @@ -157,7 +157,7 @@ public class HadoopRpcService implements RaftServerRpc { } @Override - public RequestVoteReplyProto sendRequestVote( + public RequestVoteReplyProto requestVote( RequestVoteRequestProto request) throws IOException { Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java index 19f5979..08e379a 100644 --- a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java +++ b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java @@ -112,7 +112,7 @@ public final class NettyRpcService implements RaftServerRpc { } @Override - public void shutdown() { + public void close() { lifeCycle.checkStateAndClose(() -> { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); @@ -199,7 +199,7 @@ public final class NettyRpcService implements RaftServerRpc { } @Override - public RequestVoteReplyProto 
sendRequestVote(RequestVoteRequestProto request) throws IOException { + public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException { Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); @@ -211,7 +211,7 @@ public final class NettyRpcService implements RaftServerRpc { } @Override - public AppendEntriesReplyProto sendAppendEntries(AppendEntriesRequestProto request) throws IOException { + public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException { Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); @@ -223,7 +223,7 @@ public final class NettyRpcService implements RaftServerRpc { } @Override - public InstallSnapshotReplyProto sendInstallSnapshot(InstallSnapshotRequestProto request) throws IOException { + public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException { Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java b/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java index de81ec2..6526bea 100644 --- a/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java +++ b/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java @@ -18,27 +18,22 @@ package org.apache.raft.server; import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.shaded.proto.RaftProtos.*; +import 
org.apache.raft.server.protocol.RaftServerProtocol; -import java.io.IOException; +import java.io.Closeable; import java.net.InetSocketAddress; -public interface RaftServerRpc { +/** + * A server-side interface for supporting different RPC implementations + * such as Netty, gRPC and Hadoop. + */ +public interface RaftServerRpc extends RaftServerProtocol, Closeable { + /** Start the RPC service. */ void start(); - void shutdown(); - + /** @return the address on which this RPC server is listening. */ InetSocketAddress getInetSocketAddress(); - AppendEntriesReplyProto sendAppendEntries( - AppendEntriesRequestProto request) throws IOException; - - InstallSnapshotReplyProto sendInstallSnapshot( - InstallSnapshotRequestProto request) throws IOException; - - RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request) - throws IOException; - /** add information of the given peers */ void addPeers(Iterable<RaftPeer> peers); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java index 8552029..39bdb13 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java @@ -187,7 +187,7 @@ class LeaderElection extends Daemon { final RequestVoteRequestProto r = server.createRequestVoteRequest( peer.getId(), electionTerm, lastEntry); service.submit( - () -> server.getServerRpc().sendRequestVote(r)); + () -> server.getServerRpc().requestVote(r)); submitted++; } return submitted; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java
---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java index 3b18f13..cf613ca 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java @@ -198,7 +198,7 @@ public class LogAppender extends Daemon { follower.updateLastRpcSendTime(); final AppendEntriesReplyProto r = server.getServerRpc() - .sendAppendEntries(request); + .appendEntries(request); follower.updateLastRpcResponseTime(); return r; @@ -328,7 +328,7 @@ public class LogAppender extends Daemon { for (InstallSnapshotRequestProto request : new SnapshotRequestIter(snapshot, requestId)) { follower.updateLastRpcSendTime(); - reply = server.getServerRpc().sendInstallSnapshot(request); + reply = server.getServerRpc().installSnapshot(request); follower.updateLastRpcResponseTime(); if (!reply.getServerReply().getSuccess()) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java index 131e002..3026afa 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java @@ -132,12 +132,13 @@ public class RaftServerImpl implements RaftServer { this.state.setInitialConf(conf); } + @Override public void setServerRpc(RaftServerRpc serverRpc) { this.serverRpc = serverRpc; // add peers into rpc service RaftConfiguration conf = getRaftConf(); if (conf != null) { - addPeersToRPC(conf.getPeers()); + serverRpc.addPeers(conf.getPeers()); } } @@ 
-201,7 +202,7 @@ public class RaftServerImpl implements RaftServer { shutdownElectionDaemon(); shutdownLeaderState(); - serverRpc.shutdown(); + serverRpc.close(); state.close(); } catch (Exception ignored) { LOG.warn("Failed to kill " + state.getSelfId(), ignored); @@ -418,7 +419,7 @@ public class RaftServerImpl implements RaftServer { } // add new peers into the rpc service - addPeersToRPC(Arrays.asList(peersInNewConf)); + getServerRpc().addPeers(Arrays.asList(peersInNewConf)); // add staging state into the leaderState pending = leaderState.startSetConfiguration(request); } @@ -726,10 +727,6 @@ public class RaftServerImpl implements RaftServer { } } - public void addPeersToRPC(Iterable<RaftPeer> peers) { - serverRpc.addPeers(peers); - } - synchronized void replyPendingRequest(long logIndex, CompletableFuture<Message> message) { if (isLeader() && leaderState != null) { // is leader and is running http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java index c91968c..8611101 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java @@ -285,7 +285,7 @@ public class ServerState implements Closeable { final RaftConfiguration conf = ServerProtoUtils.toRaftConfiguration( entry.getIndex(), entry.getConfigurationEntry()); configurationManager.addConfiguration(entry.getIndex(), conf); - server.addPeersToRPC(conf.getPeers()); + server.getServerRpc().addPeers(conf.getPeers()); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java 
---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java index cc3fb35..8a7e752 100644 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java +++ b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java @@ -69,7 +69,7 @@ class SimulatedServerRpc implements RaftServerRpc { } @Override - public void shutdown() { + public void close() { try { interruptAndJoin(); executor.shutdown(); @@ -86,7 +86,7 @@ class SimulatedServerRpc implements RaftServerRpc { } @Override - public AppendEntriesReplyProto sendAppendEntries(AppendEntriesRequestProto request) + public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException { RaftServerReply reply = serverHandler.getRpc() .sendRequest(new RaftServerRequest(request)); @@ -94,7 +94,7 @@ class SimulatedServerRpc implements RaftServerRpc { } @Override - public InstallSnapshotReplyProto sendInstallSnapshot(InstallSnapshotRequestProto request) + public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException { RaftServerReply reply = serverHandler.getRpc() .sendRequest(new RaftServerRequest(request)); @@ -102,7 +102,7 @@ class SimulatedServerRpc implements RaftServerRpc { } @Override - public RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request) + public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException { RaftServerReply reply = serverHandler.getRpc() .sendRequest(new RaftServerRequest(request));