Repository: incubator-ratis
Updated Branches:
  refs/heads/master 2a781c220 -> 7edcd52b0


RATIS-121. In RaftServer.Builder, allow serverId and group to be initialized 
automatically.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/7edcd52b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/7edcd52b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/7edcd52b

Branch: refs/heads/master
Commit: 7edcd52b0c90a2af9b9e25e84f71adf0d2c42506
Parents: 2a781c2
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Tue Oct 24 13:12:33 2017 -0700
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Tue Oct 24 14:09:04 2017 -0700

----------------------------------------------------------------------
 .../org/apache/ratis/protocol/RaftGroup.java    |  6 ++
 .../org/apache/ratis/protocol/RaftGroupId.java  |  5 ++
 .../test/java/org/apache/ratis/BaseTest.java    |  6 +-
 .../arithmetic/expression/TestExpression.java   |  4 +-
 .../org/apache/ratis/grpc/RaftGRpcService.java  | 17 +++--
 .../grpc/client/RaftClientProtocolService.java  | 15 ++--
 .../ratis/grpc/server/AdminProtocolService.java |  7 +-
 .../grpc/server/RaftServerProtocolService.java  | 33 ++++-----
 .../hadooprpc/server/HadoopRpcService.java      | 27 +++----
 .../ratis/netty/server/NettyRpcService.java     | 45 +++++-------
 .../org/apache/ratis/server/RaftServer.java     |  6 +-
 .../ratis/server/impl/RaftServerProxy.java      | 19 +++--
 .../server/impl/RaftServerRpcWithProxy.java     | 76 ++++++++++++++++++++
 .../ratis/server/impl/ServerImplUtils.java      | 10 ++-
 .../org/apache/ratis/protocol/TestRaftId.java   |  4 +-
 .../ratis/server/simulation/RequestHandler.java | 17 +++--
 .../server/simulation/SimulatedServerRpc.java   | 10 +--
 17 files changed, 197 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
index 1096518..f4a16c4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
@@ -26,6 +26,12 @@ import java.util.*;
  * peers.
  */
 public class RaftGroup {
+  private static RaftGroup EMPTY_GROUP = new 
RaftGroup(RaftGroupId.emptyGroupId(), Collections.emptyList());
+
+  public static RaftGroup emptyGroup() {
+    return EMPTY_GROUP;
+  }
+
   /** UTF-8 string as id */
   private final RaftGroupId groupId;
   /** The group of raft peers */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
index 0ddc7dc..e873ab8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java
@@ -22,6 +22,11 @@ import 
org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import java.util.UUID;
 
 public class RaftGroupId extends RaftId {
+  private static final RaftGroupId EMPTY_GROUP_ID = new RaftGroupId(new 
UUID(0L, 0L));
+
+  public static RaftGroupId emptyGroupId() {
+    return EMPTY_GROUP_ID;
+  }
 
   public static RaftGroupId randomId() {
     return new RaftGroupId(UUID.randomUUID());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java 
b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 2b308b5..1c27420 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -43,10 +43,10 @@ public abstract class BaseTest {
   }
 
   @Rule
-  public final Timeout globalTimeout = new Timeout(getGlobalTimeoutMs());
+  public final Timeout globalTimeout = new Timeout(getGlobalTimeoutSeconds() * 
1000);
 
-  public int getGlobalTimeoutMs() {
-    return 100_000;
+  public int getGlobalTimeoutSeconds() {
+    return 100;
   }
 
   private static final Supplier<File> rootTestDir = JavaUtils.memoize(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java
----------------------------------------------------------------------
diff --git 
a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java
 
b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java
index a21c11f..6996fe9 100644
--- 
a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java
+++ 
b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java
@@ -27,8 +27,8 @@ import java.util.concurrent.ThreadLocalRandom;
 
 public class TestExpression extends BaseTest {
   @Override
-  public int getGlobalTimeoutMs() {
-    return 1000;
+  public int getGlobalTimeoutSeconds() {
+    return 1;
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index 96a7a45..853830e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -41,6 +41,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Supplier;
 
 /** A grpc implementation of {@link RaftServerRpc}. */
 public class RaftGRpcService implements RaftServerRpc {
@@ -70,7 +71,7 @@ public class RaftGRpcService implements RaftServerRpc {
   private final InetSocketAddress address;
   private final Map<RaftPeerId, RaftServerProtocolClient> peers =
       Collections.synchronizedMap(new HashMap<>());
-  private final RaftPeerId selfId;
+  private final Supplier<RaftPeerId> idSupplier;
 
   private RaftGRpcService(RaftServer server) {
     this(server,
@@ -79,11 +80,11 @@ public class RaftGRpcService implements RaftServerRpc {
   }
   private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) 
{
     ServerBuilder serverBuilder = ServerBuilder.forPort(port);
-    selfId = raftServer.getId();
+    idSupplier = raftServer::getId;
     server = ((NettyServerBuilder) 
serverBuilder).maxMessageSize(maxMessageSize)
-        .addService(new RaftServerProtocolService(selfId, raftServer))
-        .addService(new RaftClientProtocolService(selfId, raftServer))
-        .addService(new AdminProtocolService(selfId, raftServer))
+        .addService(new RaftServerProtocolService(idSupplier, raftServer))
+        .addService(new RaftClientProtocolService(idSupplier, raftServer))
+        .addService(new AdminProtocolService(raftServer))
         .build();
 
     // start service to determine the port (in case port is configured as 0)
@@ -92,6 +93,10 @@ public class RaftGRpcService implements RaftServerRpc {
     LOG.info("Server started, listening on " + address.getPort());
   }
 
+  RaftPeerId getId() {
+    return idSupplier.get();
+  }
+
   @Override
   public SupportedRpcType getRpcType() {
     return SupportedRpcType.GRPC;
@@ -145,7 +150,7 @@ public class RaftGRpcService implements RaftServerRpc {
   @Override
   public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
       throws IOException {
-    CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, selfId,
+    CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(),
         null, request);
 
     RaftServerProtocolClient target = Objects.requireNonNull(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
index 1cd913a..5d68d42 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 public class RaftClientProtocolService extends 
RaftClientProtocolServiceImplBase {
   static final Logger LOG = 
LoggerFactory.getLogger(RaftClientProtocolService.class);
@@ -66,14 +67,18 @@ public class RaftClientProtocolService extends 
RaftClientProtocolServiceImplBase
   }
   private static final PendingAppend COMPLETED = new 
PendingAppend(Long.MAX_VALUE);
 
-  private final RaftPeerId id;
+  private final Supplier<RaftPeerId> idSupplier;
   private final RaftClientAsynchronousProtocol protocol;
 
-  public RaftClientProtocolService(RaftPeerId id, 
RaftClientAsynchronousProtocol protocol) {
-    this.id = id;
+  public RaftClientProtocolService(Supplier<RaftPeerId> idSupplier, 
RaftClientAsynchronousProtocol protocol) {
+    this.idSupplier = idSupplier;
     this.protocol = protocol;
   }
 
+  RaftPeerId getId() {
+    return idSupplier.get();
+  }
+
   @Override
   public void setConfiguration(SetConfigurationRequestProto proto,
       StreamObserver<RaftClientReplyProto> responseObserver) {
@@ -145,7 +150,7 @@ public class RaftClientProtocolService extends 
RaftClientProtocolServiceImplBase
         });
       } catch (Throwable e) {
         LOG.info("{} got exception when handling client append request {}: {}",
-            id, request.getRpcRequest(), e);
+            getId(), request.getRpcRequest(), e);
         responseObserver.onError(RaftGrpcUtil.wrapException(e));
       }
     }
@@ -165,7 +170,7 @@ public class RaftClientProtocolService extends 
RaftClientProtocolServiceImplBase
     @Override
     public void onError(Throwable t) {
       // for now we just log a msg
-      LOG.warn("{} onError: client Append cancelled", id, t);
+      LOG.warn("{} onError: client Append cancelled", getId(), t);
       synchronized (pendingList) {
         pendingList.clear();
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
index d2aae53..e4d169a 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
@@ -20,22 +20,19 @@ package org.apache.ratis.grpc.server;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.grpc.RaftGrpcUtil;
 import org.apache.ratis.protocol.AdminAsynchronousProtocol;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.ReinitializeRequest;
 import org.apache.ratis.protocol.ServerInformatonRequest;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto;
 import 
org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase;
 
 public class AdminProtocolService extends AdminProtocolServiceImplBase {
-  private final RaftPeerId id;
   private final AdminAsynchronousProtocol protocol;
 
-  public AdminProtocolService(RaftPeerId id, AdminAsynchronousProtocol 
protocol) {
-    this.id = id;
+  public AdminProtocolService(AdminAsynchronousProtocol protocol) {
     this.protocol = protocol;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
index 3e5ae0d..8c2f31f 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
@@ -21,27 +21,28 @@ import org.apache.ratis.grpc.RaftGrpcUtil;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
 import 
org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.function.Supplier;
+
 public class RaftServerProtocolService extends 
RaftServerProtocolServiceImplBase {
   public static final Logger LOG = 
LoggerFactory.getLogger(RaftServerProtocolService.class);
 
-  private final RaftPeerId id;
+  private final Supplier<RaftPeerId> idSupplier;
   private final RaftServerProtocol server;
 
-  public RaftServerProtocolService(RaftPeerId id, RaftServerProtocol server) {
-    this.id = id;
+  public RaftServerProtocolService(Supplier<RaftPeerId> idSupplier, 
RaftServerProtocol server) {
+    this.idSupplier = idSupplier;
     this.server = server;
   }
 
+  RaftPeerId getId() {
+    return idSupplier.get();
+  }
+
   @Override
   public void requestVote(RequestVoteRequestProto request,
       StreamObserver<RequestVoteReplyProto> responseObserver) {
@@ -51,7 +52,7 @@ public class RaftServerProtocolService extends 
RaftServerProtocolServiceImplBase
       responseObserver.onCompleted();
     } catch (Throwable e) {
       LOG.info("{} got exception when handling requestVote {}: {}",
-          id, request.getServerRequest(), e);
+          getId(), request.getServerRequest(), e);
       responseObserver.onError(RaftGrpcUtil.wrapException(e));
     }
   }
@@ -67,7 +68,7 @@ public class RaftServerProtocolService extends 
RaftServerProtocolServiceImplBase
           responseObserver.onNext(reply);
         } catch (Throwable e) {
           LOG.info("{} got exception when handling appendEntries {}: {}",
-              id, request.getServerRequest(), e);
+              getId(), request.getServerRequest(), e);
           responseObserver.onError(RaftGrpcUtil.wrapException(e));
         }
       }
@@ -75,12 +76,12 @@ public class RaftServerProtocolService extends 
RaftServerProtocolServiceImplBase
       @Override
       public void onError(Throwable t) {
         // for now we just log a msg
-        LOG.info("{}: appendEntries on error. Exception: {}", id, t);
+        LOG.info("{}: appendEntries on error. Exception: {}", getId(), t);
       }
 
       @Override
       public void onCompleted() {
-        LOG.info("{}: appendEntries completed", id);
+        LOG.info("{}: appendEntries completed", getId());
         responseObserver.onCompleted();
       }
     };
@@ -97,19 +98,19 @@ public class RaftServerProtocolService extends 
RaftServerProtocolServiceImplBase
           responseObserver.onNext(reply);
         } catch (Throwable e) {
           LOG.info("{} got exception when handling installSnapshot {}: {}",
-              id, request.getServerRequest(), e);
+              getId(), request.getServerRequest(), e);
           responseObserver.onError(RaftGrpcUtil.wrapException(e));
         }
       }
 
       @Override
       public void onError(Throwable t) {
-        LOG.info("{}: installSnapshot on error. Exception: {}", id, t);
+        LOG.info("{}: installSnapshot on error. Exception: {}", getId(), t);
       }
 
       @Override
       public void onCompleted() {
-        LOG.info("{}: installSnapshot completed", id);
+        LOG.info("{}: installSnapshot completed", getId());
         responseObserver.onCompleted();
       }
     };

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index 1fea9ff..5e571d4 100644
--- 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -24,11 +24,11 @@ import org.apache.ratis.hadooprpc.HadoopConfigKeys;
 import org.apache.ratis.hadooprpc.Proxy;
 import org.apache.ratis.hadooprpc.client.CombinedClientProtocolPB;
 import 
org.apache.ratis.hadooprpc.client.CombinedClientProtocolServerSideTranslatorPB;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.shaded.com.google.protobuf.BlockingService;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
@@ -47,7 +47,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 
 /** Server side Hadoop RPC service. */
-public class HadoopRpcService implements RaftServerRpc {
+public class HadoopRpcService extends 
RaftServerRpcWithProxy<Proxy<RaftServerProtocolPB>, 
PeerProxyMap<Proxy<RaftServerProtocolPB>>> {
   public static final Logger LOG = 
LoggerFactory.getLogger(HadoopRpcService.class);
   static final String CLASS_NAME = HadoopRpcService.class.getSimpleName();
   public static final String SEND_SERVER_REQUEST = CLASS_NAME + 
".sendServerRequest";
@@ -84,16 +84,13 @@ public class HadoopRpcService implements RaftServerRpc {
     return new Builder();
   }
 
-  private final RaftPeerId id;
   private final RPC.Server ipcServer;
   private final InetSocketAddress ipcServerAddress;
 
-  private final PeerProxyMap<Proxy<RaftServerProtocolPB>> proxies;
-
   private HadoopRpcService(RaftServer server, final Configuration conf) {
-    this.id = server.getId();
-    this.proxies = new PeerProxyMap<>(id.toString(),
-        p -> new Proxy<>(RaftServerProtocolPB.class, p.getAddress(), conf));
+    super(server::getId,
+        id -> new PeerProxyMap<>(id.toString(),
+            p -> new Proxy<>(RaftServerProtocolPB.class, p.getAddress(), 
conf)));
     try {
       this.ipcServer = newRpcServer(server, conf);
     } catch (IOException e) {
@@ -147,13 +144,14 @@ public class HadoopRpcService implements RaftServerRpc {
   }
 
   @Override
-  public void start() {
+  public void startImpl() {
     ipcServer.start();
   }
 
   @Override
-  public void close() {
+  public void closeImpl() {
     ipcServer.stop();
+    super.closeImpl();
   }
 
   @Override
@@ -181,9 +179,9 @@ public class HadoopRpcService implements RaftServerRpc {
       REQUEST request, ByteString replyId,
       CheckedFunction<RaftServerProtocolPB, REPLY, ServiceException> f)
       throws IOException {
-    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
+    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, getId(), null, 
request);
 
-    final RaftServerProtocolPB proxy = proxies.getProxy(
+    final RaftServerProtocolPB proxy = getProxies().getProxy(
         RaftPeerId.valueOf(replyId)).getProtocol();
     try {
       return f.apply(proxy);
@@ -191,9 +189,4 @@ public class HadoopRpcService implements RaftServerRpc {
       throw ProtoUtils.toIOException(se);
     }
   }
-
-  @Override
-  public void addPeers(Iterable<RaftPeer> peers) {
-    proxies.addPeers(peers);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 8089821..cb337b5 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -27,6 +27,7 @@ import org.apache.ratis.protocol.ServerInformationReply;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
 import org.apache.ratis.shaded.io.netty.bootstrap.ServerBootstrap;
 import org.apache.ratis.shaded.io.netty.channel.*;
 import org.apache.ratis.shaded.io.netty.channel.nio.NioEventLoopGroup;
@@ -43,6 +44,7 @@ import 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyExceptionReplyPr
 import 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerReplyProto;
 import 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto;
 import org.apache.ratis.util.CodeInjectionForTesting;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.ProtoUtils;
 
@@ -50,11 +52,12 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.Objects;
+import java.util.function.Supplier;
 
 /**
  * A netty server endpoint that acts as the communication layer.
  */
-public final class NettyRpcService implements RaftServerRpc {
+public final class NettyRpcService extends 
RaftServerRpcWithProxy<NettyRpcProxy, NettyRpcProxy.PeerMap> {
   static final String CLASS_NAME = NettyRpcService.class.getSimpleName();
   public static final String SEND_SERVER_REQUEST = CLASS_NAME + 
".sendServerRequest";
 
@@ -76,16 +79,12 @@ public final class NettyRpcService implements RaftServerRpc 
{
     return new Builder();
   }
 
-  private final LifeCycle lifeCycle = new 
LifeCycle(getClass().getSimpleName());
   private final RaftServer server;
-  private final RaftPeerId id;
 
   private final EventLoopGroup bossGroup = new NioEventLoopGroup();
   private final EventLoopGroup workerGroup = new NioEventLoopGroup();
   private final ChannelFuture channelFuture;
 
-  private final NettyRpcProxy.PeerMap proxies;
-
   @ChannelHandler.Sharable
   class InboundHandler extends 
SimpleChannelInboundHandler<RaftNettyServerRequestProto> {
     @Override
@@ -97,9 +96,8 @@ public final class NettyRpcService implements RaftServerRpc {
 
   /** Constructs a netty server with the given port. */
   private NettyRpcService(RaftServer server) {
+    super(server::getId, id -> new NettyRpcProxy.PeerMap(id.toString()));
     this.server = server;
-    this.id = server.getId();
-    this.proxies = new NettyRpcProxy.PeerMap(id.toString());
 
     final ChannelInitializer<SocketChannel> initializer
         = new ChannelInitializer<SocketChannel>() {
@@ -135,19 +133,17 @@ public final class NettyRpcService implements 
RaftServerRpc {
   }
 
   @Override
-  public void start() {
-    lifeCycle.startAndTransition(() -> channelFuture.syncUninterruptibly());
+  public void startImpl() {
+    channelFuture.syncUninterruptibly();
   }
 
   @Override
-  public void close() {
-    lifeCycle.checkStateAndClose(() -> {
-      bossGroup.shutdownGracefully();
-      workerGroup.shutdownGracefully();
-      final ChannelFuture f = getChannel().close();
-      proxies.close();
-      f.syncUninterruptibly();
-    });
+  public void closeImpl() {
+    bossGroup.shutdownGracefully();
+    workerGroup.shutdownGracefully();
+    final ChannelFuture f = getChannel().close();
+    super.closeImpl();
+    f.syncUninterruptibly();
   }
 
   @Override
@@ -247,7 +243,7 @@ public final class NettyRpcService implements RaftServerRpc 
{
 
   @Override
   public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) 
throws IOException {
-    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
+    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, getId(), null, 
request);
 
     final RaftNettyServerRequestProto proto = 
RaftNettyServerRequestProto.newBuilder()
         .setRequestVoteRequest(request)
@@ -258,7 +254,7 @@ public final class NettyRpcService implements RaftServerRpc 
{
 
   @Override
   public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto 
request) throws IOException {
-    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
+    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, getId(), null, 
request);
 
     final RaftNettyServerRequestProto proto = 
RaftNettyServerRequestProto.newBuilder()
         .setAppendEntriesRequest(request)
@@ -269,7 +265,7 @@ public final class NettyRpcService implements RaftServerRpc 
{
 
   @Override
   public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto 
request) throws IOException {
-    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
+    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, getId(), null, 
request);
 
     final RaftNettyServerRequestProto proto = 
RaftNettyServerRequestProto.newBuilder()
         .setInstallSnapshotRequest(request)
@@ -282,17 +278,12 @@ public final class NettyRpcService implements 
RaftServerRpc {
       RaftRpcRequestProto request, RaftNettyServerRequestProto proto)
       throws IOException {
     final RaftPeerId id = RaftPeerId.valueOf(request.getReplyId());
-    final NettyRpcProxy p = proxies.getProxy(id);
+    final NettyRpcProxy p = getProxies().getProxy(id);
     try {
       return p.send(request, proto);
     } catch (ClosedChannelException cce) {
-      proxies.resetProxy(id);
+      getProxies().resetProxy(id);
       throw cce;
     }
   }
-
-  @Override
-  public void addPeers(Iterable<RaftPeer> peers) {
-    proxies.addPeers(peers);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 622e75a..a3d9d91 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -61,15 +61,15 @@ public interface RaftServer extends Closeable, RpcType.Get, 
RaftServerProtocol,
   class Builder {
     private RaftPeerId serverId;
     private StateMachine stateMachine;
-    private RaftGroup group;
+    private RaftGroup group = RaftGroup.emptyGroup();
     private RaftProperties properties;
     private Parameters parameters;
 
     /** @return a {@link RaftServer} object. */
     public RaftServer build() throws IOException {
       return ServerImplUtils.newRaftServer(
-          Objects.requireNonNull(serverId, "The 'serverId' field is not 
initialized."),
-          Objects.requireNonNull(group, "The 'peers' field is not 
initialized."),
+          serverId,
+          group,
           Objects.requireNonNull(stateMachine, "The 'stateMachine' is not 
initialized."),
           Objects.requireNonNull(properties, "The 'properties' field is not 
initialized."),
           parameters);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 0a16954..7f7a2bf 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -32,8 +32,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.net.InetSocketAddress;
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -54,14 +55,14 @@ public class RaftServerProxy implements RaftServer {
   RaftServerProxy(RaftPeerId id, StateMachine stateMachine,
       RaftGroup group, RaftProperties properties, Parameters parameters)
       throws IOException {
-    this.id = id;
     this.properties = properties;
     this.stateMachine = stateMachine;
 
     final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
     this.factory = ServerFactory.cast(rpcType.newFactory(parameters));
 
-    this.serverRpc = initRaftServerRpc(factory, this, group);
+    this.serverRpc = factory.newRaftServerRpc(this);
+    this.id = id != null? id: RaftPeerId.valueOf(getIdStringFrom(serverRpc));
     this.impl = CompletableFuture.completedFuture(initImpl(group));
   }
 
@@ -69,14 +70,10 @@ public class RaftServerProxy implements RaftServer {
     return new RaftServerImpl(id, group, this, properties);
   }
 
-  private static RaftServerRpc initRaftServerRpc(
-      ServerFactory factory, RaftServer server, RaftGroup group) {
-    final RaftServerRpc rpc = factory.newRaftServerRpc(server);
-    // add peers into rpc service
-    if (group != null) {
-      rpc.addPeers(group.getPeers());
-    }
-    return rpc;
+  private static String getIdStringFrom(RaftServerRpc rpc) {
+    final InetSocketAddress address = rpc.getInetSocketAddress();
+    return address != null? address.getHostName() + "_" + address.getPort()
+        : rpc.getRpcType() + "-" + UUID.randomUUID();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
new file mode 100644
index 0000000..fe41859
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.PeerProxyMap;
+
+import java.io.Closeable;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** Implementing {@link RaftServerRpc} using a {@link PeerProxyMap}. */
+public abstract class RaftServerRpcWithProxy<PROXY extends Closeable, PROXIES 
extends PeerProxyMap<PROXY>>
+    implements RaftServerRpc {
+  private final Supplier<RaftPeerId> idSupplier;
+  private final Supplier<LifeCycle> lifeCycleSupplier;
+  private final Supplier<PROXIES> proxiesSupplier;
+
+  public RaftServerRpcWithProxy(Supplier<RaftPeerId> idSupplier, 
Function<RaftPeerId, PROXIES> proxyCreater) {
+    this.idSupplier = idSupplier;
+    this.lifeCycleSupplier = JavaUtils.memoize(() -> new LifeCycle(getId()));
+    this.proxiesSupplier = JavaUtils.memoize(() -> 
proxyCreater.apply(getId()));
+  }
+
+  public RaftPeerId getId() {
+    return idSupplier.get();
+  }
+
+  public LifeCycle getLifeCycle() {
+    return lifeCycleSupplier.get();
+  }
+
+  public PROXIES getProxies() {
+    return proxiesSupplier.get();
+  }
+
+  @Override
+  public void addPeers(Iterable<RaftPeer> peers) {
+    getProxies().addPeers(peers);
+  }
+
+  @Override
+  public final void start() {
+    getLifeCycle().startAndTransition(() -> startImpl());
+  }
+
+  public abstract void startImpl();
+
+  @Override
+  public final void close() {
+    getLifeCycle().checkStateAndClose(() -> closeImpl());
+  }
+
+  public void closeImpl() {
+    getProxies().close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 544ed13..15ee155 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -33,17 +33,23 @@ public class ServerImplUtils {
   public static RaftServerProxy newRaftServer(
       RaftPeerId id, RaftGroup group, StateMachine stateMachine,
       RaftProperties properties, Parameters parameters) throws IOException {
+    final RaftServerProxy proxy;
     try {
       // attempt multiple times to avoid temporary bind exception
-      return JavaUtils.attempt(
+      proxy = JavaUtils.attempt(
           () -> new RaftServerProxy(id, stateMachine, group, properties, 
parameters),
-          5, 500L, "newRaftServer", RaftServerImpl.LOG);
+          5, 500L, "new RaftServerProxy", RaftServerProxy.LOG);
     } catch (InterruptedException e) {
       throw IOUtils.toInterruptedIOException(
           "Interrupted when creating RaftServer " + id + ", " + group, e);
     } catch (IOException e) {
       throw new IOException("Failed to create RaftServer " + id + ", " + 
group, e);
     }
+    // add peers into rpc service
+    if (!group.getPeers().isEmpty()) {
+      proxy.getServerRpc().addPeers(group.getPeers());
+    }
+    return proxy;
   }
 
   public static TermIndex newTermIndex(long term, long index) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java 
b/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java
index e83e32a..b454c31 100644
--- a/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java
+++ b/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java
@@ -24,8 +24,8 @@ import org.junit.Test;
 
 public class TestRaftId extends BaseTest {
   @Override
-  public int getGlobalTimeoutMs() {
-    return 1000;
+  public int getGlobalTimeoutSeconds() {
+    return 1;
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
index bd60a3b..5c12ef4 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Supplier;
 
 public class RequestHandler<REQUEST extends RaftRpcMessage,
     REPLY extends RaftRpcMessage> {
@@ -40,17 +41,17 @@ public class RequestHandler<REQUEST extends RaftRpcMessage,
     REPLY handleRequest(REQUEST r) throws IOException;
   }
 
-  private final String serverId;
+  private final Supplier<String> serverIdSupplier;
   private final String name;
   private final SimulatedRequestReply<REQUEST, REPLY> rpc;
   private final HandlerInterface<REQUEST, REPLY> handlerImpl;
   private final List<HandlerDaemon> daemons;
 
-  RequestHandler(String serverId, String name,
+  RequestHandler(Supplier<String> serverIdSupplier, String name,
                  SimulatedRequestReply<REQUEST, REPLY> rpc,
                  HandlerInterface<REQUEST, REPLY> handlerImpl,
                  int numHandlers) {
-    this.serverId = serverId;
+    this.serverIdSupplier = serverIdSupplier;
     this.name = name;
     this.rpc = rpc;
     this.handlerImpl = handlerImpl;
@@ -61,12 +62,16 @@ public class RequestHandler<REQUEST extends RaftRpcMessage,
     }
   }
 
+  private String getServerId() {
+    return serverIdSupplier.get();
+  }
+
   void startDaemon() {
     daemons.forEach(Thread::start);
   }
 
   void shutdown() {
-    rpc.shutdown(serverId);
+    rpc.shutdown(getServerId());
   }
 
   void interruptAndJoinDaemon() throws InterruptedException {
@@ -106,14 +111,14 @@ public class RequestHandler<REQUEST extends 
RaftRpcMessage,
 
     @Override
     public String toString() {
-      return serverId + "." + name + id;
+      return getServerId() + "." + name + id;
     }
 
     @Override
     public void run() {
       while (handlerImpl.isAlive()) {
         try {
-          handleRequest(rpc.takeRequest(serverId));
+          handleRequest(rpc.takeRequest(getServerId()));
         } catch (InterruptedIOException e) {
           LOG.info(this + " is interrupted by " + e);
           LOG.trace("TRACE", e);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 91b7ad5..67e1c25 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -21,7 +21,6 @@ import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.util.Daemon;
@@ -35,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 class SimulatedServerRpc implements RaftServerRpc {
   static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class);
@@ -48,10 +48,10 @@ class SimulatedServerRpc implements RaftServerRpc {
       SimulatedRequestReply<RaftServerRequest, RaftServerReply> 
serverRequestReply,
       SimulatedRequestReply<RaftClientRequest, RaftClientReply> 
clientRequestReply) {
     this.server = (RaftServerProxy)server;
-    this.serverHandler = new RequestHandler<>(server.getId().toString(),
-        "serverHandler", serverRequestReply, serverHandlerImpl, 3);
-    this.clientHandler = new RequestHandler<>(server.getId().toString(),
-        "clientHandler", clientRequestReply, clientHandlerImpl, 3);
+
+    final Supplier<String> id = () -> server.getId().toString();
+    this.serverHandler = new RequestHandler<>(id, "serverHandler", 
serverRequestReply, serverHandlerImpl, 3);
+    this.clientHandler = new RequestHandler<>(id, "clientHandler", 
clientRequestReply, clientHandlerImpl, 3);
   }
 
   @Override


Reply via email to