This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new e61333d RATIS-1183. Add getId() and getPeer() to
RaftServer/RaftServer.Division. (#302)
e61333d is described below
commit e61333d16fe50de0cef4c9fe603f7776cf3e7e9c
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Nov 27 07:47:11 2020 +0800
RATIS-1183. Add getId() and getPeer() to RaftServer/RaftServer.Division.
(#302)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 16 +++---
.../java/org/apache/ratis/server/RaftServer.java | 23 ++++++++-
.../apache/ratis/server/impl/LeaderElection.java | 2 +-
.../org/apache/ratis/server/impl/LeaderState.java | 3 +-
.../org/apache/ratis/server/impl/LogAppender.java | 2 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 16 ++----
.../apache/ratis/server/impl/RaftServerProxy.java | 15 ++++++
.../org/apache/ratis/server/impl/RoleInfo.java | 5 +-
.../org/apache/ratis/server/impl/ServerState.java | 2 +-
.../java/org/apache/ratis/MiniRaftCluster.java | 23 ++-------
.../test/java/org/apache/ratis/RaftBasicTests.java | 2 +-
.../ratis/server/impl/RaftServerTestUtil.java | 60 +++++++++++++++++-----
.../statemachine/SimpleStateMachine4Testing.java | 2 +-
.../ratis/datastream/DataStreamBaseTest.java | 28 +++++++---
.../datastream/TestNettyDataStreamWithMock.java | 3 +-
.../apache/ratis/grpc/TestGrpcMessageMetrics.java | 7 +--
.../apache/ratis/grpc/TestRetryCacheWithGrpc.java | 11 ++--
.../apache/ratis/server/ServerRestartTests.java | 26 ++++++----
.../ratis/statemachine/TestStateMachine.java | 8 +--
19 files changed, 161 insertions(+), 93 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 23e55c2..c1d4515 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.grpc.server;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
@@ -69,16 +70,15 @@ public class GrpcLogAppender extends LogAppender {
FollowerInfo f) {
super(server, leaderState, f);
- this.rpcService = (GrpcService) server.getServerRpc();
+ this.rpcService = (GrpcService) server.getRaftServer().getServerRpc();
+
+ final RaftProperties properties = server.getRaftServer().getProperties();
+ this.maxPendingRequestsNum =
GrpcConfigKeys.Server.leaderOutstandingAppendsMax(properties);
+ this.requestTimeoutDuration =
RaftServerConfigKeys.Rpc.requestTimeout(properties);
+ this.installSnapshotEnabled =
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
- maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(
- server.getProxy().getProperties());
- requestTimeoutDuration =
RaftServerConfigKeys.Rpc.requestTimeout(server.getProxy().getProperties());
- installSnapshotEnabled =
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(
- server.getProxy().getProperties());
grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
- grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(),
- () -> pendingRequests.logRequestsSize());
+ grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(),
pendingRequests::logRequestsSize);
}
private GrpcServerProtocolClient getClient() throws IOException {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index fc8b8ab..6af1027 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -20,8 +20,8 @@ package org.apache.ratis.server;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.rpc.RpcFactory;
import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.server.impl.ServerFactory;
import org.apache.ratis.server.impl.ServerImplUtils;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
+import java.util.Optional;
/** Raft server interface */
public interface RaftServer extends Closeable, RpcType.Get,
@@ -48,9 +49,24 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the {@link RaftGroupMemberId} for this division. */
RaftGroupMemberId getMemberId();
+ /** @return the {@link RaftPeerId} for this division. */
+ default RaftPeerId getId() {
+ return getMemberId().getPeerId();
+ }
+
+ /** @return the {@link RaftPeer} for this division. */
+ default RaftPeer getPeer() {
+ return Optional.ofNullable(getGroup())
+ .map(g -> g.getPeer(getId()))
+ .orElseGet(() -> getRaftServer().getPeer());
+ }
+
/** @return the {@link RaftGroup} for this division. */
RaftGroup getGroup();
+ /** @return the {@link RaftServer} containing this division. */
+ RaftServer getRaftServer();
+
/** @return the {@link StateMachine} for this division. */
StateMachine getStateMachine();
@@ -60,6 +76,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the server ID. */
RaftPeerId getId();
+ /** @return the {@link RaftPeer} for this server. */
+ RaftPeer getPeer();
+
/** @return the group IDs the server is part of. */
Iterable<RaftGroupId> getGroupIds();
@@ -72,7 +91,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
RaftProperties getProperties();
/** @return the factory for creating server components. */
- ServerFactory getFactory();
+ RpcFactory getFactory();
/** Start this server. */
void start() throws IOException;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 7f71019..dbbefa8 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -241,7 +241,7 @@ class LeaderElection implements Runnable {
return;
case SHUTDOWN:
LOG.info("{} received shutdown response when requesting votes.",
this);
- server.getProxy().close();
+ server.getRaftServer().close();
return;
case REJECTED:
case DISCOVERED_A_NEW_TERM:
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index eda0a43..e161d6f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -236,10 +236,11 @@ public class LeaderState {
private final RaftServerMetrics raftServerMetrics;
private final LogAppenderMetrics logAppenderMetrics;
- LeaderState(RaftServerImpl server, RaftProperties properties) {
+ LeaderState(RaftServerImpl server) {
this.name = server.getMemberId() + "-" +
JavaUtils.getClassSimpleName(getClass());
this.server = server;
+ final RaftProperties properties = server.getRaftServer().getProperties();
stagingCatchupGap = RaftServerConfigKeys.stagingCatchupGap(properties);
syncInterval = RaftServerConfigKeys.Rpc.sleepTime(properties);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index a2f1624..6c1f80e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -155,7 +155,7 @@ public class LogAppender {
this.leaderState = leaderState;
this.raftLog = server.getState().getLog();
- final RaftProperties properties = server.getProxy().getProperties();
+ final RaftProperties properties = server.getRaftServer().getProperties();
this.snapshotChunkMaxSize =
RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
this.halfMinTimeoutMs = server.getMinTimeoutMs() / 2;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 02fc88e..691ed6e 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -95,8 +95,6 @@ public class RaftServerImpl implements RaftServer.Division,
private final LifeCycle lifeCycle;
private final ServerState state;
- private final Supplier<RaftPeer> peerSupplier = JavaUtils.memoize(() ->
-
RaftPeer.newBuilder().setId(getId()).setAddress(getServerRpc().getInetSocketAddress()).build());
private final RoleInfo role;
private final DataStreamMap dataStreamMap;
@@ -152,13 +150,8 @@ public class RaftServerImpl implements RaftServer.Division,
return new RetryCache(expireTime);
}
- LogAppender newLogAppender(
- LeaderState leaderState, FollowerInfo f) {
- return getProxy().getFactory().newLogAppender(this, leaderState, f);
- }
-
- RaftPeer getPeer() {
- return peerSupplier.get();
+ LogAppender newLogAppender(LeaderState leaderState, FollowerInfo f) {
+ return getRaftServer().getFactory().newLogAppender(this, leaderState, f);
}
int getMinTimeoutMs() {
@@ -200,7 +193,8 @@ public class RaftServerImpl implements RaftServer.Division,
return retryCache;
}
- public RaftServerProxy getProxy() {
+ @Override
+ public RaftServerProxy getRaftServer() {
return proxy;
}
@@ -434,7 +428,7 @@ public class RaftServerImpl implements RaftServer.Division,
state.becomeLeader();
// start sending AppendEntries RPC to followers
- final LogEntryProto e = role.startLeaderState(this,
getProxy().getProperties());
+ final LogEntryProto e = role.startLeaderState(this);
getState().setRaftConf(e);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 0247211..f1dbe2f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -58,6 +58,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.*;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -155,6 +156,7 @@ public class RaftServerProxy implements RaftServer {
}
private final RaftPeerId id;
+ private final Supplier<RaftPeer> peerSupplier =
JavaUtils.memoize(this::buildRaftPeer);
private final RaftProperties properties;
private final StateMachine.Registry stateMachineRegistry;
private final LifeCycle lifeCycle;
@@ -238,6 +240,19 @@ public class RaftServerProxy implements RaftServer {
}
@Override
+ public RaftPeer getPeer() {
+ return peerSupplier.get();
+ }
+
+ private RaftPeer buildRaftPeer() {
+ return RaftPeer.newBuilder()
+ .setId(getId())
+ .setAddress(getServerRpc().getInetSocketAddress())
+ .setDataStreamAddress(getDataStreamServerRpc().getInetSocketAddress())
+ .build();
+ }
+
+ @Override
public List<RaftGroupId> getGroupIds() {
return impls.getGroupIds();
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 1fbd6f4..1f057dd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -18,7 +18,6 @@
package org.apache.ratis.server.impl;
-import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftPeerId;
@@ -90,8 +89,8 @@ class RoleInfo {
return Objects.requireNonNull(leaderState.get(), "leaderState is null");
}
- LogEntryProto startLeaderState(RaftServerImpl server, RaftProperties
properties) {
- return updateAndGet(leaderState, new LeaderState(server,
properties)).start();
+ LogEntryProto startLeaderState(RaftServerImpl server) {
+ return updateAndGet(leaderState, new LeaderState(server)).start();
}
void shutdownLeaderState(boolean allowNull) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 708bd2b..e2a495f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -158,7 +158,7 @@ public class ServerState implements Closeable {
private long initStatemachine(StateMachine sm, RaftGroupId gid)
throws IOException {
- sm.initialize(server.getProxy(), gid, storage);
+ sm.initialize(server.getRaftServer(), gid, storage);
SnapshotInfo snapshot = sm.getLatestSnapshot();
if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 395ad10..c136add 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -296,7 +296,7 @@ public abstract class MiniRaftCluster implements Closeable {
public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean
format) {
final RaftServerProxy s = newRaftServer(id, group, format);
Preconditions.assertTrue(servers.put(id, s) == null);
- peers.put(id, toRaftPeer(s));
+ peers.put(id, s.getPeer());
return s;
}
@@ -404,25 +404,12 @@ public abstract class MiniRaftCluster implements
Closeable {
};
}
- public static List<RaftPeer> toRaftPeers(
- Collection<RaftServerProxy> servers) {
+ private static List<RaftPeer> toRaftPeers(Collection<RaftServerProxy>
servers) {
return servers.stream()
- .map(MiniRaftCluster::toRaftPeer)
+ .map(RaftServer::getPeer)
.collect(Collectors.toList());
}
- public static RaftPeer toRaftPeer(RaftServerImpl s) {
- return toRaftPeer(s.getProxy());
- }
-
- public static RaftPeer toRaftPeer(RaftServerProxy s) {
- return RaftPeer.newBuilder()
- .setId(s.getId())
- .setAddress(s.getServerRpc().getInetSocketAddress())
-
.setDataStreamAddress(s.getDataStreamServerRpc().getInetSocketAddress())
- .build();
- }
-
public PeerChanges addNewPeers(int number, boolean startNewPeer)
throws IOException {
return addNewPeers(generateIds(number, servers.size()), startNewPeer);
@@ -463,7 +450,7 @@ public abstract class MiniRaftCluster implements Closeable {
Collection<RaftPeer> peers = new ArrayList<>(group.getPeers());
List<RaftPeer> removedPeers = new ArrayList<>(number);
if (removeLeader) {
- final RaftPeer leader = toRaftPeer(RaftTestUtil.waitForLeader(this));
+ final RaftPeer leader = RaftTestUtil.waitForLeader(this).getPeer();
Preconditions.assertTrue(!excluded.contains(leader));
peers.remove(leader);
removedPeers.add(leader);
@@ -471,7 +458,7 @@ public abstract class MiniRaftCluster implements Closeable {
List<RaftServerImpl> followers = getFollowers();
for (int i = 0, removed = 0; i < followers.size() &&
removed < (removeLeader ? number - 1 : number); i++) {
- RaftPeer toRemove = toRaftPeer(followers.get(i));
+ RaftPeer toRemove = followers.get(i).getPeer();
if (!excluded.contains(toRemove)) {
peers.remove(toRemove);
removedPeers.add(toRemove);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 1fcc583..3486f90 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -71,7 +71,7 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
implements MiniRaftCluster.Factory.Get<CLUSTER> {
{
Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
- Log4jUtils.setLogLevel(RaftServerTestUtil.getStateMachineUpdaterLog(),
Level.DEBUG);
+ RaftServerTestUtil.setStateMachineUpdaterLogLevel(Level.DEBUG);
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(),
TimeDuration.valueOf(5, TimeUnit.SECONDS));
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 440b3bb..56b7e60 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -26,6 +26,10 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Log4jUtils;
import org.apache.ratis.util.TimeDuration;
@@ -42,6 +46,9 @@ import java.util.stream.Stream;
public class RaftServerTestUtil {
static final Logger LOG = LoggerFactory.getLogger(RaftServerTestUtil.class);
+ public static void setStateMachineUpdaterLogLevel(Level level) {
+ Log4jUtils.setLogLevel(StateMachineUpdater.LOG, level);
+ }
public static void setWatchRequestsLogLevel(Level level) {
Log4jUtils.setLogLevel(WatchRequests.LOG, level);
}
@@ -65,7 +72,8 @@ public class RaftServerTestUtil {
int deadIncluded = 0;
final RaftConfiguration current = RaftConfiguration.newBuilder()
.setConf(peers).setLogEntryIndex(0).build();
- for (RaftServerImpl server : cluster.iterateServerImpls()) {
+ for (RaftServer.Division d : cluster.iterateServerImpls()) {
+ final RaftServerImpl server = (RaftServerImpl)d;
LOG.info("checking {}", server);
if (deadPeers != null && deadPeers.contains(server.getId())) {
if (current.containsInConf(server.getId())) {
@@ -87,12 +95,24 @@ public class RaftServerTestUtil {
Assert.assertEquals(peers.size(), numIncluded + deadIncluded);
}
- public static long getRetryCacheSize(RaftServerImpl server) {
- return server.getRetryCache().size();
+ public static boolean isLeaderReady(RaftServer.Division server) {
+ return ((RaftServerImpl)server).isLeaderReady();
+ }
+
+ public static long getCurrentTerm(RaftServer.Division server) {
+ return ((RaftServerImpl)server).getState().getCurrentTerm();
+ }
+
+ public static long getLastAppliedIndex(RaftServer.Division server) {
+ return ((RaftServerImpl)server).getState().getLastAppliedIndex();
}
- public static RetryCache.CacheEntry getRetryEntry(RaftServerImpl server,
ClientId clientId, long callId) {
- return server.getRetryCache().get(ClientInvocationId.valueOf(clientId,
callId));
+ public static long getRetryCacheSize(RaftServer.Division server) {
+ return ((RaftServerImpl)server).getRetryCache().size();
+ }
+
+ public static RetryCache.CacheEntry getRetryEntry(RaftServer.Division
server, ClientId clientId, long callId) {
+ return
((RaftServerImpl)server).getRetryCache().get(ClientInvocationId.valueOf(clientId,
callId));
}
public static boolean isRetryCacheEntryFailed(RetryCache.CacheEntry entry) {
@@ -103,11 +123,31 @@ public class RaftServerTestUtil {
return server.getRole().getRaftPeerRole();
}
- private static Optional<LeaderState> getLeaderState(RaftServerImpl server) {
- return server.getRole().getLeaderState();
+ public static RaftConfiguration getRaftConf(RaftServer.Division server) {
+ return ((RaftServerImpl)server).getRaftConf();
}
- public static Stream<LogAppender> getLogAppenders(RaftServerImpl server) {
+ public static RaftLog getRaftLog(RaftServer.Division server) {
+ return ((RaftServerImpl)server).getState().getLog();
+ }
+
+ public static RaftStorage getRaftStorage(RaftServer.Division server) {
+ return ((RaftServerImpl)server).getState().getStorage();
+ }
+
+ public static RaftServerMetrics getRaftServerMetrics(RaftServer.Division
server) {
+ return ((RaftServerImpl)server).getRaftServerMetrics();
+ }
+
+ public static RaftServerRpc getServerRpc(RaftServer.Division server) {
+ return ((RaftServerImpl)server).getRaftServer().getServerRpc();
+ }
+
+ private static Optional<LeaderState> getLeaderState(RaftServer.Division
server) {
+ return ((RaftServerImpl)server).getRole().getLeaderState();
+ }
+
+ public static Stream<LogAppender> getLogAppenders(RaftServer.Division
server) {
return
getLeaderState(server).map(LeaderState::getLogAppenders).orElse(null);
}
@@ -117,10 +157,6 @@ public class RaftServerTestUtil {
leaderState.getLogAppenders().forEach(leaderState::restartSender);
}
- public static Logger getStateMachineUpdaterLog() {
- return StateMachineUpdater.LOG;
- }
-
public static List<RaftServerImpl> getRaftServerImpls(RaftServerProxy proxy)
{
return JavaUtils.callAsUnchecked(proxy::getImpls);
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 2487589..d8e7c0b 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -77,7 +77,7 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
private static final boolean
RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_DEFAULT = false;
private boolean notifiedAsLeader;
- public static SimpleStateMachine4Testing get(RaftServerImpl s) {
+ public static SimpleStateMachine4Testing get(RaftServer.Division s) {
return (SimpleStateMachine4Testing)s.getStateMachine();
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 46eeed1..c7ff300 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -73,11 +73,13 @@ import java.util.stream.Collectors;
abstract class DataStreamBaseTest extends BaseTest {
static class MyDivision implements RaftServer.Division {
+ private final RaftServer server;
private final MultiDataStreamStateMachine stateMachine = new
MultiDataStreamStateMachine();
private final DataStreamMap streamMap;
- MyDivision(Object name) {
- this.streamMap = RaftServerTestUtil.newDataStreamMap(name);
+ MyDivision(RaftServer server) {
+ this.server = server;
+ this.streamMap = RaftServerTestUtil.newDataStreamMap(server.getId());
}
@Override
@@ -91,6 +93,11 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
+ public RaftServer getRaftServer() {
+ return server;
+ }
+
+ @Override
public MultiDataStreamStateMachine getStateMachine() {
return stateMachine;
}
@@ -143,27 +150,32 @@ abstract class DataStreamBaseTest extends BaseTest {
}
protected MyRaftServer newRaftServer(RaftPeer peer, RaftProperties
properties) {
- return new MyRaftServer(peer.getId(), properties);
+ return new MyRaftServer(peer, properties);
}
static class MyRaftServer implements RaftServer {
- private final RaftPeerId id;
+ private final RaftPeer peer;
private final RaftProperties properties;
private final ConcurrentMap<RaftGroupId, MyDivision> divisions = new
ConcurrentHashMap<>();
- MyRaftServer(RaftPeerId id, RaftProperties properties) {
- this.id = id;
+ MyRaftServer(RaftPeer peer, RaftProperties properties) {
+ this.peer = peer;
this.properties = properties;
}
@Override
public RaftPeerId getId() {
- return id;
+ return peer.getId();
+ }
+
+ @Override
+ public RaftPeer getPeer() {
+ return peer;
}
@Override
public MyDivision getDivision(RaftGroupId groupId) {
- return divisions.computeIfAbsent(groupId, MyDivision::new);
+ return divisions.computeIfAbsent(groupId, key -> new MyDivision(this));
}
@Override
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
index 16426a7..3ee024b 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
@@ -120,7 +120,8 @@ public class TestNettyDataStreamWithMock extends
DataStreamBaseTest {
if (getStateMachineException == null) {
final ConcurrentMap<RaftGroupId, MyDivision> divisions = new
ConcurrentHashMap<>();
when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenAnswer(
- invocation ->
divisions.computeIfAbsent((RaftGroupId)invocation.getArguments()[0],
MyDivision::new));
+ invocation ->
divisions.computeIfAbsent((RaftGroupId)invocation.getArguments()[0],
+ key -> new MyDivision(raftServer)));
} else {
when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenThrow(getStateMachineException);
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
index be1e3ac..ff14230 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
@@ -25,7 +25,8 @@ import org.apache.ratis.grpc.server.GrpcService;
import org.apache.ratis.metrics.JVMMetrics;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
@@ -62,9 +63,9 @@ public class TestGrpcMessageMetrics extends BaseTest
JavaUtils.attempt(() -> assertMessageCount(cluster.getLeader()), 100,
HUNDRED_MILLIS, cluster.getLeader().getId() + "-assertMessageCount", null);
}
- static void assertMessageCount(RaftServerImpl server) throws Exception {
+ static void assertMessageCount(RaftServer.Division server) {
String serverId = server.getId().toString();
- GrpcService service = (GrpcService)(server.getProxy().getServerRpc());
+ GrpcService service = (GrpcService)
RaftServerTestUtil.getServerRpc(server);
RatisMetricRegistry registry =
service.getServerInterceptor().getMetrics().getRegistry();
String counter_prefix = serverId + "_" +
"ratis.grpc.RaftServerProtocolService";
Assert.assertTrue(registry.counter(counter_prefix + "_" + "requestVote" +
"_OK_completed_total").getCount() > 0);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
index 192b25c..f6e8cfc 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
@@ -25,9 +25,8 @@ import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
@@ -51,9 +50,9 @@ public class TestRetryCacheWithGrpc
MiniRaftClusterWithGrpc cluster = getFactory().newCluster(NUM_SERVERS,
properties);
cluster.start();
- RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
- RaftServerProxy leaderProxy = leader.getProxy();
- for (RaftServerImpl follower : cluster.getFollowers()) {
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftServer leaderProxy = leader.getRaftServer();
+ for (RaftServer.Division follower : cluster.getFollowers()) {
// block followers to trigger ResourceUnavailableException
((SimpleStateMachine4Testing)
follower.getStateMachine()).blockWriteStateMachineData();
}
@@ -74,7 +73,7 @@ public class TestRetryCacheWithGrpc
return null;
});
}
- for (RaftServerImpl follower : cluster.getFollowers()) {
+ for (RaftServer.Division follower : cluster.getFollowers()) {
// unblock followers
((SimpleStateMachine4Testing)follower.getStateMachine()).unblockWriteStateMachineData();
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 4814d25..e1ba5b3 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -31,7 +31,7 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.impl.ServerState;
import org.apache.ratis.server.protocol.TermIndex;
@@ -109,7 +109,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
writeSomething(newMessage, cluster);
final int truncatedMessageIndex = messageCount.get() - 1;
- final long leaderLastIndex =
cluster.getLeader().getState().getLog().getLastEntryTermIndex().getIndex();
+ final long leaderLastIndex =
RaftServerTestUtil.getRaftLog(cluster.getLeader()).getLastEntryTermIndex().getIndex();
// make sure the restarted follower can catchup
final ServerState followerState =
cluster.getRaftServerImpl(followerId).getState();
JavaUtils.attemptRepeatedly(() -> {
@@ -127,7 +127,9 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
final File leaderOpenLogFile =
getOpenLogFile(cluster.getRaftServerImpl(leaderId));
// shutdown all servers
- cluster.getServers().forEach(RaftServerProxy::close);
+ for(RaftServer s : cluster.getServers()) {
+ s.close();
+ }
// truncate log and
assertTruncatedLog(followerId, followerOpenLogFile, followerLastIndex,
cluster);
@@ -166,7 +168,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
final RaftServerImpl server = cluster.restartServer(id, false);
// the last index should be one less than before
Assert.assertEquals(lastIndex - 1,
server.getState().getLog().getLastEntryTermIndex().getIndex());
- server.getProxy().close();
+ server.getRaftServer().close();
}
static List<Path> getOpenLogFiles(RaftServerImpl server) throws Exception {
@@ -195,7 +197,9 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
}
// shutdown all servers
- cluster.getServers().forEach(RaftServerProxy::close);
+ for(RaftServer s : cluster.getServers()) {
+ s.close();
+ }
for(RaftServerImpl impl : cluster.iterateServerImpls()) {
final File openLogFile = JavaUtils.attemptRepeatedly(() ->
getOpenLogFile(impl),
@@ -221,7 +225,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
});
}
final RaftServerImpl server = cluster.restartServer(id, false);
- server.getProxy().close();
+ server.getRaftServer().close();
}
@Test
@@ -249,8 +253,8 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
JavaUtils.allOf(futures).get();
final List<RaftPeerId> ids = new ArrayList<>();
- final RaftServerImpl leader = cluster.getLeader();
- final RaftLog leaderLog = leader.getState().getLog();
+ final RaftServer.Division leader = cluster.getLeader();
+ final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(leader);
final RaftPeerId leaderId = leader.getId();
ids.add(leaderId);
@@ -306,8 +310,8 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
}
}
- static void assertLastLogEntry(RaftServerImpl server) throws
RaftLogIOException {
- final RaftLog raftLog = server.getState().getLog();
+ static void assertLastLogEntry(RaftServer.Division server) throws
RaftLogIOException {
+ final RaftLog raftLog = RaftServerTestUtil.getRaftLog(server);
final long lastIndex = raftLog.getLastEntryTermIndex().getIndex();
final LogEntryProto lastEntry = raftLog.get(lastIndex);
Assert.assertTrue(lastEntry.hasMetadataEntry());
@@ -365,7 +369,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
final RaftLog log = leader.getState().getLog();
final long size = TestSegmentedRaftLog.getOpenSegmentSize(log);
- leader.getProxy().close();
+ leader.getRaftServer().close();
// corrupt the log
final File openLogFile = JavaUtils.attemptRepeatedly(() ->
getOpenLogFile(leader),
diff --git
a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
index 027f1fc..77b5cd2 100644
---
a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
+++
b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.statemachine;
+import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
@@ -48,7 +49,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
import static org.junit.Assert.*;
@@ -64,7 +64,7 @@ public class TestStateMachine extends BaseTest implements
MiniRaftClusterWithSim
public static final int NUM_SERVERS = 3;
static class SMTransactionContext extends SimpleStateMachine4Testing {
- public static SMTransactionContext get(RaftServerImpl s) {
+ public static SMTransactionContext get(RaftServer.Division s) {
return (SMTransactionContext)s.getStateMachine();
}
@@ -159,10 +159,10 @@ public class TestStateMachine extends BaseTest implements
MiniRaftClusterWithSim
}
// check leader
- RaftServerImpl raftServer = cluster.getLeader();
+ RaftServer.Division raftServer = cluster.getLeader();
// assert every transaction has obtained context in leader
final SMTransactionContext sm = SMTransactionContext.get(raftServer);
- List<Long> ll = sm.applied.stream().collect(Collectors.toList());
+ final List<Long> ll = new ArrayList<>(sm.applied);
Collections.sort(ll);
assertEquals(ll.toString(), ll.size(), numTrx);
for (int i=0; i < numTrx; i++) {