This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f0146f8f RATIS-1605. Do not start RaftServer in MiniRaftCluster#addNewPeers. (#663)
f0146f8f is described below
commit f0146f8f0ab192c96079dfbb2fdbee7e7488c361
Author: Sammi Chen <[email protected]>
AuthorDate: Fri Jul 15 10:19:58 2022 +0800
RATIS-1605. Do not start RaftServer in MiniRaftCluster#addNewPeers. (#663)
---
.../grpc/client/GrpcClientProtocolClient.java | 2 +-
.../org/apache/ratis/grpc/server/GrpcService.java | 20 +++----
.../apache/ratis/netty/server/NettyRpcService.java | 12 ++++-
.../apache/ratis/server/impl/LeaderStateImpl.java | 3 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 2 +-
.../apache/ratis/server/raftlog/LogProtoUtils.java | 4 ++
.../apache/ratis/server/impl/MiniRaftCluster.java | 9 ++--
.../server/impl/RaftReconfigurationBaseTest.java | 28 ++++++----
.../ratis/server/impl/RaftServerTestUtil.java | 63 ++++++++++++++++++++++
.../apache/ratis/server/ServerRestartTests.java | 4 +-
10 files changed, 115 insertions(+), 32 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 5ca9a408..c2ee1f24 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -382,7 +382,7 @@ public class GrpcClientProtocolClient implements Closeable {
private void timeoutCheck(long callId, TimeDuration timeOutDuration) {
handleReplyFuture(callId, f -> f.completeExceptionally(
- new TimeoutIOException("Request #" + callId + " timeout " +
timeOutDuration)));
+ new TimeoutIOException(getName() + " request #" + callId + " timeout
" + timeOutDuration)));
}
private void handleReplyFuture(long callId,
Consumer<CompletableFuture<RaftClientReply>> handler) {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 3ce4fc51..877ed85e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -42,8 +42,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
@@ -103,7 +103,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
return new Builder();
}
- private final List<Server> servers = new ArrayList<>(3);
+ private final Map<String, Server> servers = new HashMap<>();
private final Supplier<InetSocketAddress> addressSupplier;
private final Supplier<InetSocketAddress> clientServerAddressSupplier;
private final Supplier<InetSocketAddress> adminServerAddressSupplier;
@@ -172,7 +172,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
}
final Server server = serverBuilder.build();
- servers.add(server);
+ servers.put(GrpcServerProtocolService.class.getSimpleName(), server);
addressSupplier = newAddressSupplier(serverPort, server);
if (separateAdminServer) {
@@ -180,7 +180,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
startBuildingNettyServer(adminPort, adminTlsConfig,
grpcMessageSizeMax, flowControlWindow);
addAdminService(raftServer, builder);
final Server adminServer = builder.build();
- servers.add(adminServer);
+ servers.put(GrpcAdminProtocolService.class.getName(), adminServer);
adminServerAddressSupplier = newAddressSupplier(adminPort, adminServer);
} else {
adminServerAddressSupplier = addressSupplier;
@@ -191,7 +191,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
startBuildingNettyServer(clientPort, clientTlsConfig,
grpcMessageSizeMax, flowControlWindow);
addClientService(builder);
final Server clientServer = builder.build();
- servers.add(clientServer);
+ servers.put(GrpcClientProtocolService.class.getName(), clientServer);
clientServerAddressSupplier = newAddressSupplier(clientPort,
clientServer);
} else {
clientServerAddressSupplier = addressSupplier;
@@ -251,7 +251,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
@Override
public void startImpl() {
- for (Server server : servers) {
+ for (Server server : servers.values()) {
try {
server.start();
} catch (IOException e) {
@@ -264,10 +264,10 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
@Override
public void closeImpl() throws IOException {
- for (Server server : servers) {
- final String name = getId() + ": shutdown server with port " +
server.getPort();
+ for (Map.Entry<String, Server> server : servers.entrySet()) {
+ final String name = getId() + ": shutdown server " + server.getKey();
LOG.info("{} now", name);
- final Server s = server.shutdownNow();
+ final Server s = server.getValue().shutdownNow();
super.closeImpl();
try {
s.awaitTermination();
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 23fd98f3..0bb69286 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -86,6 +86,7 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final MemoizedSupplier<ChannelFuture> channel;
+ private final InetSocketAddress socketAddress;
@ChannelHandler.Sharable
class InboundHandler extends
SimpleChannelInboundHandler<RaftNettyServerRequestProto> {
@@ -118,7 +119,7 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
final String host = NettyConfigKeys.Server.host(server.getProperties());
final int port = NettyConfigKeys.Server.port(server.getProperties());
- InetSocketAddress socketAddress =
+ socketAddress =
host == null || host.isEmpty() ? new InetSocketAddress(port) : new
InetSocketAddress(host, port);
this.channel = JavaUtils.memoize(() -> new ServerBootstrap()
.group(bossGroup, workerGroup)
@@ -167,7 +168,14 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
@Override
public InetSocketAddress getInetSocketAddress() {
- return (InetSocketAddress)getChannel().localAddress();
+ try {
+ return (InetSocketAddress) getChannel().localAddress();
+ } catch (IllegalStateException e) {
+ if (socketAddress.getPort() != NettyConfigKeys.Server.PORT_DEFAULT) {
+ return socketAddress;
+ }
+ throw e;
+ }
}
RaftNettyServerReplyProto handle(RaftNettyServerRequestProto proto) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 101c1343..16cb2582 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -658,7 +658,8 @@ class LeaderStateImpl implements LeaderState {
final Timestamp progressTime =
Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
final Timestamp timeoutTime = Timestamp.currentTime().addTimeMs(-3L *
server.getMaxTimeoutMs());
if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) {
- LOG.debug("{} detects a follower {} timeout ({}) for bootstrapping",
this, follower, timeoutTime);
+ LOG.debug("{} detects a follower {} timeout ({}ms) for bootstrapping",
this, follower,
+ follower.getLastRpcResponseTime().elapsedTimeMs());
return BootStrapProgress.NOPROGRESS;
} else if (follower.getMatchIndex() + stagingCatchupGap > committed
&& follower.getLastRpcResponseTime().compareTo(progressTime) > 0
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 0528080a..9268b99a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1388,8 +1388,8 @@ class RaftServerImpl implements RaftServer.Division,
: state.getLog().append(entries);
commitInfos.forEach(commitInfoCache::update);
+ CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
if (!isHeartbeat) {
- CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
final long installedIndex =
snapshotInstallationHandler.getInstalledIndex();
if (installedIndex >= RaftLog.LEAST_VALID_LOG_INDEX) {
LOG.info("{}: Follower has completed install the snapshot {}.", this,
installedIndex);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
index 86d6fc61..49caeb9f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
@@ -51,6 +51,10 @@ public final class LogProtoUtils {
} else if (entry.hasMetadataEntry()) {
final MetadataProto metadata = entry.getMetadataEntry();
s = "(c:" + metadata.getCommitIndex() + ")";
+ } else if (entry.hasConfigurationEntry()) {
+ final RaftConfigurationProto config = entry.getConfigurationEntry();
+ s = "(current:" + config.getPeersList().stream().map(p ->
p.toString()).collect(Collectors.joining(",")) +
+ ", old:" + config.getOldPeersList().stream().map(p ->
p.toString()).collect(Collectors.joining(",")) + ")";
} else {
s = "";
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 652f26dc..39683f6f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -290,6 +290,7 @@ public abstract class MiniRaftCluster implements Closeable {
public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean
format) {
final RaftServerProxy s = newRaftServer(id, group, format);
+ peers.put(s.getId(), s.getPeer());
Preconditions.assertTrue(servers.put(id, s) == null);
return s;
}
@@ -327,7 +328,6 @@ public abstract class MiniRaftCluster implements Closeable {
final RaftServer proxy = putNewServer(serverId, group, format);
proxy.start();
- peers.put(proxy.getId(), proxy.getPeer());
return group == null? null: proxy.getDivision(group.getGroupId());
}
@@ -432,11 +432,10 @@ public abstract class MiniRaftCluster implements Closeable {
// create and add new RaftServers
final Collection<RaftServer> newServers = putNewServers(peerIds, true,
raftGroup);
- startServers(newServers);
- if (!startNewPeer) {
- // start and then close, in order to bind the port
+ if (startNewPeer) {
+ // start the server
for(RaftServer s : newServers) {
- s.close();
+ s.start();
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index abf0292b..d9b4e93a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -159,7 +159,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
cluster.setConfiguration(allPeers);
// wait for the new configuration to take effect
- waitAndCheckNewConf(cluster, allPeers, 2, null);
+ waitAndCheckNewConf(cluster, allPeers, 2, 2, null);
});
}
@@ -290,6 +290,9 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
void runTestReconfTimeout(CLUSTER cluster) throws Exception {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+ RaftServerTestUtil.LogCapturer logCapture =
+ RaftServerTestUtil.LogCapturer.captureLogs(RaftServer.Division.LOG);
+
try (final RaftClient client = cluster.createClient(leaderId)) {
PeerChanges c1 = cluster.addNewPeers(2, false);
@@ -301,8 +304,8 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
final SetConfigurationRequest request =
cluster.newSetConfigurationRequest(
client.getId(), leaderId, c1.allPeersInNewConf);
try {
- sender.sendRequest(request);
- Assert.fail("did not get expected exception");
+ RaftClientReply reply = sender.sendRequest(request);
+ Assert.fail("did not get expected exception " + reply.toString() + "
[" + logCapture.getOutput() + "]");
} catch (IOException e) {
Assert.assertTrue("Got exception " + e,
e instanceof ReconfigurationTimeoutException);
@@ -333,13 +336,13 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
@Test
public void testBootstrapReconfWithSingleNodeAddOne() throws Exception {
// originally 1 peer, add 1 more
- runWithNewCluster(1, cluster -> runTestBootstrapReconf(1, false, cluster));
+ runWithNewCluster(1, cluster -> runTestBootstrapReconf(1, true, cluster));
}
@Test
public void testBootstrapReconfWithSingleNodeAddTwo() throws Exception {
// originally 1 peer, add 2 more
- runWithNewCluster(1, cluster -> runTestBootstrapReconf(2, false, cluster));
+ runWithNewCluster(1, cluster -> runTestBootstrapReconf(2, true, cluster));
}
@Test
@@ -375,7 +378,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
});
clientThread.start();
- if (!startNewPeer) {
+ if (startNewPeer) {
// Make sure that set configuration is run inside the thread
RaftTestUtil.waitFor(() -> clientThread.isAlive(), 300, 5000);
ONE_SECOND.sleep();
@@ -388,6 +391,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
RaftTestUtil.waitFor(() -> success.get() != null && success.get(),
300, 15000);
LOG.info(cluster.printServers());
+ RaftTestUtil.waitFor(() -> cluster.getLeader() != null, 300, 5000);
final RaftLog leaderLog = cluster.getLeader().getRaftLog();
for (RaftPeer newPeer : c1.newPeers) {
final RaftServer.Division d = cluster.getDivision(newPeer.getId());
@@ -428,6 +432,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
final RaftClientReply reply =
client.admin().setConfiguration(c2.allPeersInNewConf);
if (reply.isSuccess()) {
setConf.complete(null);
+ return;
}
LOG.info("setConf attempt #{} failed, {}", i,
cluster.printServers());
}
@@ -438,12 +443,12 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
});
clientThread.start();
- TimeUnit.SECONDS.sleep(1);
// the leader cannot generate the (old, new) conf, and it will keep
- // bootstrapping the 2 new peers since they have not started yet
+ // bootstrapping the 1 new peer since it has not started yet.
Assert.assertFalse(((RaftConfigurationImpl)cluster.getLeader().getRaftConf()).isTransitional());
- // only (0) the first conf entry, (1) the 1st setConf entry and (2) a
metadata entry
+ // (0) the first conf entry, (1) the 1st setConf entry, (2) a metadata
entry
+ // (3) new current conf entry (4) a metadata entry
{
final RaftLog leaderLog = cluster.getLeader().getRaftLog();
for(LogEntryProto e : RaftTestUtil.getLogEntryProtos(leaderLog)) {
@@ -457,6 +462,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
Assert.assertEquals(leaderId, killed);
final RaftPeerId newLeaderId =
RaftTestUtil.waitForLeader(cluster).getId();
LOG.info("newLeaderId: {}", newLeaderId);
+ TimeDuration.valueOf(500, TimeUnit.MILLISECONDS).sleep();
LOG.info("start new peers: {}", Arrays.asList(c1.newPeers));
for (RaftPeer np : c1.newPeers) {
@@ -469,8 +475,8 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
}
// the client fails with the first leader, and then retry the same
setConfiguration request
- waitAndCheckNewConf(cluster, c2.allPeersInNewConf, 2,
Collections.singletonList(leaderId));
- setConf.get(1, TimeUnit.SECONDS);
+ waitAndCheckNewConf(cluster, c2.allPeersInNewConf, 1,
Collections.singletonList(leaderId));
+ setConf.get(2, TimeUnit.SECONDS);
} finally {
if (clientThread != null) {
clientRunning.set(false);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index e6e60eb1..02eff400 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -17,7 +17,12 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Layout;
import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.WriterAppender;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientReply;
@@ -44,6 +49,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.StringWriter;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -71,6 +77,14 @@ public class RaftServerTestUtil {
Log4jUtils.setLogLevel(PendingRequests.LOG, level);
}
+ public static void waitAndCheckNewConf(MiniRaftCluster cluster,
+ RaftPeer[] peers, int numOfNewPeers, int numOfRemovedPeers,
Collection<RaftPeerId> deadPeers)
+ throws Exception {
+ final TimeDuration sleepTime = cluster.getTimeoutMax().apply(n -> n *
(numOfRemovedPeers + numOfNewPeers + 2));
+ JavaUtils.attempt(() -> waitAndCheckNewConf(cluster, Arrays.asList(peers),
deadPeers),
+ 10, sleepTime, "waitAndCheckNewConf", LOG);
+ }
+
public static void waitAndCheckNewConf(MiniRaftCluster cluster,
RaftPeer[] peers, int numOfRemovedPeers, Collection<RaftPeerId>
deadPeers)
throws Exception {
@@ -186,4 +200,53 @@ public class RaftServerTestUtil {
throws IOException {
return ((RaftServerImpl)leader).takeSnapshotAsync(r);
}
+
+ /**
+ * Class to capture logs for doing assertions.
+ */
+ public static final class LogCapturer {
+ private StringWriter sw = new StringWriter();
+ private WriterAppender appender;
+ private org.apache.log4j.Logger logger;
+
+ public static LogCapturer captureLogs(org.slf4j.Logger logger) {
+ return new LogCapturer(toLog4j(logger), getDefaultLayout());
+ }
+
+ public static LogCapturer captureLogs(org.slf4j.Logger logger, Layout
layout) {
+ return new LogCapturer(toLog4j(logger), layout);
+ }
+
+ private static Layout getDefaultLayout() {
+ Appender defaultAppender =
org.apache.log4j.Logger.getRootLogger().getAppender("stdout");
+ if (defaultAppender == null) {
+ defaultAppender =
org.apache.log4j.Logger.getRootLogger().getAppender("console");
+ }
+ return (defaultAppender == null) ? new PatternLayout() :
+ defaultAppender.getLayout();
+ }
+
+ private LogCapturer(org.apache.log4j.Logger logger, Layout layout) {
+ this.logger = logger;
+ this.appender = new WriterAppender(layout, sw);
+ logger.addAppender(this.appender);
+ }
+
+ public String getOutput() {
+ return sw.toString();
+ }
+
+ public void stopCapturing() {
+ logger.removeAppender(appender);
+ }
+
+ public void clearOutput() {
+ sw.getBuffer().setLength(0);
+ }
+ }
+
+ @Deprecated
+ public static org.apache.log4j.Logger toLog4j(org.slf4j.Logger logger) {
+ return LogManager.getLogger(logger.getName());
+ }
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 0e5c82e2..93e8f852 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -161,7 +161,9 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
static void assertTruncatedLog(RaftPeerId id, File openLogFile, long
lastIndex, MiniRaftCluster cluster) throws Exception {
// truncate log
- FileUtils.truncateFile(openLogFile, openLogFile.length() - 1);
+ if (openLogFile.length() > 0) {
+ FileUtils.truncateFile(openLogFile, openLogFile.length() - 1);
+ }
final RaftServer.Division server = cluster.restartServer(id, false);
// the last index should be one less than before
Assert.assertEquals(lastIndex - 1,
server.getRaftLog().getLastEntryTermIndex().getIndex());