This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 5e7a7731255baeccc20a72bce8c6e14f33706de2 Author: Sammi Chen <[email protected]> AuthorDate: Fri Jul 15 10:19:58 2022 +0800 RATIS-1605. Do not start RaftServer in MiniRaftCluster#addNewPeers. (#663) (cherry picked from commit f0146f8f0ab192c96079dfbb2fdbee7e7488c361) --- .../grpc/client/GrpcClientProtocolClient.java | 2 +- .../org/apache/ratis/grpc/server/GrpcService.java | 20 +++---- .../apache/ratis/netty/server/NettyRpcService.java | 12 ++++- .../apache/ratis/server/impl/LeaderStateImpl.java | 3 +- .../apache/ratis/server/impl/RaftServerImpl.java | 2 +- .../apache/ratis/server/raftlog/LogProtoUtils.java | 4 ++ .../apache/ratis/server/impl/MiniRaftCluster.java | 9 ++-- .../server/impl/RaftReconfigurationBaseTest.java | 28 ++++++---- .../ratis/server/impl/RaftServerTestUtil.java | 63 ++++++++++++++++++++++ .../apache/ratis/server/ServerRestartTests.java | 4 +- 10 files changed, 115 insertions(+), 32 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java index 5ca9a408..c2ee1f24 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java @@ -382,7 +382,7 @@ public class GrpcClientProtocolClient implements Closeable { private void timeoutCheck(long callId, TimeDuration timeOutDuration) { handleReplyFuture(callId, f -> f.completeExceptionally( - new TimeoutIOException("Request #" + callId + " timeout " + timeOutDuration))); + new TimeoutIOException(getName() + " request #" + callId + " timeout " + timeOutDuration))); } private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) { diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java index 3ce4fc51..877ed85e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java @@ -42,8 +42,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; @@ -103,7 +103,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol return new Builder(); } - private final List<Server> servers = new ArrayList<>(3); + private final Map<String, Server> servers = new HashMap<>(); private final Supplier<InetSocketAddress> addressSupplier; private final Supplier<InetSocketAddress> clientServerAddressSupplier; private final Supplier<InetSocketAddress> adminServerAddressSupplier; @@ -172,7 +172,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol } final Server server = serverBuilder.build(); - servers.add(server); + servers.put(GrpcServerProtocolService.class.getSimpleName(), server); addressSupplier = newAddressSupplier(serverPort, server); if (separateAdminServer) { @@ -180,7 +180,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol startBuildingNettyServer(adminPort, adminTlsConfig, grpcMessageSizeMax, flowControlWindow); addAdminService(raftServer, builder); final Server adminServer = builder.build(); - servers.add(adminServer); + servers.put(GrpcAdminProtocolService.class.getName(), adminServer); adminServerAddressSupplier = newAddressSupplier(adminPort, adminServer); } else { adminServerAddressSupplier = addressSupplier; @@ -191,7 +191,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol startBuildingNettyServer(clientPort, clientTlsConfig, grpcMessageSizeMax, flowControlWindow); addClientService(builder); final Server clientServer = builder.build(); - servers.add(clientServer); + servers.put(GrpcClientProtocolService.class.getName(), clientServer); clientServerAddressSupplier = newAddressSupplier(clientPort, clientServer); } else { clientServerAddressSupplier = addressSupplier; @@ -251,7 +251,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol @Override public void startImpl() { - for (Server server : servers) { + for (Server server : servers.values()) { try { server.start(); } catch (IOException e) { @@ -264,10 +264,10 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol @Override public void closeImpl() throws IOException { - for (Server server : servers) { - final String name = getId() + ": shutdown server with port " + server.getPort(); + for (Map.Entry<String, Server> server : servers.entrySet()) { + final String name = getId() + ": shutdown server " + server.getKey(); LOG.info("{} now", name); - final Server s = server.shutdownNow(); + final Server s = server.getValue().shutdownNow(); super.closeImpl(); try { s.awaitTermination(); diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index 23fd98f3..0bb69286 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -86,6 +86,7 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy, private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); private final MemoizedSupplier<ChannelFuture> channel; + private final InetSocketAddress socketAddress; @ChannelHandler.Sharable class InboundHandler extends SimpleChannelInboundHandler<RaftNettyServerRequestProto> { @@ -118,7 +119,7 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy, final String host = NettyConfigKeys.Server.host(server.getProperties()); final int port = NettyConfigKeys.Server.port(server.getProperties()); - InetSocketAddress socketAddress = + socketAddress = host == null || host.isEmpty() ? new InetSocketAddress(port) : new InetSocketAddress(host, port); this.channel = JavaUtils.memoize(() -> new ServerBootstrap() .group(bossGroup, workerGroup) @@ -167,7 +168,14 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy, @Override public InetSocketAddress getInetSocketAddress() { - return (InetSocketAddress)getChannel().localAddress(); + try { + return (InetSocketAddress) getChannel().localAddress(); + } catch (IllegalStateException e) { + if (socketAddress.getPort() != NettyConfigKeys.Server.PORT_DEFAULT) { + return socketAddress; + } + throw e; + } } RaftNettyServerReplyProto handle(RaftNettyServerRequestProto proto) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 101c1343..16cb2582 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -658,7 +658,8 @@ class LeaderStateImpl implements LeaderState { final Timestamp progressTime = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs()); final Timestamp timeoutTime = Timestamp.currentTime().addTimeMs(-3L * server.getMaxTimeoutMs()); if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) { - LOG.debug("{} detects a follower {} timeout ({}) for bootstrapping", this, follower, timeoutTime); + LOG.debug("{} detects a follower {} timeout ({}ms) for bootstrapping", this, follower, + follower.getLastRpcResponseTime().elapsedTimeMs()); return BootStrapProgress.NOPROGRESS; } else if (follower.getMatchIndex() + stagingCatchupGap > committed && follower.getLastRpcResponseTime().compareTo(progressTime) > 0 diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 0528080a..9268b99a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1388,8 +1388,8 @@ class RaftServerImpl implements RaftServer.Division, : state.getLog().append(entries); commitInfos.forEach(commitInfoCache::update); + CodeInjectionForTesting.execute(LOG_SYNC, getId(), null); if (!isHeartbeat) { - CodeInjectionForTesting.execute(LOG_SYNC, getId(), null); final long installedIndex = snapshotInstallationHandler.getInstalledIndex(); if (installedIndex >= RaftLog.LEAST_VALID_LOG_INDEX) { LOG.info("{}: Follower has completed install the snapshot {}.", this, installedIndex); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java index 86d6fc61..49caeb9f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java @@ -51,6 +51,10 @@ public final class LogProtoUtils { } else if (entry.hasMetadataEntry()) { final MetadataProto metadata = entry.getMetadataEntry(); s = "(c:" + metadata.getCommitIndex() + ")"; + } else if (entry.hasConfigurationEntry()) { + final RaftConfigurationProto config = entry.getConfigurationEntry(); + s = "(current:" + config.getPeersList().stream().map(p -> p.toString()).collect(Collectors.joining(",")) + + ", old:" + config.getOldPeersList().stream().map(p -> p.toString()).collect(Collectors.joining(",")) + ")"; } else { s = ""; } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 652f26dc..39683f6f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -290,6 +290,7 @@ public abstract class MiniRaftCluster implements Closeable { public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean format) { final RaftServerProxy s = newRaftServer(id, group, format); + peers.put(s.getId(), s.getPeer()); Preconditions.assertTrue(servers.put(id, s) == null); return s; } @@ -327,7 +328,6 @@ public abstract class MiniRaftCluster implements Closeable { final RaftServer proxy = putNewServer(serverId, group, format); proxy.start(); - peers.put(proxy.getId(), proxy.getPeer()); return group == null? null: proxy.getDivision(group.getGroupId()); } @@ -432,11 +432,10 @@ public abstract class MiniRaftCluster implements Closeable { // create and add new RaftServers final Collection<RaftServer> newServers = putNewServers(peerIds, true, raftGroup); - startServers(newServers); - if (!startNewPeer) { - // start and then close, in order to bind the port + if (startNewPeer) { + // start the server for(RaftServer s : newServers) { - s.close(); + s.start(); } } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index abf0292b..d9b4e93a 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -159,7 +159,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste cluster.setConfiguration(allPeers); // wait for the new configuration to take effect - waitAndCheckNewConf(cluster, allPeers, 2, null); + waitAndCheckNewConf(cluster, allPeers, 2, 2, null); }); } @@ -290,6 +290,9 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste void runTestReconfTimeout(CLUSTER cluster) throws Exception { final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); + RaftServerTestUtil.LogCapturer logCapture = + RaftServerTestUtil.LogCapturer.captureLogs(RaftServer.Division.LOG); + try (final RaftClient client = cluster.createClient(leaderId)) { PeerChanges c1 = cluster.addNewPeers(2, false); @@ -301,8 +304,8 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste final SetConfigurationRequest request = cluster.newSetConfigurationRequest( client.getId(), leaderId, c1.allPeersInNewConf); try { - sender.sendRequest(request); - Assert.fail("did not get expected exception"); + RaftClientReply reply = sender.sendRequest(request); + Assert.fail("did not get expected exception " + reply.toString() + " [" + logCapture.getOutput() + "]"); } catch (IOException e) { Assert.assertTrue("Got exception " + e, e instanceof ReconfigurationTimeoutException); @@ -333,13 +336,13 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste @Test public void testBootstrapReconfWithSingleNodeAddOne() throws Exception { // originally 1 peer, add 1 more - runWithNewCluster(1, cluster -> runTestBootstrapReconf(1, false, cluster)); + runWithNewCluster(1, cluster -> runTestBootstrapReconf(1, true, cluster)); } @Test public void testBootstrapReconfWithSingleNodeAddTwo() throws Exception { // originally 1 peer, add 2 more - runWithNewCluster(1, cluster -> runTestBootstrapReconf(2, false, cluster)); + runWithNewCluster(1, cluster -> runTestBootstrapReconf(2, true, cluster)); } @Test @@ -375,7 +378,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste }); clientThread.start(); - if (!startNewPeer) { + if (startNewPeer) { // Make sure that set configuration is run inside the thread RaftTestUtil.waitFor(() -> clientThread.isAlive(), 300, 5000); ONE_SECOND.sleep(); @@ -388,6 +391,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste RaftTestUtil.waitFor(() -> success.get() != null && success.get(), 300, 15000); LOG.info(cluster.printServers()); + RaftTestUtil.waitFor(() -> cluster.getLeader() != null, 300, 5000); final RaftLog leaderLog = cluster.getLeader().getRaftLog(); for (RaftPeer newPeer : c1.newPeers) { final RaftServer.Division d = cluster.getDivision(newPeer.getId()); @@ -428,6 +432,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste final RaftClientReply reply = client.admin().setConfiguration(c2.allPeersInNewConf); if (reply.isSuccess()) { setConf.complete(null); + return; } LOG.info("setConf attempt #{} failed, {}", i, cluster.printServers()); } @@ -438,12 +443,12 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste }); clientThread.start(); - TimeUnit.SECONDS.sleep(1); // the leader cannot generate the (old, new) conf, and it will keep - // bootstrapping the 2 new peers since they have not started yet + // bootstrapping the 1 new peer since it has not started yet. Assert.assertFalse(((RaftConfigurationImpl)cluster.getLeader().getRaftConf()).isTransitional()); - // only (0) the first conf entry, (1) the 1st setConf entry and (2) a metadata entry + // (0) the first conf entry, (1) the 1st setConf entry, (2) a metadata entry + // (3) new current conf entry (4) a metadata entry { final RaftLog leaderLog = cluster.getLeader().getRaftLog(); for(LogEntryProto e : RaftTestUtil.getLogEntryProtos(leaderLog)) { @@ -457,6 +462,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste Assert.assertEquals(leaderId, killed); final RaftPeerId newLeaderId = RaftTestUtil.waitForLeader(cluster).getId(); LOG.info("newLeaderId: {}", newLeaderId); + TimeDuration.valueOf(500, TimeUnit.MILLISECONDS).sleep(); LOG.info("start new peers: {}", Arrays.asList(c1.newPeers)); for (RaftPeer np : c1.newPeers) { @@ -469,8 +475,8 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste } // the client fails with the first leader, and then retry the same setConfiguration request - waitAndCheckNewConf(cluster, c2.allPeersInNewConf, 2, Collections.singletonList(leaderId)); - setConf.get(1, TimeUnit.SECONDS); + waitAndCheckNewConf(cluster, c2.allPeersInNewConf, 1, Collections.singletonList(leaderId)); + setConf.get(2, TimeUnit.SECONDS); } finally { if (clientThread != null) { clientRunning.set(false); diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index e6e60eb1..02eff400 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -17,7 +17,12 @@ */ package org.apache.ratis.server.impl; +import org.apache.log4j.Appender; +import org.apache.log4j.Layout; import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.PatternLayout; +import org.apache.log4j.WriterAppender; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftClientReply; @@ -44,6 +49,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.StringWriter; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -71,6 +77,14 @@ public class RaftServerTestUtil { Log4jUtils.setLogLevel(PendingRequests.LOG, level); } + public static void waitAndCheckNewConf(MiniRaftCluster cluster, + RaftPeer[] peers, int numOfNewPeers, int numOfRemovedPeers, Collection<RaftPeerId> deadPeers) + throws Exception { + final TimeDuration sleepTime = cluster.getTimeoutMax().apply(n -> n * (numOfRemovedPeers + numOfNewPeers + 2)); + JavaUtils.attempt(() -> waitAndCheckNewConf(cluster, Arrays.asList(peers), deadPeers), + 10, sleepTime, "waitAndCheckNewConf", LOG); + } + public static void waitAndCheckNewConf(MiniRaftCluster cluster, RaftPeer[] peers, int numOfRemovedPeers, Collection<RaftPeerId> deadPeers) throws Exception { @@ -186,4 +200,53 @@ public class RaftServerTestUtil { throws IOException { return ((RaftServerImpl)leader).takeSnapshotAsync(r); } + + /** + * Class to capture logs for doing assertions. + */ + public static final class LogCapturer { + private StringWriter sw = new StringWriter(); + private WriterAppender appender; + private org.apache.log4j.Logger logger; + + public static LogCapturer captureLogs(org.slf4j.Logger logger) { + return new LogCapturer(toLog4j(logger), getDefaultLayout()); + } + + public static LogCapturer captureLogs(org.slf4j.Logger logger, Layout layout) { + return new LogCapturer(toLog4j(logger), layout); + } + + private static Layout getDefaultLayout() { + Appender defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("stdout"); + if (defaultAppender == null) { + defaultAppender = org.apache.log4j.Logger.getRootLogger().getAppender("console"); + } + return (defaultAppender == null) ? new PatternLayout() : + defaultAppender.getLayout(); + } + + private LogCapturer(org.apache.log4j.Logger logger, Layout layout) { + this.logger = logger; + this.appender = new WriterAppender(layout, sw); + logger.addAppender(this.appender); + } + + public String getOutput() { + return sw.toString(); + } + + public void stopCapturing() { + logger.removeAppender(appender); + } + + public void clearOutput() { + sw.getBuffer().setLength(0); + } + } + + @Deprecated + public static org.apache.log4j.Logger toLog4j(org.slf4j.Logger logger) { + return LogManager.getLogger(logger.getName()); + } } diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java index 0e5c82e2..93e8f852 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java @@ -161,7 +161,9 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> static void assertTruncatedLog(RaftPeerId id, File openLogFile, long lastIndex, MiniRaftCluster cluster) throws Exception { // truncate log - FileUtils.truncateFile(openLogFile, openLogFile.length() - 1); + if (openLogFile.length() > 0) { + FileUtils.truncateFile(openLogFile, openLogFile.length() - 1); + } final RaftServer.Division server = cluster.restartServer(id, false); // the last index should be one less than before Assert.assertEquals(lastIndex - 1, server.getRaftLog().getLastEntryTermIndex().getIndex());
