Change RaftServerRpc to extend RaftServerProtocol and Closeable.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/673a2827
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/673a2827
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/673a2827

Branch: refs/heads/master
Commit: 673a28278346fb641f7c1e81c20bc37168be33f0
Parents: 56e9b71
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Mon Jan 2 23:59:14 2017 +0800
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Mon Jan 2 23:59:14 2017 +0800

----------------------------------------------------------------------
 .../org/apache/raft/grpc/RaftGRpcService.java   | 10 ++++-----
 .../raft/hadooprpc/server/HadoopRpcService.java |  8 +++----
 .../raft/netty/server/NettyRpcService.java      |  8 +++----
 .../org/apache/raft/server/RaftServerRpc.java   | 23 ++++++++------------
 .../apache/raft/server/impl/LeaderElection.java |  2 +-
 .../apache/raft/server/impl/LogAppender.java    |  4 ++--
 .../apache/raft/server/impl/RaftServerImpl.java | 11 ++++------
 .../apache/raft/server/impl/ServerState.java    |  2 +-
 .../server/simulation/SimulatedServerRpc.java   |  8 +++----
 9 files changed, 34 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
index f3d894a..d0c98c3 100644
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
+++ b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
@@ -90,14 +90,14 @@ public class RaftGRpcService implements RaftServerRpc {
       @Override
       public void run() {
         System.err.println("*** shutting down gRPC server since JVM is shutting down");
-        RaftGRpcService.this.shutdown();
+        RaftGRpcService.this.close();
         System.err.println("*** server shut down");
       }
     });
   }
 
   @Override
-  public void shutdown() {
+  public void close() {
     if (server != null) {
       server.shutdown();
     }
@@ -110,21 +110,21 @@ public class RaftGRpcService implements RaftServerRpc {
   }
 
   @Override
-  public AppendEntriesReplyProto sendAppendEntries(
+  public AppendEntriesReplyProto appendEntries(
       AppendEntriesRequestProto request) throws IOException {
     throw new UnsupportedOperationException(
         "Blocking AppendEntries call is not supported");
   }
 
   @Override
-  public InstallSnapshotReplyProto sendInstallSnapshot(
+  public InstallSnapshotReplyProto installSnapshot(
       InstallSnapshotRequestProto request) throws IOException {
     throw new UnsupportedOperationException(
         "Blocking InstallSnapshot call is not supported");
   }
 
   @Override
-  public RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request)
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
       throws IOException {
     CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, selfId,
         null, request);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
index 3330d78..b73deca 100644
--- a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
+++ b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
@@ -122,12 +122,12 @@ public class HadoopRpcService implements RaftServerRpc {
   }
 
   @Override
-  public void shutdown() {
+  public void close() {
     ipcServer.stop();
   }
 
   @Override
-  public AppendEntriesReplyProto sendAppendEntries(
+  public AppendEntriesReplyProto appendEntries(
       AppendEntriesRequestProto request) throws IOException {
     Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId()));
     CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
@@ -142,7 +142,7 @@ public class HadoopRpcService implements RaftServerRpc {
   }
 
   @Override
-  public InstallSnapshotReplyProto sendInstallSnapshot(
+  public InstallSnapshotReplyProto installSnapshot(
       InstallSnapshotRequestProto request) throws IOException {
     Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId()));
     CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
@@ -157,7 +157,7 @@ public class HadoopRpcService implements RaftServerRpc {
   }
 
   @Override
-  public RequestVoteReplyProto sendRequestVote(
+  public RequestVoteReplyProto requestVote(
       RequestVoteRequestProto request) throws IOException {
     Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId()));
     CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
index 19f5979..08e379a 100644
--- a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
+++ b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
@@ -112,7 +112,7 @@ public final class NettyRpcService implements RaftServerRpc {
   }
 
   @Override
-  public void shutdown() {
+  public void close() {
     lifeCycle.checkStateAndClose(() -> {
       bossGroup.shutdownGracefully();
       workerGroup.shutdownGracefully();
@@ -199,7 +199,7 @@ public final class NettyRpcService implements RaftServerRpc {
   }
 
   @Override
-  public RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request) throws IOException {
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException {
     Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId()));
     CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
 
@@ -211,7 +211,7 @@ public final class NettyRpcService implements RaftServerRpc {
   }
 
   @Override
-  public AppendEntriesReplyProto sendAppendEntries(AppendEntriesRequestProto request) throws IOException {
+  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException {
     Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId()));
     CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
 
@@ -223,7 +223,7 @@ public final class NettyRpcService implements RaftServerRpc {
   }
 
   @Override
-  public InstallSnapshotReplyProto sendInstallSnapshot(InstallSnapshotRequestProto request) throws IOException {
+  public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException {
     Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId()));
     CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java b/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java
index de81ec2..6526bea 100644
--- a/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java
+++ b/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java
@@ -18,27 +18,22 @@
 package org.apache.raft.server;
 
 import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.shaded.proto.RaftProtos.*;
+import org.apache.raft.server.protocol.RaftServerProtocol;
 
-import java.io.IOException;
+import java.io.Closeable;
 import java.net.InetSocketAddress;
 
-public interface RaftServerRpc {
+/**
+ * A server-side interface for supporting different RPC implementations
+ * such as Netty, gRPC and Hadoop.
+ */
+public interface RaftServerRpc extends RaftServerProtocol, Closeable {
+  /** Start the RPC service. */
   void start();
 
-  void shutdown();
-
+  /** @return the address on which this RPC server is listening. */
   InetSocketAddress getInetSocketAddress();
 
-  AppendEntriesReplyProto sendAppendEntries(
-      AppendEntriesRequestProto request) throws IOException;
-
-  InstallSnapshotReplyProto sendInstallSnapshot(
-      InstallSnapshotRequestProto request) throws IOException;
-
-  RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request)
-      throws IOException;
-
   /** add information of the given peers */
   void addPeers(Iterable<RaftPeer> peers);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java
index 8552029..39bdb13 100644
--- a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java
@@ -187,7 +187,7 @@ class LeaderElection extends Daemon {
       final RequestVoteRequestProto r = server.createRequestVoteRequest(
           peer.getId(), electionTerm, lastEntry);
       service.submit(
-          () -> server.getServerRpc().sendRequestVote(r));
+          () -> server.getServerRpc().requestVote(r));
       submitted++;
     }
     return submitted;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java
index 3b18f13..cf613ca 100644
--- a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java
@@ -198,7 +198,7 @@ public class LogAppender extends Daemon {
 
         follower.updateLastRpcSendTime();
         final AppendEntriesReplyProto r = server.getServerRpc()
-            .sendAppendEntries(request);
+            .appendEntries(request);
         follower.updateLastRpcResponseTime();
 
         return r;
@@ -328,7 +328,7 @@ public class LogAppender extends Daemon {
       for (InstallSnapshotRequestProto request :
           new SnapshotRequestIter(snapshot, requestId)) {
         follower.updateLastRpcSendTime();
-        reply = server.getServerRpc().sendInstallSnapshot(request);
+        reply = server.getServerRpc().installSnapshot(request);
         follower.updateLastRpcResponseTime();
 
         if (!reply.getServerReply().getSuccess()) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
index 131e002..3026afa 100644
--- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
@@ -132,12 +132,13 @@ public class RaftServerImpl implements RaftServer {
     this.state.setInitialConf(conf);
   }
 
+  @Override
   public void setServerRpc(RaftServerRpc serverRpc) {
     this.serverRpc = serverRpc;
     // add peers into rpc service
     RaftConfiguration conf = getRaftConf();
     if (conf != null) {
-      addPeersToRPC(conf.getPeers());
+      serverRpc.addPeers(conf.getPeers());
     }
   }
 
@@ -201,7 +202,7 @@ public class RaftServerImpl implements RaftServer {
         shutdownElectionDaemon();
         shutdownLeaderState();
 
-        serverRpc.shutdown();
+        serverRpc.close();
         state.close();
       } catch (Exception ignored) {
         LOG.warn("Failed to kill " + state.getSelfId(), ignored);
@@ -418,7 +419,7 @@ public class RaftServerImpl implements RaftServer {
       }
 
       // add new peers into the rpc service
-      addPeersToRPC(Arrays.asList(peersInNewConf));
+      getServerRpc().addPeers(Arrays.asList(peersInNewConf));
       // add staging state into the leaderState
       pending = leaderState.startSetConfiguration(request);
     }
@@ -726,10 +727,6 @@ public class RaftServerImpl implements RaftServer {
     }
   }
 
-  public void addPeersToRPC(Iterable<RaftPeer> peers) {
-    serverRpc.addPeers(peers);
-  }
-
   synchronized void replyPendingRequest(long logIndex,
       CompletableFuture<Message> message) {
     if (isLeader() && leaderState != null) { // is leader and is running

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
index c91968c..8611101 100644
--- a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
@@ -285,7 +285,7 @@ public class ServerState implements Closeable {
           final RaftConfiguration conf = ServerProtoUtils.toRaftConfiguration(
               entry.getIndex(), entry.getConfigurationEntry());
           configurationManager.addConfiguration(entry.getIndex(), conf);
-          server.addPeersToRPC(conf.getPeers());
+          server.getServerRpc().addPeers(conf.getPeers());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/673a2827/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
index cc3fb35..8a7e752 100644
--- a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
+++ b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java
@@ -69,7 +69,7 @@ class SimulatedServerRpc implements RaftServerRpc {
   }
 
   @Override
-  public void shutdown() {
+  public void close() {
     try {
       interruptAndJoin();
       executor.shutdown();
@@ -86,7 +86,7 @@ class SimulatedServerRpc implements RaftServerRpc {
   }
 
   @Override
-  public AppendEntriesReplyProto sendAppendEntries(AppendEntriesRequestProto request)
+  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request)
       throws IOException {
     RaftServerReply reply = serverHandler.getRpc()
         .sendRequest(new RaftServerRequest(request));
@@ -94,7 +94,7 @@ class SimulatedServerRpc implements RaftServerRpc {
   }
 
   @Override
-  public InstallSnapshotReplyProto sendInstallSnapshot(InstallSnapshotRequestProto request)
+  public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request)
       throws IOException {
     RaftServerReply reply = serverHandler.getRpc()
         .sendRequest(new RaftServerRequest(request));
@@ -102,7 +102,7 @@ class SimulatedServerRpc implements RaftServerRpc {
   }
 
   @Override
-  public RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request)
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
       throws IOException {
     RaftServerReply reply = serverHandler.getRpc()
         .sendRequest(new RaftServerRequest(request));

Reply via email to