This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new e61333d  RATIS-1183. Add getId() and getPeer() to RaftServer/RaftServer.Division. (#302)
e61333d is described below

commit e61333d16fe50de0cef4c9fe603f7776cf3e7e9c
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Nov 27 07:47:11 2020 +0800

    RATIS-1183. Add getId() and getPeer() to RaftServer/RaftServer.Division. (#302)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 16 +++---
 .../java/org/apache/ratis/server/RaftServer.java   | 23 ++++++++-
 .../apache/ratis/server/impl/LeaderElection.java   |  2 +-
 .../org/apache/ratis/server/impl/LeaderState.java  |  3 +-
 .../org/apache/ratis/server/impl/LogAppender.java  |  2 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 16 ++----
 .../apache/ratis/server/impl/RaftServerProxy.java  | 15 ++++++
 .../org/apache/ratis/server/impl/RoleInfo.java     |  5 +-
 .../org/apache/ratis/server/impl/ServerState.java  |  2 +-
 .../java/org/apache/ratis/MiniRaftCluster.java     | 23 ++-------
 .../test/java/org/apache/ratis/RaftBasicTests.java |  2 +-
 .../ratis/server/impl/RaftServerTestUtil.java      | 60 +++++++++++++++++-----
 .../statemachine/SimpleStateMachine4Testing.java   |  2 +-
 .../ratis/datastream/DataStreamBaseTest.java       | 28 +++++++---
 .../datastream/TestNettyDataStreamWithMock.java    |  3 +-
 .../apache/ratis/grpc/TestGrpcMessageMetrics.java  |  7 +--
 .../apache/ratis/grpc/TestRetryCacheWithGrpc.java  | 11 ++--
 .../apache/ratis/server/ServerRestartTests.java    | 26 ++++++----
 .../ratis/statemachine/TestStateMachine.java       |  8 +--
 19 files changed, 161 insertions(+), 93 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 23e55c2..c1d4515 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.grpc.server;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
@@ -69,16 +70,15 @@ public class GrpcLogAppender extends LogAppender {
                          FollowerInfo f) {
     super(server, leaderState, f);
 
-    this.rpcService = (GrpcService) server.getServerRpc();
+    this.rpcService = (GrpcService) server.getRaftServer().getServerRpc();
+
+    final RaftProperties properties = server.getRaftServer().getProperties();
+    this.maxPendingRequestsNum = 
GrpcConfigKeys.Server.leaderOutstandingAppendsMax(properties);
+    this.requestTimeoutDuration = 
RaftServerConfigKeys.Rpc.requestTimeout(properties);
+    this.installSnapshotEnabled = 
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
 
-    maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(
-        server.getProxy().getProperties());
-    requestTimeoutDuration = 
RaftServerConfigKeys.Rpc.requestTimeout(server.getProxy().getProperties());
-    installSnapshotEnabled = 
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(
-        server.getProxy().getProperties());
     grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
-    grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(),
-        () -> pendingRequests.logRequestsSize());
+    grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), 
pendingRequests::logRequestsSize);
   }
 
   private GrpcServerProtocolClient getClient() throws IOException {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index fc8b8ab..6af1027 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -20,8 +20,8 @@ package org.apache.ratis.server;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.rpc.RpcFactory;
 import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.server.impl.ServerFactory;
 import org.apache.ratis.server.impl.ServerImplUtils;
 import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Objects;
+import java.util.Optional;
 
 /** Raft server interface */
 public interface RaftServer extends Closeable, RpcType.Get,
@@ -48,9 +49,24 @@ public interface RaftServer extends Closeable, RpcType.Get,
     /** @return the {@link RaftGroupMemberId} for this division. */
     RaftGroupMemberId getMemberId();
 
+    /** @return the {@link RaftPeerId} for this division. */
+    default RaftPeerId getId() {
+      return getMemberId().getPeerId();
+    }
+
+    /** @return the {@link RaftPeer} for this division. */
+    default RaftPeer getPeer() {
+      return Optional.ofNullable(getGroup())
+          .map(g -> g.getPeer(getId()))
+          .orElseGet(() -> getRaftServer().getPeer());
+    }
+
     /** @return the {@link RaftGroup} for this division. */
     RaftGroup getGroup();
 
+    /** @return the {@link RaftServer} containing this division. */
+    RaftServer getRaftServer();
+
     /** @return the {@link StateMachine} for this division. */
     StateMachine getStateMachine();
 
@@ -60,6 +76,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
   /** @return the server ID. */
   RaftPeerId getId();
 
+  /** @return the {@link RaftPeer} for this server. */
+  RaftPeer getPeer();
+
   /** @return the group IDs the server is part of. */
   Iterable<RaftGroupId> getGroupIds();
 
@@ -72,7 +91,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
   RaftProperties getProperties();
 
   /** @return the factory for creating server components. */
-  ServerFactory getFactory();
+  RpcFactory getFactory();
 
   /** Start this server. */
   void start() throws IOException;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 7f71019..dbbefa8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -241,7 +241,7 @@ class LeaderElection implements Runnable {
             return;
           case SHUTDOWN:
             LOG.info("{} received shutdown response when requesting votes.", 
this);
-            server.getProxy().close();
+            server.getRaftServer().close();
             return;
           case REJECTED:
           case DISCOVERED_A_NEW_TERM:
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index eda0a43..e161d6f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -236,10 +236,11 @@ public class LeaderState {
   private final RaftServerMetrics raftServerMetrics;
   private final LogAppenderMetrics logAppenderMetrics;
 
-  LeaderState(RaftServerImpl server, RaftProperties properties) {
+  LeaderState(RaftServerImpl server) {
     this.name = server.getMemberId() + "-" + 
JavaUtils.getClassSimpleName(getClass());
     this.server = server;
 
+    final RaftProperties properties = server.getRaftServer().getProperties();
     stagingCatchupGap = RaftServerConfigKeys.stagingCatchupGap(properties);
     syncInterval = RaftServerConfigKeys.Rpc.sleepTime(properties);
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index a2f1624..6c1f80e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -155,7 +155,7 @@ public class LogAppender {
     this.leaderState = leaderState;
     this.raftLog = server.getState().getLog();
 
-    final RaftProperties properties = server.getProxy().getProperties();
+    final RaftProperties properties = server.getRaftServer().getProperties();
     this.snapshotChunkMaxSize = 
RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
     this.halfMinTimeoutMs = server.getMinTimeoutMs() / 2;
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 02fc88e..691ed6e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -95,8 +95,6 @@ public class RaftServerImpl implements RaftServer.Division,
 
   private final LifeCycle lifeCycle;
   private final ServerState state;
-  private final Supplier<RaftPeer> peerSupplier = JavaUtils.memoize(() ->
-      
RaftPeer.newBuilder().setId(getId()).setAddress(getServerRpc().getInetSocketAddress()).build());
   private final RoleInfo role;
 
   private final DataStreamMap dataStreamMap;
@@ -152,13 +150,8 @@ public class RaftServerImpl implements RaftServer.Division,
     return new RetryCache(expireTime);
   }
 
-  LogAppender newLogAppender(
-      LeaderState leaderState, FollowerInfo f) {
-    return getProxy().getFactory().newLogAppender(this, leaderState, f);
-  }
-
-  RaftPeer getPeer() {
-    return peerSupplier.get();
+  LogAppender newLogAppender(LeaderState leaderState, FollowerInfo f) {
+    return getRaftServer().getFactory().newLogAppender(this, leaderState, f);
   }
 
   int getMinTimeoutMs() {
@@ -200,7 +193,8 @@ public class RaftServerImpl implements RaftServer.Division,
     return retryCache;
   }
 
-  public RaftServerProxy getProxy() {
+  @Override
+  public RaftServerProxy getRaftServer() {
     return proxy;
   }
 
@@ -434,7 +428,7 @@ public class RaftServerImpl implements RaftServer.Division,
     state.becomeLeader();
 
     // start sending AppendEntries RPC to followers
-    final LogEntryProto e = role.startLeaderState(this, 
getProxy().getProperties());
+    final LogEntryProto e = role.startLeaderState(this);
     getState().setRaftConf(e);
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 0247211..f1dbe2f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -58,6 +58,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.*;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -155,6 +156,7 @@ public class RaftServerProxy implements RaftServer {
   }
 
   private final RaftPeerId id;
+  private final Supplier<RaftPeer> peerSupplier = 
JavaUtils.memoize(this::buildRaftPeer);
   private final RaftProperties properties;
   private final StateMachine.Registry stateMachineRegistry;
   private final LifeCycle lifeCycle;
@@ -238,6 +240,19 @@ public class RaftServerProxy implements RaftServer {
   }
 
   @Override
+  public RaftPeer getPeer() {
+    return peerSupplier.get();
+  }
+
+  private RaftPeer buildRaftPeer() {
+    return RaftPeer.newBuilder()
+        .setId(getId())
+        .setAddress(getServerRpc().getInetSocketAddress())
+        .setDataStreamAddress(getDataStreamServerRpc().getInetSocketAddress())
+        .build();
+  }
+
+  @Override
   public List<RaftGroupId> getGroupIds() {
     return impls.getGroupIds();
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 1fbd6f4..1f057dd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -18,7 +18,6 @@
 
 package org.apache.ratis.server.impl;
 
-import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -90,8 +89,8 @@ class RoleInfo {
     return Objects.requireNonNull(leaderState.get(), "leaderState is null");
   }
 
-  LogEntryProto startLeaderState(RaftServerImpl server, RaftProperties 
properties) {
-    return updateAndGet(leaderState, new LeaderState(server, 
properties)).start();
+  LogEntryProto startLeaderState(RaftServerImpl server) {
+    return updateAndGet(leaderState, new LeaderState(server)).start();
   }
 
   void shutdownLeaderState(boolean allowNull) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 708bd2b..e2a495f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -158,7 +158,7 @@ public class ServerState implements Closeable {
 
   private long initStatemachine(StateMachine sm, RaftGroupId gid)
       throws IOException {
-    sm.initialize(server.getProxy(), gid, storage);
+    sm.initialize(server.getRaftServer(), gid, storage);
     SnapshotInfo snapshot = sm.getLatestSnapshot();
 
     if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 395ad10..c136add 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -296,7 +296,7 @@ public abstract class MiniRaftCluster implements Closeable {
   public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean 
format) {
     final RaftServerProxy s = newRaftServer(id, group, format);
     Preconditions.assertTrue(servers.put(id, s) == null);
-    peers.put(id, toRaftPeer(s));
+    peers.put(id, s.getPeer());
     return s;
   }
 
@@ -404,25 +404,12 @@ public abstract class MiniRaftCluster implements 
Closeable {
     };
   }
 
-  public static List<RaftPeer> toRaftPeers(
-      Collection<RaftServerProxy> servers) {
+  private static List<RaftPeer> toRaftPeers(Collection<RaftServerProxy> 
servers) {
     return servers.stream()
-        .map(MiniRaftCluster::toRaftPeer)
+        .map(RaftServer::getPeer)
         .collect(Collectors.toList());
   }
 
-  public static RaftPeer toRaftPeer(RaftServerImpl s) {
-    return toRaftPeer(s.getProxy());
-  }
-
-  public static RaftPeer toRaftPeer(RaftServerProxy s) {
-    return RaftPeer.newBuilder()
-            .setId(s.getId())
-            .setAddress(s.getServerRpc().getInetSocketAddress())
-            
.setDataStreamAddress(s.getDataStreamServerRpc().getInetSocketAddress())
-            .build();
-  }
-
   public PeerChanges addNewPeers(int number, boolean startNewPeer)
       throws IOException {
     return addNewPeers(generateIds(number, servers.size()), startNewPeer);
@@ -463,7 +450,7 @@ public abstract class MiniRaftCluster implements Closeable {
     Collection<RaftPeer> peers = new ArrayList<>(group.getPeers());
     List<RaftPeer> removedPeers = new ArrayList<>(number);
     if (removeLeader) {
-      final RaftPeer leader = toRaftPeer(RaftTestUtil.waitForLeader(this));
+      final RaftPeer leader = RaftTestUtil.waitForLeader(this).getPeer();
       Preconditions.assertTrue(!excluded.contains(leader));
       peers.remove(leader);
       removedPeers.add(leader);
@@ -471,7 +458,7 @@ public abstract class MiniRaftCluster implements Closeable {
     List<RaftServerImpl> followers = getFollowers();
     for (int i = 0, removed = 0; i < followers.size() &&
         removed < (removeLeader ? number - 1 : number); i++) {
-      RaftPeer toRemove = toRaftPeer(followers.get(i));
+      RaftPeer toRemove = followers.get(i).getPeer();
       if (!excluded.contains(toRemove)) {
         peers.remove(toRemove);
         removedPeers.add(toRemove);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 1fcc583..3486f90 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -71,7 +71,7 @@ public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster>
     implements MiniRaftCluster.Factory.Get<CLUSTER> {
   {
     Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
-    Log4jUtils.setLogLevel(RaftServerTestUtil.getStateMachineUpdaterLog(), 
Level.DEBUG);
+    RaftServerTestUtil.setStateMachineUpdaterLogLevel(Level.DEBUG);
 
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), 
TimeDuration.valueOf(5, TimeUnit.SECONDS));
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 440b3bb..56b7e60 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -26,6 +26,10 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Log4jUtils;
 import org.apache.ratis.util.TimeDuration;
@@ -42,6 +46,9 @@ import java.util.stream.Stream;
 public class RaftServerTestUtil {
   static final Logger LOG = LoggerFactory.getLogger(RaftServerTestUtil.class);
 
+  public static void setStateMachineUpdaterLogLevel(Level level) {
+    Log4jUtils.setLogLevel(StateMachineUpdater.LOG, level);
+  }
   public static void setWatchRequestsLogLevel(Level level) {
     Log4jUtils.setLogLevel(WatchRequests.LOG, level);
   }
@@ -65,7 +72,8 @@ public class RaftServerTestUtil {
     int deadIncluded = 0;
     final RaftConfiguration current = RaftConfiguration.newBuilder()
         .setConf(peers).setLogEntryIndex(0).build();
-    for (RaftServerImpl server : cluster.iterateServerImpls()) {
+    for (RaftServer.Division d : cluster.iterateServerImpls()) {
+      final RaftServerImpl server = (RaftServerImpl)d;
       LOG.info("checking {}", server);
       if (deadPeers != null && deadPeers.contains(server.getId())) {
         if (current.containsInConf(server.getId())) {
@@ -87,12 +95,24 @@ public class RaftServerTestUtil {
     Assert.assertEquals(peers.size(), numIncluded + deadIncluded);
   }
 
-  public static long getRetryCacheSize(RaftServerImpl server) {
-    return server.getRetryCache().size();
+  public static boolean isLeaderReady(RaftServer.Division server) {
+    return ((RaftServerImpl)server).isLeaderReady();
+  }
+
+  public static long getCurrentTerm(RaftServer.Division server) {
+    return ((RaftServerImpl)server).getState().getCurrentTerm();
+  }
+
+  public static long getLastAppliedIndex(RaftServer.Division server) {
+    return ((RaftServerImpl)server).getState().getLastAppliedIndex();
   }
 
-  public static RetryCache.CacheEntry getRetryEntry(RaftServerImpl server, 
ClientId clientId, long callId) {
-    return server.getRetryCache().get(ClientInvocationId.valueOf(clientId, 
callId));
+  public static long getRetryCacheSize(RaftServer.Division server) {
+    return ((RaftServerImpl)server).getRetryCache().size();
+  }
+
+  public static RetryCache.CacheEntry getRetryEntry(RaftServer.Division 
server, ClientId clientId, long callId) {
+    return 
((RaftServerImpl)server).getRetryCache().get(ClientInvocationId.valueOf(clientId,
 callId));
   }
 
   public static boolean isRetryCacheEntryFailed(RetryCache.CacheEntry entry) {
@@ -103,11 +123,31 @@ public class RaftServerTestUtil {
     return server.getRole().getRaftPeerRole();
   }
 
-  private static Optional<LeaderState> getLeaderState(RaftServerImpl server) {
-    return server.getRole().getLeaderState();
+  public static RaftConfiguration getRaftConf(RaftServer.Division server) {
+    return ((RaftServerImpl)server).getRaftConf();
   }
 
-  public static Stream<LogAppender> getLogAppenders(RaftServerImpl server) {
+  public static RaftLog getRaftLog(RaftServer.Division server) {
+    return ((RaftServerImpl)server).getState().getLog();
+  }
+
+  public static RaftStorage getRaftStorage(RaftServer.Division server) {
+    return ((RaftServerImpl)server).getState().getStorage();
+  }
+
+  public static RaftServerMetrics getRaftServerMetrics(RaftServer.Division 
server) {
+    return ((RaftServerImpl)server).getRaftServerMetrics();
+  }
+
+  public static RaftServerRpc getServerRpc(RaftServer.Division server) {
+    return ((RaftServerImpl)server).getRaftServer().getServerRpc();
+  }
+
+  private static Optional<LeaderState> getLeaderState(RaftServer.Division 
server) {
+    return ((RaftServerImpl)server).getRole().getLeaderState();
+  }
+
+  public static Stream<LogAppender> getLogAppenders(RaftServer.Division 
server) {
     return 
getLeaderState(server).map(LeaderState::getLogAppenders).orElse(null);
   }
 
@@ -117,10 +157,6 @@ public class RaftServerTestUtil {
     leaderState.getLogAppenders().forEach(leaderState::restartSender);
   }
 
-  public static Logger getStateMachineUpdaterLog() {
-    return StateMachineUpdater.LOG;
-  }
-
   public static List<RaftServerImpl> getRaftServerImpls(RaftServerProxy proxy) 
{
     return JavaUtils.callAsUnchecked(proxy::getImpls);
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 2487589..d8e7c0b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -77,7 +77,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   private static final boolean 
RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_DEFAULT = false;
   private boolean notifiedAsLeader;
 
-  public static SimpleStateMachine4Testing get(RaftServerImpl s) {
+  public static SimpleStateMachine4Testing get(RaftServer.Division s) {
     return (SimpleStateMachine4Testing)s.getStateMachine();
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 46eeed1..c7ff300 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -73,11 +73,13 @@ import java.util.stream.Collectors;
 
 abstract class DataStreamBaseTest extends BaseTest {
   static class MyDivision implements RaftServer.Division {
+    private final RaftServer server;
     private final MultiDataStreamStateMachine stateMachine = new 
MultiDataStreamStateMachine();
     private final DataStreamMap streamMap;
 
-    MyDivision(Object name) {
-      this.streamMap = RaftServerTestUtil.newDataStreamMap(name);
+    MyDivision(RaftServer server) {
+      this.server = server;
+      this.streamMap = RaftServerTestUtil.newDataStreamMap(server.getId());
     }
 
     @Override
@@ -91,6 +93,11 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
 
     @Override
+    public RaftServer getRaftServer() {
+      return server;
+    }
+
+    @Override
     public MultiDataStreamStateMachine getStateMachine() {
       return stateMachine;
     }
@@ -143,27 +150,32 @@ abstract class DataStreamBaseTest extends BaseTest {
   }
 
   protected MyRaftServer newRaftServer(RaftPeer peer, RaftProperties 
properties) {
-    return new MyRaftServer(peer.getId(), properties);
+    return new MyRaftServer(peer, properties);
   }
 
   static class MyRaftServer implements RaftServer {
-      private final RaftPeerId id;
+      private final RaftPeer peer;
       private final RaftProperties properties;
       private final ConcurrentMap<RaftGroupId, MyDivision> divisions = new 
ConcurrentHashMap<>();
 
-      MyRaftServer(RaftPeerId id, RaftProperties properties) {
-        this.id = id;
+      MyRaftServer(RaftPeer peer, RaftProperties properties) {
+        this.peer = peer;
         this.properties = properties;
       }
 
       @Override
       public RaftPeerId getId() {
-        return id;
+        return peer.getId();
+      }
+
+      @Override
+      public RaftPeer getPeer() {
+        return peer;
       }
 
       @Override
       public MyDivision getDivision(RaftGroupId groupId) {
-        return divisions.computeIfAbsent(groupId, MyDivision::new);
+        return divisions.computeIfAbsent(groupId, key -> new MyDivision(this));
       }
 
       @Override
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
index 16426a7..3ee024b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
@@ -120,7 +120,8 @@ public class TestNettyDataStreamWithMock extends 
DataStreamBaseTest {
       if (getStateMachineException == null) {
         final ConcurrentMap<RaftGroupId, MyDivision> divisions = new 
ConcurrentHashMap<>();
         
when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenAnswer(
-            invocation -> 
divisions.computeIfAbsent((RaftGroupId)invocation.getArguments()[0], 
MyDivision::new));
+            invocation -> 
divisions.computeIfAbsent((RaftGroupId)invocation.getArguments()[0],
+                key -> new MyDivision(raftServer)));
       } else {
         
when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenThrow(getStateMachineException);
       }
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
index be1e3ac..ff14230 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
@@ -25,7 +25,8 @@ import org.apache.ratis.grpc.server.GrpcService;
 import org.apache.ratis.metrics.JVMMetrics;
 import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
@@ -62,9 +63,9 @@ public class TestGrpcMessageMetrics extends BaseTest
     JavaUtils.attempt(() -> assertMessageCount(cluster.getLeader()), 100, 
HUNDRED_MILLIS, cluster.getLeader().getId() + "-assertMessageCount", null);
   }
 
-  static void assertMessageCount(RaftServerImpl server) throws  Exception {
+  static void assertMessageCount(RaftServer.Division server) {
     String serverId = server.getId().toString();
-    GrpcService service = (GrpcService)(server.getProxy().getServerRpc());
+    GrpcService service = (GrpcService) 
RaftServerTestUtil.getServerRpc(server);
     RatisMetricRegistry registry = 
service.getServerInterceptor().getMetrics().getRegistry();
     String counter_prefix = serverId + "_" + 
"ratis.grpc.RaftServerProtocolService";
     Assert.assertTrue(registry.counter(counter_prefix + "_" + "requestVote" + 
"_OK_completed_total").getCount() > 0);
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
index 192b25c..f6e8cfc 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
@@ -25,9 +25,8 @@ import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
@@ -51,9 +50,9 @@ public class TestRetryCacheWithGrpc
     MiniRaftClusterWithGrpc cluster = getFactory().newCluster(NUM_SERVERS, 
properties);
     cluster.start();
 
-    RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
-    RaftServerProxy leaderProxy = leader.getProxy();
-    for (RaftServerImpl follower : cluster.getFollowers()) {
+    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+    final RaftServer leaderProxy = leader.getRaftServer();
+    for (RaftServer.Division follower : cluster.getFollowers()) {
       // block followers to trigger ResourceUnavailableException
       ((SimpleStateMachine4Testing) 
follower.getStateMachine()).blockWriteStateMachineData();
     }
@@ -74,7 +73,7 @@ public class TestRetryCacheWithGrpc
         return null;
       });
     }
-    for (RaftServerImpl follower : cluster.getFollowers()) {
+    for (RaftServer.Division follower : cluster.getFollowers()) {
       // unblock followers
       
((SimpleStateMachine4Testing)follower.getStateMachine()).unblockWriteStateMachineData();
     }
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 4814d25..e1ba5b3 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -31,7 +31,7 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.impl.ServerState;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -109,7 +109,7 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
     writeSomething(newMessage, cluster);
     final int truncatedMessageIndex = messageCount.get() - 1;
 
-    final long leaderLastIndex = 
cluster.getLeader().getState().getLog().getLastEntryTermIndex().getIndex();
+    final long leaderLastIndex = 
RaftServerTestUtil.getRaftLog(cluster.getLeader()).getLastEntryTermIndex().getIndex();
     // make sure the restarted follower can catchup
     final ServerState followerState = 
cluster.getRaftServerImpl(followerId).getState();
     JavaUtils.attemptRepeatedly(() -> {
@@ -127,7 +127,9 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
     final File leaderOpenLogFile = getOpenLogFile(cluster.getRaftServerImpl(leaderId));
 
     // shutdown all servers
-    cluster.getServers().forEach(RaftServerProxy::close);
+    for(RaftServer s : cluster.getServers()) {
+      s.close();
+    }
 
     // truncate log and
     assertTruncatedLog(followerId, followerOpenLogFile, followerLastIndex, cluster);
@@ -166,7 +168,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
     final RaftServerImpl server = cluster.restartServer(id, false);
     // the last index should be one less than before
     Assert.assertEquals(lastIndex - 1, server.getState().getLog().getLastEntryTermIndex().getIndex());
-    server.getProxy().close();
+    server.getRaftServer().close();
   }
 
   static List<Path> getOpenLogFiles(RaftServerImpl server) throws Exception {
@@ -195,7 +197,9 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
     }
 
     // shutdown all servers
-    cluster.getServers().forEach(RaftServerProxy::close);
+    for(RaftServer s : cluster.getServers()) {
+      s.close();
+    }
 
     for(RaftServerImpl impl : cluster.iterateServerImpls()) {
       final File openLogFile = JavaUtils.attemptRepeatedly(() -> getOpenLogFile(impl),
@@ -221,7 +225,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
       });
     }
     final RaftServerImpl server = cluster.restartServer(id, false);
-    server.getProxy().close();
+    server.getRaftServer().close();
   }
 
   @Test
@@ -249,8 +253,8 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
     JavaUtils.allOf(futures).get();
 
     final List<RaftPeerId> ids = new ArrayList<>();
-    final RaftServerImpl leader = cluster.getLeader();
-    final RaftLog leaderLog = leader.getState().getLog();
+    final RaftServer.Division leader = cluster.getLeader();
+    final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(leader);
     final RaftPeerId leaderId = leader.getId();
     ids.add(leaderId);
 
@@ -306,8 +310,8 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
     }
   }
 
-  static void assertLastLogEntry(RaftServerImpl server) throws RaftLogIOException {
-    final RaftLog raftLog = server.getState().getLog();
+  static void assertLastLogEntry(RaftServer.Division server) throws RaftLogIOException {
+    final RaftLog raftLog = RaftServerTestUtil.getRaftLog(server);
     final long lastIndex = raftLog.getLastEntryTermIndex().getIndex();
     final LogEntryProto lastEntry = raftLog.get(lastIndex);
     Assert.assertTrue(lastEntry.hasMetadataEntry());
@@ -365,7 +369,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
 
     final RaftLog log = leader.getState().getLog();
     final long size = TestSegmentedRaftLog.getOpenSegmentSize(log);
-    leader.getProxy().close();
+    leader.getRaftServer().close();
 
     // corrupt the log
     final File openLogFile = JavaUtils.attemptRepeatedly(() -> getOpenLogFile(leader),
diff --git a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
index 027f1fc..77b5cd2 100644
--- a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
+++ b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.statemachine;
 
+import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
@@ -48,7 +49,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 import static org.junit.Assert.*;
 
@@ -64,7 +64,7 @@ public class TestStateMachine extends BaseTest implements MiniRaftClusterWithSim
   public static final int NUM_SERVERS = 3;
 
   static class SMTransactionContext extends SimpleStateMachine4Testing {
-    public static SMTransactionContext get(RaftServerImpl s) {
+    public static SMTransactionContext get(RaftServer.Division s) {
       return (SMTransactionContext)s.getStateMachine();
     }
 
@@ -159,10 +159,10 @@ public class TestStateMachine extends BaseTest implements MiniRaftClusterWithSim
     }
 
     // check leader
-    RaftServerImpl raftServer = cluster.getLeader();
+    RaftServer.Division raftServer = cluster.getLeader();
     // assert every transaction has obtained context in leader
     final SMTransactionContext sm = SMTransactionContext.get(raftServer);
-    List<Long> ll = sm.applied.stream().collect(Collectors.toList());
+    final List<Long> ll = new ArrayList<>(sm.applied);
     Collections.sort(ll);
     assertEquals(ll.toString(), ll.size(), numTrx);
     for (int i=0; i < numTrx; i++) {

Reply via email to