Repository: incubator-ratis Updated Branches: refs/heads/master 50588bde3 -> eca35312c
RATIS-315. Add an option to delete the group directory in groupRemove. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/eca35312 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/eca35312 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/eca35312 Branch: refs/heads/master Commit: eca35312cef825c209c14b3dff3bc2b94311ef14 Parents: 50588bd Author: Mukul Kumar Singh <[email protected]> Authored: Mon Sep 17 23:09:27 2018 +0530 Committer: Mukul Kumar Singh <[email protected]> Committed: Mon Sep 17 23:09:27 2018 +0530 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 2 +- .../ratis/client/impl/ClientProtoUtils.java | 9 ++- .../ratis/client/impl/RaftClientImpl.java | 4 +- .../ratis/protocol/GroupManagementRequest.java | 15 +++-- .../java/org/apache/ratis/util/StringUtils.java | 13 ++++ .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 30 ++++----- .../ratis/hadooprpc/TestRaftWithHadoopRpc.java | 40 ++++++------ .../ratis/netty/server/NettyRpcService.java | 8 +-- .../apache/ratis/netty/TestRaftWithNetty.java | 17 +---- ratis-proto-shaded/src/main/proto/Raft.proto | 1 + .../org/apache/ratis/server/RaftServer.java | 2 +- .../ratis/server/impl/ConfigurationManager.java | 9 ++- .../apache/ratis/server/impl/LeaderState.java | 14 ++--- .../ratis/server/impl/RaftConfiguration.java | 2 +- .../ratis/server/impl/RaftServerImpl.java | 18 ++++-- .../ratis/server/impl/RaftServerProxy.java | 49 +++++++++++---- .../ratis/server/impl/ServerImplUtils.java | 4 +- .../ratis/server/impl/ServerProtoUtils.java | 8 ++- .../apache/ratis/server/impl/ServerState.java | 23 +++---- .../server/storage/RaftStorageDirectory.java | 5 +- .../java/org/apache/ratis/MiniRaftCluster.java | 39 +++++++----- .../java/org/apache/ratis/RaftBasicTests.java | 66 +++++++++----------- .../org/apache/ratis/RaftExceptionBaseTest.java | 2 +- .../java/org/apache/ratis/RaftTestUtil.java | 4 +- .../server/impl/GroupManagementBaseTest.java | 32 +++++++--- .../impl/RaftReconfigurationBaseTest.java | 17 ++--- .../simulation/TestRaftWithSimulatedRpc.java | 27 +------- 27 files changed, 254 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 20d746b..ea8b1a2 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -95,7 +95,7 @@ public interface RaftClient extends Closeable { RaftClientReply groupAdd(RaftGroup newGroup, RaftPeerId server) throws IOException; /** Send groupRemove request to the given server (not the raft service). */ - RaftClientReply groupRemove(RaftGroupId groupId, RaftPeerId server) throws IOException; + RaftClientReply groupRemove(RaftGroupId groupId, boolean deleteDirectory, RaftPeerId server) throws IOException; /** Send serverInformation request to the given server.*/ RaftClientReply serverInformation(RaftPeerId server) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 7065dd4..b5a6172 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -302,8 +302,9 @@ public interface ClientProtoUtils { return GroupManagementRequest.newAdd(clientId, serverId, m.getCallId(), ProtoUtils.toRaftGroup(p.getGroupAdd().getGroup())); case GROUPREMOVE: + final GroupRemoveRequestProto remove = p.getGroupRemove(); return GroupManagementRequest.newRemove(clientId, serverId, m.getCallId(), - ProtoUtils.toRaftGroupId(p.getGroupRemove().getGroupId())); + ProtoUtils.toRaftGroupId(remove.getGroupId()), remove.getDeleteDirectory()); default: throw new IllegalArgumentException("Unexpected op " + p.getOpCase() + " in " + p); } @@ -329,8 +330,10 @@ public interface ClientProtoUtils { } final GroupManagementRequest.Remove remove = request.getRemove(); if (remove != null) { - b.setGroupRemove(GroupRemoveRequestProto.newBuilder().setGroupId( - ProtoUtils.toRaftGroupIdProtoBuilder(remove.getGroupId())).build()); + b.setGroupRemove(GroupRemoveRequestProto.newBuilder() + .setGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(remove.getGroupId())) + .setDeleteDirectory(remove.isDeleteDirectory()) + .build()); } return b.build(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index d6c9a27..a07e229 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -221,12 +221,12 @@ final class RaftClientImpl implements RaftClient { } @Override - public RaftClientReply groupRemove(RaftGroupId groupId, RaftPeerId server) throws IOException { + public RaftClientReply groupRemove(RaftGroupId groupId, boolean deleteDirectory, RaftPeerId server) throws IOException { Objects.requireNonNull(groupId, "groupId == null"); Objects.requireNonNull(server, "server == null"); final long callId = nextCallId(); - return sendRequest(GroupManagementRequest.newRemove(clientId, server, callId, groupId)); + return sendRequest(GroupManagementRequest.newRemove(clientId, server, callId, groupId, deleteDirectory)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java index 8577548..cbaae3b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java @@ -46,9 +46,11 @@ public class GroupManagementRequest extends RaftClientRequest { public static class Remove extends Op { private final RaftGroupId groupId; + private final boolean deleteDirectory; - public Remove(RaftGroupId groupId) { + public Remove(RaftGroupId groupId, boolean deleteDirectory) { this.groupId = groupId; + this.deleteDirectory = deleteDirectory; } @Override @@ -56,9 +58,13 @@ public class GroupManagementRequest extends RaftClientRequest { return groupId; } + public boolean isDeleteDirectory() { + return deleteDirectory; + } + @Override public String toString() { - return getClass().getSimpleName() + ":" + getGroupId(); + return getClass().getSimpleName() + ":" + getGroupId() + ", " + (deleteDirectory? "delete": "retain") + "-dir"; } } @@ -66,8 +72,9 @@ public class GroupManagementRequest extends RaftClientRequest { return new GroupManagementRequest(clientId, serverId, callId, new Add(group)); } - public static GroupManagementRequest newRemove(ClientId clientId, RaftPeerId serverId, long callId, RaftGroupId groupId) { - return new GroupManagementRequest(clientId, serverId, callId, new Remove(groupId)); + public static GroupManagementRequest newRemove(ClientId clientId, RaftPeerId serverId, long callId, + RaftGroupId groupId, boolean deleteDirectory) { + return new GroupManagementRequest(clientId, serverId, callId, new Remove(groupId, deleteDirectory)); } private final Op op; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java index 710a0a2..70039b2 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java @@ -25,6 +25,7 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.nio.ByteBuffer; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.function.Supplier; @@ -128,4 +129,16 @@ public class StringUtils { } }; } + + public static <K, V> String map2String(Map<K, V> map) { + if (map == null) { + return null; + } else if (map.isEmpty()) { + return "<EMPTY_MAP>"; + } else { + final StringBuilder b = new StringBuilder("{"); + map.entrySet().stream().forEach(e -> b.append("\n ").append(e.getKey()).append(" -> ").append(e.getValue())); + return b.append("\n}").toString(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index 30fcd91..16c0f31 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -30,27 +30,18 @@ import org.apache.ratis.statemachine.StateMachine; import org.junit.Assert; import org.junit.Test; -import java.io.IOException; import java.util.Arrays; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import static org.apache.ratis.RaftTestUtil.waitForLeader; -public class TestRaftWithGrpc extends RaftBasicTests { - private final MiniRaftClusterWithGRpc cluster; +public class TestRaftWithGrpc + extends RaftBasicTests<MiniRaftClusterWithGRpc> + implements MiniRaftClusterWithGRpc.FactoryGet { - public TestRaftWithGrpc() throws IOException { - properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + { + getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class); - cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster( - NUM_SERVERS, properties); - Assert.assertNull(cluster.getLeader()); - } - - @Override - public MiniRaftClusterWithGRpc getCluster() { - return cluster; } @Override @@ -62,14 +53,17 @@ public class TestRaftWithGrpc extends RaftBasicTests { @Test public void testRequestTimeout() throws Exception { - testRequestTimeout(false, getCluster(), LOG); + try(MiniRaftClusterWithGRpc cluster = newCluster(NUM_SERVERS)) { + cluster.start(); + testRequestTimeout(false, cluster, LOG); + } } @Test - public void testUpdateViaHeartbeat() - throws IOException, InterruptedException, ExecutionException { + public void testUpdateViaHeartbeat() throws Exception { LOG.info("Running testUpdateViaHeartbeat"); - final MiniRaftClusterWithGRpc cluster = getCluster(); + final MiniRaftClusterWithGRpc cluster = newCluster(NUM_SERVERS); + cluster.start(); waitForLeader(cluster); long waitTime = 5000; try (final RaftClient client = cluster.createClient()) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java index 3f51c0b..659d37c 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java @@ -20,34 +20,33 @@ package org.apache.ratis.hadooprpc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.log4j.Level; +import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftBasicTests; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.util.LogUtils; import org.junit.Test; -import java.io.IOException; - -public class TestRaftWithHadoopRpc extends RaftBasicTests { +public class TestRaftWithHadoopRpc + extends RaftBasicTests<MiniRaftClusterWithHadoopRpc> { static { LogUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG); } - private final MiniRaftClusterWithHadoopRpc cluster; - - public TestRaftWithHadoopRpc() throws IOException { - final Configuration conf = new Configuration(); - HadoopConfigKeys.Ipc.setHandlers(conf, 20); - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); - conf.setInt(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, 1000); - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000); - cluster = MiniRaftClusterWithHadoopRpc.FACTORY.newCluster( - NUM_SERVERS, getProperties(), conf); - } - - @Override - public MiniRaftClusterWithHadoopRpc getCluster() { - return cluster; + static final Configuration CONF = new Configuration(); + static { + HadoopConfigKeys.Ipc.setHandlers(CONF, 20); + CONF.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + CONF.setInt(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, 1000); + CONF.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000); } + static final MiniRaftCluster.Factory<MiniRaftClusterWithHadoopRpc> FACTORY + = new MiniRaftClusterWithHadoopRpc.Factory() { + @Override + public MiniRaftClusterWithHadoopRpc newCluster(String[] ids, RaftProperties prop) { + return newCluster(ids, prop, CONF); + } + }; @Override @Test @@ -55,4 +54,9 @@ public class TestRaftWithHadoopRpc extends RaftBasicTests { super.testWithLoad(); BlockRequestHandlingInjection.getInstance().unblockAll(); } + + @Override + public MiniRaftCluster.Factory<MiniRaftClusterWithHadoopRpc> getFactory() { + return FACTORY; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index 7dba943..f6fcbc6 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -274,12 +274,12 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy, RaftRpcRequestProto request, RaftNettyServerRequestProto proto) throws IOException { final RaftPeerId id = RaftPeerId.valueOf(request.getReplyId()); - final NettyRpcProxy p = getProxies().getProxy(id); try { + final NettyRpcProxy p = getProxies().getProxy(id); return p.send(request, proto); - } catch (ClosedChannelException cce) { - getProxies().resetProxy(id); - throw cce; + } catch (Exception e) { + getProxies().handleException(id, e, false); + throw e; } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java index b3996ac..28815d7 100644 --- a/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java @@ -21,20 +21,9 @@ import org.apache.ratis.RaftBasicTests; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.junit.Test; -import java.io.IOException; - -public class TestRaftWithNetty extends RaftBasicTests { - private final MiniRaftClusterWithNetty cluster; - - public TestRaftWithNetty() throws IOException { - cluster = MiniRaftClusterWithNetty.FACTORY.newCluster( - NUM_SERVERS, getProperties()); - } - - @Override - public MiniRaftClusterWithNetty getCluster() { - return cluster; - } +public class TestRaftWithNetty + extends RaftBasicTests<MiniRaftClusterWithNetty> + implements MiniRaftClusterWithNetty.FactoryGet { @Override @Test http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index 3d7eab3..0a93e95 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -251,6 +251,7 @@ message GroupAddRequestProto { message GroupRemoveRequestProto { RaftGroupIdProto groupId = 1; // the group to be removed. + bool deleteDirectory = 2; // delete the directory for that group? } message GroupManagementRequestProto { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index e071d4b..ac3111e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -64,7 +64,7 @@ public interface RaftServer extends Closeable, RpcType.Get, class Builder { private RaftPeerId serverId; private StateMachine stateMachine; - private RaftGroup group = RaftGroup.emptyGroup(); + private RaftGroup group = null; private RaftProperties properties; private Parameters parameters; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java index 6aed1d7..f8e5e57 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java @@ -18,6 +18,7 @@ package org.apache.ratis.server.impl; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.StringUtils; import java.util.*; @@ -42,8 +43,7 @@ public class ConfigurationManager { this.currentConf = initialConf; } - public synchronized void addConfiguration(long logIndex, - RaftConfiguration conf) { + synchronized void addConfiguration(long logIndex, RaftConfiguration conf) { Preconditions.assertTrue(configurations.isEmpty() || configurations.lastEntry().getKey() < logIndex); configurations.put(logIndex, conf); @@ -76,5 +76,10 @@ public class ConfigurationManager { return 1 + configurations.size(); } + @Override + public synchronized String toString() { + return getClass().getSimpleName() + ", init=" + initialConf + ", confs=" + StringUtils.map2String(configurations); + } + // TODO: remove Configuration entries after they are committed } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index a13284f..839aa69 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -24,7 +24,6 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.LeaderNoOp; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.*; @@ -152,21 +151,22 @@ public class LeaderState { voterLists = divideFollowers(conf); } - void start() { - // In the beginning of the new term, replicate an empty entry in order + LogEntryProto start() { + // In the beginning of the new term, replicate a conf entry in order // to finally commit entries in the previous term. - // Also this message can help identify the last committed index when - // the leader peer is just started. + // Also this message can help identify the last committed index and the conf. final LogEntryProto placeHolder = LogEntryProto.newBuilder() .setTerm(server.getState().getCurrentTerm()) .setIndex(raftLog.getNextIndex()) - .setNoOp(LeaderNoOp.newBuilder()).build(); + .setConfigurationEntry(ServerProtoUtils.toRaftConfigurationProto(server.getRaftConf())) + .build(); CodeInjectionForTesting.execute(APPEND_PLACEHOLDER, server.getId().toString(), null); raftLog.append(placeHolder); processor.start(); senders.forEach(LogAppender::startAppender); + return placeHolder; } boolean isReady() { @@ -507,7 +507,7 @@ public class LeaderState { } // the pending request handler will send NotLeaderException for // pending client requests when it stops - server.shutdown(); + server.shutdown(false); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java index 0034f4e..eac0f58 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java @@ -215,7 +215,7 @@ public class RaftConfiguration { @Override public String toString() { - return conf + ", old=" + oldConf; + return logEntryIndex + ": " + conf + ", old=" + oldConf; } boolean hasNoChange(RaftPeer[] newMembers) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index e33177a..1428c90 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -27,6 +27,7 @@ import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.FileInfo; import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.server.storage.RaftStorageDirectory; import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.statemachine.SnapshotInfo; @@ -46,7 +47,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.ratis.server.impl.ServerProtoUtils.toRaftConfiguration; import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY; @@ -237,7 +237,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou return new RaftGroup(groupId, getRaftConf().getPeers()); } - void shutdown() { + void shutdown(boolean deleteDirectory) { lifeCycle.checkStateAndClose(() -> { LOG.info("{}: shutdown {}", getId(), groupId); try { @@ -265,6 +265,14 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } catch (Exception ignored) { LOG.warn("Failed to close state for " + getId(), ignored); } + if (deleteDirectory) { + final RaftStorageDirectory dir = state.getStorage().getStorageDir(); + try { + FileUtils.deleteFully(dir.getRoot()); + } catch(Exception ignored) { + LOG.warn(getId() + ": Failed to remove RaftStorageDirectory " + dir, ignored); + } + } }); } @@ -342,7 +350,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou // start sending AppendEntries RPC to followers leaderState = new LeaderState(this, getProxy().getProperties()); - leaderState.start(); + final LogEntryProto e = leaderState.start(); + getState().setRaftConf(e.getIndex(), ServerProtoUtils.toRaftConfiguration(e)); } private void startHeartbeatMonitor() { @@ -1124,8 +1133,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) { // the reply should have already been set. only need to record // the new conf in the state machine. - stateMachine.setRaftConfiguration(toRaftConfiguration(next.getIndex(), - next.getConfigurationEntry())); + stateMachine.setRaftConfiguration(ServerProtoUtils.toRaftConfiguration(next)); } else if (next.getLogEntryBodyCase() == SMLOGENTRY) { // check whether there is a TransactionContext because we are the leader. TransactionContext trx = getTransactionContext(next.getIndex()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index aea2767..0afd596 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -23,6 +23,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.statemachine.StateMachine; @@ -36,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -77,17 +79,13 @@ public class RaftServerProxy implements RaftServer { final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group); final CompletableFuture<RaftServerImpl> previous = map.put(groupId, newImpl); Preconditions.assertNull(previous, "previous"); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: addNew {} returns {}", getId(), group, toString(groupId, newImpl)); - } + LOG.info("{}: addNew {} returns {}", getId(), group, toString(groupId, newImpl)); return newImpl; } synchronized CompletableFuture<RaftServerImpl> remove(RaftGroupId groupId) { final CompletableFuture<RaftServerImpl> future = map.remove(groupId); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: remove {}", getId(), toString(groupId, future)); - } + LOG.info("{}: remove {}", getId(), toString(groupId, future)); return future; } @@ -98,7 +96,8 @@ public class RaftServerProxy implements RaftServer { return; } isClosed = true; - map.values().parallelStream().map(CompletableFuture::join).forEach(RaftServerImpl::shutdown); + map.values().parallelStream().map(CompletableFuture::join) + .forEach(impl -> impl.shutdown(false)); } synchronized List<CompletableFuture<RaftServerImpl>> getAll() { @@ -162,6 +161,29 @@ public class RaftServerProxy implements RaftServer { this.lifeCycle = new LifeCycle(this.id); } + /** Check the storage dir and add groups*/ + void initGroups(RaftGroup group) { + final File dir = RaftServerConfigKeys.storageDir(properties); + if (dir.isDirectory()) { + for(File sub : dir.listFiles()) { + if (sub.isDirectory()) { + LOG.info("{}: found a subdirectory {}", getId(), sub); + try { + final RaftGroupId groupId = RaftGroupId.valueOf(UUID.fromString(sub.getName())); + if (group == null || !groupId.equals(group.getGroupId())) { + addGroup(new RaftGroup(groupId)); + } + } catch(Throwable t) { + LOG.warn(getId() + ": Failed to initialize the group directory " + sub.getAbsolutePath() + ". Ignoring it", t); + } + } + } + } + if (group != null) { + addGroup(group); + } + } + private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group) { return CompletableFuture.supplyAsync(() -> { try { @@ -217,7 +239,7 @@ public class RaftServerProxy implements RaftServer { return impls.containsGroup(groupId); } - CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) { + public CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) { return impls.addNew(group); } @@ -309,17 +331,17 @@ public class RaftServerProxy implements RaftServer { } final GroupManagementRequest.Add add = request.getAdd(); if (add != null) { - return groupdAddAsync(request, add.getGroup()); + return groupAddAsync(request, add.getGroup()); } final GroupManagementRequest.Remove remove = request.getRemove(); if (remove != null) { - return groupRemoveAsync(request, remove.getGroupId()); + return groupRemoveAsync(request, remove.getGroupId(), remove.isDeleteDirectory()); } return JavaUtils.completeExceptionally(new UnsupportedOperationException( getId() + ": Request not supported " + request)); } - private CompletableFuture<RaftClientReply> groupdAddAsync(GroupManagementRequest request, RaftGroup newGroup) { + private CompletableFuture<RaftClientReply> groupAddAsync(GroupManagementRequest request, RaftGroup newGroup) { if (!request.getRaftGroupId().equals(newGroup.getGroupId())) { return JavaUtils.completeExceptionally(new GroupMismatchException( getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the new group " + newGroup)); @@ -339,7 +361,8 @@ public class RaftServerProxy implements RaftServer { }); } - private CompletableFuture<RaftClientReply> groupRemoveAsync(RaftClientRequest request, RaftGroupId groupId) { + private CompletableFuture<RaftClientReply> groupRemoveAsync( + RaftClientRequest request, RaftGroupId groupId, boolean deleteDirectory) { if (!request.getRaftGroupId().equals(groupId)) { return JavaUtils.completeExceptionally(new GroupMismatchException( getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the given group id " + groupId)); @@ -351,7 +374,7 @@ public class RaftServerProxy implements RaftServer { } return f.thenApply(impl -> { final Collection<CommitInfoProto> commitInfos = impl.getCommitInfos(); - impl.shutdown(); + impl.shutdown(deleteDirectory); return new RaftClientReply(request, commitInfos); }); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index 7b80dcf..2befb38 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -36,9 +36,7 @@ public class ServerImplUtils { RaftProperties properties, Parameters parameters) throws IOException { RaftServerProxy.LOG.debug("newRaftServer: {}, {}", id, group); final RaftServerProxy proxy = newRaftServer(id, gid -> stateMachine, properties, parameters); - if (group != null) { - proxy.addGroup(group); - } + proxy.initGroups(group); return proxy; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 6855473..3774737 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -31,6 +31,7 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.*; +import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; /** Server proto utilities for internal use. */ @@ -91,11 +92,12 @@ public class ServerProtoUtils { .build(); } - public static RaftConfiguration toRaftConfiguration( - long index, RaftConfigurationProto proto) { + public static RaftConfiguration toRaftConfiguration(LogEntryProto entry) { + Preconditions.assertTrue(ProtoUtils.isConfigurationLogEntry(entry)); + final RaftConfigurationProto proto = entry.getConfigurationEntry(); final RaftConfiguration.Builder b = RaftConfiguration.newBuilder() .setConf(ProtoUtils.toRaftPeerArray(proto.getPeersList())) - .setLogEntryIndex(index); + .setLogEntryIndex(entry.getIndex()); if (proto.getOldPeersCount() > 0) { b.setOldConf(ProtoUtils.toRaftPeerArray(proto.getOldPeersList())); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index c5a6a98..3f064a8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -89,9 +89,11 @@ public class ServerState implements Closeable { RaftConfiguration initialConf = RaftConfiguration.newBuilder() .setConf(group.getPeers()).build(); configurationManager = new ConfigurationManager(initialConf); + LOG.info("{}: {}", id, configurationManager); final File dir = RaftServerConfigKeys.storageDir(prop); - storage = new RaftStorage(new File(dir, group.getGroupId().toString()), + // use full uuid string to create a subdirectory + storage = new RaftStorage(new File(dir, group.getGroupId().getUuid().toString()), RaftServerConstants.StartupOption.REGULAR); snapshotManager = new SnapshotManager(storage, id); @@ -107,9 +109,7 @@ public class ServerState implements Closeable { // do not know whether the local log entries have been committed. log = initLog(id, prop, lastApplied, entry -> { if (entry.getLogEntryBodyCase() == CONFIGURATIONENTRY) { - configurationManager.addConfiguration(entry.getIndex(), - ServerProtoUtils.toRaftConfiguration(entry.getIndex(), - entry.getConfigurationEntry())); + setRaftConf(entry.getIndex(), ServerProtoUtils.toRaftConfiguration(entry)); } }); @@ -134,8 +134,7 @@ public class ServerState implements Closeable { // get the raft configuration from the snapshot RaftConfiguration raftConf = sm.getRaftConfiguration(); if (raftConf != null) { - configurationManager.addConfiguration(raftConf.getLogEntryIndex(), - raftConf); + setRaftConf(raftConf.getLogEntryIndex(), raftConf); } return snapshot.getIndex(); } @@ -314,10 +313,11 @@ public class ServerState implements Closeable { getRaftConf().getLogEntryIndex(); } - public void setRaftConf(long logIndex, RaftConfiguration conf) { + void setRaftConf(long logIndex, RaftConfiguration conf) { configurationManager.addConfiguration(logIndex, conf); - LOG.info("{}: successfully update the configuration {}", - getSelfId(), conf); + server.getServerRpc().addPeers(conf.getPeers()); + LOG.info("{}: set configuration {} at {}", getSelfId(), conf, logIndex); + LOG.debug("{}: {}", getSelfId(), configurationManager); } void updateConfiguration(LogEntryProto[] entries) { @@ -325,10 +325,7 @@ public class ServerState implements Closeable { configurationManager.removeConfigurations(entries[0].getIndex()); for (LogEntryProto entry : entries) { if (ProtoUtils.isConfigurationLogEntry(entry)) { - final RaftConfiguration conf = ServerProtoUtils.toRaftConfiguration( - entry.getIndex(), entry.getConfigurationEntry()); - configurationManager.addConfiguration(entry.getIndex(), conf); - server.getServerRpc().addPeers(conf.getPeers()); + setRaftConf(entry.getIndex(), ServerProtoUtils.toRaftConfiguration(entry)); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java index bcb5823..05208a3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java @@ -108,12 +108,11 @@ public class RaftStorageDirectory { * writing the version file to disk. */ void clearDirectory() throws IOException { - File curDir = this.getCurrentDir(); - clearDirectory(curDir); + clearDirectory(getCurrentDir()); clearDirectory(getStateMachineDir()); } - void clearDirectory(File dir) throws IOException { + private static void clearDirectory(File dir) throws IOException { if (dir.exists()) { LOG.info(dir + " already exists. Deleting it ..."); FileUtils.deleteFully(dir); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 8104938..7a1e83e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -36,6 +36,7 @@ import org.apache.ratis.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -43,6 +44,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -52,7 +54,7 @@ import java.util.stream.StreamSupport; import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; -public abstract class MiniRaftCluster { +public abstract class MiniRaftCluster implements Closeable { public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class); public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName(); @@ -96,10 +98,9 @@ public abstract class MiniRaftCluster { } } - public static int getPort(RaftPeerId id, RaftGroup group) { - final List<RaftPeer> peers = group.getPeers().stream() - .filter(raftPeer -> raftPeer.getId().equals(id)).collect(Collectors.toList()); - final String address = peers.isEmpty() ? null : peers.get(0).getAddress(); + protected int getPort(RaftPeerId id, RaftGroup g) { + final RaftPeer p = g != null? g.getPeer(id): peers.get(id); + final String address = p == null? null : p.getAddress(); final InetSocketAddress inetAddress = address != null? NetUtils.createSocketAddr(address): NetUtils.createLocalServerAddress(); return inetAddress.getPort(); @@ -126,9 +127,12 @@ public abstract class MiniRaftCluster { return new RaftGroup(RaftGroupId.randomId(), peers); } + private final Supplier<File> rootTestDir = JavaUtils.memoize( + () -> new File(BaseTest.getRootTestDir(), + getClass().getSimpleName() + Integer.toHexString(ThreadLocalRandom.current().nextInt()))); + private File getStorageDir(RaftPeerId id) { - return new File(BaseTest.getRootTestDir() - + "/" + getClass().getSimpleName() + "/" + id); + return new File(rootTestDir.get(), id.toString()); } public static String[] generateIds(int numServers, int base) { @@ -143,12 +147,13 @@ public abstract class MiniRaftCluster { protected final RaftProperties properties; protected final Parameters parameters; protected final Map<RaftPeerId, RaftServerProxy> servers = new ConcurrentHashMap<>(); + protected final Map<RaftPeerId, RaftPeer> peers = new ConcurrentHashMap<>(); private final Timer timer; protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) { this.group = initRaftGroup(Arrays.asList(ids)); - LOG.info("new MiniRaftCluster {}", group); + LOG.info("new {} with {}", getClass().getSimpleName(), group); this.properties = new RaftProperties(properties); this.parameters = parameters; @@ -169,20 +174,17 @@ public abstract class MiniRaftCluster { return this; } - private RaftServerProxy putNewServer(RaftPeerId id, boolean format) { - return putNewServer(id, group, format); - } - public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean format) { final RaftServerProxy s = newRaftServer(id, group, format); Preconditions.assertTrue(servers.put(id, s) == null); + peers.put(id, toRaftPeer(s)); return s; } private Collection<RaftServerProxy> putNewServers( Iterable<RaftPeerId> peers, boolean format) { return StreamSupport.stream(peers.spliterator(), false) - .map(id -> putNewServer(id, format)) + .map(id -> putNewServer(id, group, format)) .collect(Collectors.toList()); } @@ -201,10 +203,14 @@ public abstract class MiniRaftCluster { * start a stopped server again. */ public void restartServer(RaftPeerId newId, boolean format) throws IOException { + restartServer(newId, group, format); + } + + public void restartServer(RaftPeerId newId, RaftGroup group, boolean format) throws IOException { killServer(newId); servers.remove(newId); - putNewServer(newId, format).start(); + putNewServer(newId, group, format).start(); } public void restart(boolean format) throws IOException { @@ -547,6 +553,11 @@ public abstract class MiniRaftCluster { } } + @Override + public void close() { + shutdown(); + } + public void shutdown() { LOG.info("************************************************************** "); LOG.info("*** "); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 8c0def9..8a41d90 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -21,7 +21,6 @@ import org.apache.log4j.Level; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.impl.RaftClientTestUtil; -import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.NotReplicatedException; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; @@ -40,9 +39,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -51,7 +48,6 @@ import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -65,52 +61,42 @@ import static org.apache.ratis.RaftTestUtil.sendMessageInNewThread; import static org.apache.ratis.RaftTestUtil.waitForLeader; import static org.junit.Assert.assertTrue; -public abstract class RaftBasicTests extends BaseTest { +public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> + extends BaseTest + implements MiniRaftCluster.Factory.Get<CLUSTER> { { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); LogUtils.setLogLevel(RaftServerTestUtil.getStateMachineUpdaterLog(), Level.DEBUG); LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - RaftServerConfigKeys.RetryCache.setExpiryTime(properties, TimeDuration + + RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration .valueOf(5, TimeUnit.SECONDS)); } public static final int NUM_SERVERS = 5; - protected static final RaftProperties properties = new RaftProperties(); - - public abstract MiniRaftCluster getCluster(); - - public RaftProperties getProperties() { - return properties; - } - - @Before - public void setup() throws IOException { - Assert.assertNull(getCluster().getLeader()); - getCluster().start(); - } - - @After - public void tearDown() { - final MiniRaftCluster cluster = getCluster(); - if (cluster != null) { - cluster.shutdown(); - } - } - @Test public void testBasicAppendEntries() throws Exception { - runTestBasicAppendEntries(false, ReplicationLevel.MAJORITY, false, 10, getCluster(), LOG); + try(CLUSTER cluster = newCluster(NUM_SERVERS)) { + cluster.start(); + runTestBasicAppendEntries(false, ReplicationLevel.MAJORITY, false, 10, cluster, LOG); + } } @Test public void testBasicAppendEntriesKillLeader() throws Exception { - runTestBasicAppendEntries(false, ReplicationLevel.MAJORITY, true, 10, getCluster(), LOG); + try(CLUSTER cluster = newCluster(NUM_SERVERS)) { + cluster.start(); + runTestBasicAppendEntries(false, ReplicationLevel.MAJORITY, true, 10, cluster, LOG); + } } @Test public void testBasicAppendEntriesWithAllReplication() throws Exception { - runTestBasicAppendEntries(false, ReplicationLevel.ALL, false, 10, getCluster(), LOG); + try(CLUSTER cluster = newCluster(NUM_SERVERS)) { + cluster.start(); + runTestBasicAppendEntries(false, ReplicationLevel.ALL, false, 10, cluster, LOG); + } } static void killAndRestartServer(RaftPeerId id, long killSleepMs, long restartSleepMs, MiniRaftCluster cluster, Logger LOG) { @@ -188,7 +174,8 @@ public abstract class RaftBasicTests extends BaseTest { @Test public void testOldLeaderCommit() throws Exception { LOG.info("Running testOldLeaderCommit"); - final MiniRaftCluster cluster = getCluster(); + final CLUSTER cluster = newCluster(NUM_SERVERS); + cluster.start(); final RaftServerImpl leader = waitForLeader(cluster); final RaftPeerId leaderId = leader.getId(); final long term = leader.getState().getCurrentTerm(); @@ -224,12 +211,14 @@ public abstract class RaftBasicTests extends BaseTest { cluster.getServerAliveStream().map(s -> s.getState().getLog()) .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages)); LOG.info("terminating testOldLeaderCommit test"); + cluster.shutdown(); } @Test public void testOldLeaderNotCommit() throws Exception { LOG.info("Running testOldLeaderNotCommit"); - final MiniRaftCluster cluster = getCluster(); + final CLUSTER cluster = newCluster(NUM_SERVERS); + cluster.start(); final RaftPeerId leaderId = waitForLeader(cluster).getId(); List<RaftServerImpl> followers = cluster.getFollowers(); @@ -259,6 +248,7 @@ public abstract class RaftBasicTests extends BaseTest { cluster.getServerAliveStream() .map(s -> s.getState().getLog()) .forEach(log -> RaftTestUtil.checkLogEntries(log, messages, predicate)); + cluster.shutdown(); } static class Client4TestWithLoad extends Thread { @@ -340,7 +330,10 @@ public abstract class RaftBasicTests extends BaseTest { @Test public void testWithLoad() throws Exception { - testWithLoad(10, 500, false, getCluster(), LOG); + try(CLUSTER cluster = newCluster(NUM_SERVERS)) { + cluster.start(); + testWithLoad(10, 500, false, cluster, LOG); + } } public static void testWithLoad(final int numClients, final int numMessages, @@ -456,7 +449,10 @@ public abstract class RaftBasicTests extends BaseTest { @Test public void testDelayRequestIfLeaderStepDown() throws Exception { - runTestDelayRequestIfLeaderStepDown(false, getCluster(), LOG); + try(CLUSTER cluster = newCluster(NUM_SERVERS)) { + cluster.start(); + runTestDelayRequestIfLeaderStepDown(false, cluster, LOG); + } } static void runTestDelayRequestIfLeaderStepDown(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java index 30d9d10..ee338ed 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java @@ -192,7 +192,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster> GroupMismatchException.class); testFailureCase("groupRemove(..) with another group id", - () -> client.groupRemove(anotherGroup.getGroupId(), clusterGroup.getPeers().iterator().next().getId()), + () -> client.groupRemove(anotherGroup.getGroupId(), false, clusterGroup.getPeers().iterator().next().getId()), GroupMismatchException.class); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 8667f50..e167f17 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -188,8 +188,8 @@ public interface RaftTestUtil { if (e.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.SMLOGENTRY) { LOG.info(ServerProtoUtils.toString(e) + ", " + e.getSmLogEntry().toString().trim().replace("\n", ", ")); entries.add(e); - } else if (e.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.NOOP) { - LOG.info("Found " + LogEntryProto.LogEntryBodyCase.NOOP + " at " + ti + } else if (e.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY) { + LOG.info("Found " + LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY + " at " + ti + ", ignoring it."); } else { throw new AssertionError("Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + ti http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java index 5310b94..682f2cb 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java @@ -23,10 +23,12 @@ import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.storage.RaftStorageDirectory; import org.apache.ratis.util.CheckedBiConsumer; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; @@ -35,6 +37,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -62,17 +65,14 @@ public abstract class GroupManagementBaseTest extends BaseTest { } @Test - public void testMultiGroup() throws Exception { + public void testSingleGroupRestart() throws Exception { final MiniRaftCluster cluster = getCluster(0); LOG.info("Start testMultiGroup" + cluster.printServers()); - // Start server with an empty conf - final RaftGroupId groupId = cluster.getGroupId(); - final RaftGroup group = new RaftGroup(groupId); - + // Start server with null group final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0)) .map(RaftPeerId::valueOf).collect(Collectors.toList()); - ids.forEach(id -> cluster.putNewServer(id, group, true)); + ids.forEach(id -> cluster.putNewServer(id, null, true)); LOG.info("putNewServer: " + cluster.printServers()); cluster.start(); @@ -90,6 +90,17 @@ public abstract class GroupManagementBaseTest extends BaseTest { client.groupAdd(newGroup, p.getId()); } Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); + TimeUnit.SECONDS.sleep(1); + + // restart the servers with null group + LOG.info("restart servers"); + for(RaftPeer p : newGroup.getPeers()) { + cluster.restartServer(p.getId(), null, false); + } + + // the servers should retrieve the conf from the log. + Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); + cluster.shutdown(); } @@ -176,9 +187,16 @@ public abstract class GroupManagementBaseTest extends BaseTest { final RaftGroup g = groups[i]; LOG.info(i + ") close " + cluster.printServers(g.getGroupId())); for(RaftPeer p : g.getPeers()) { + final File root = cluster.getServer(p.getId()).getImpl(g.getGroupId()).getState().getStorage().getStorageDir().getRoot(); + Assert.assertTrue(root.exists()); + Assert.assertTrue(root.isDirectory()); + + final RaftClientReply r; try (final RaftClient client = cluster.createClient(p.getId(), g)) { - client.groupRemove(g.getGroupId(), p.getId()); + r = client.groupRemove(g.getGroupId(), true, p.getId()); } + Assert.assertTrue(r.isSuccess()); + Assert.assertFalse(root.exists()); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 79017d4..6374a21 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -48,7 +48,6 @@ import java.util.concurrent.atomic.AtomicReference; import static java.util.Arrays.asList; import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf; import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; -import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.NOOP; public abstract class RaftReconfigurationBaseTest extends BaseTest { static { @@ -168,8 +167,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { final MiniRaftCluster cluster = getCluster(3); cluster.start(); try { - RaftTestUtil.waitForLeader(cluster); - final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); final RaftClient client = cluster.createClient(leaderId); // submit some msgs before reconf @@ -212,16 +210,19 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { Assert.assertTrue(reconf1.get()); Assert.assertTrue(reconf2.get()); waitAndCheckNewConf(cluster, finalPeers.get(), 2, null); + final RaftPeerId leader2 = RaftTestUtil.waitForLeader(cluster).getId(); // check configuration manager's internal state // each reconf will generate two configurations: (old, new) and (new) - cluster.getServerAliveStream() - .forEach(server -> { + cluster.getServerAliveStream().forEach(server -> { ConfigurationManager confManager = (ConfigurationManager) Whitebox.getInternalState(server.getState(), "configurationManager"); // each reconf will generate two configurations: (old, new) and (new) - Assert.assertEquals(5, confManager.numOfConf()); + // each leader change generates one configuration. + // expectedConf = 1 (init) + 2*2 (two conf changes) + #leader + final int expechedConf = leader2.equals(leaderId)? 6: 7; + Assert.assertEquals(expechedConf, confManager.numOfConf()); }); } finally { cluster.shutdown(); @@ -546,7 +547,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { // find CONFIGURATIONENTRY, there may be NOOP before and after it. final long confIndex = JavaUtils.attempt(() -> { final long last = log.getLastEntryTermIndex().getIndex(); - for (long i = 1; i <= last; i++) { + for (long i = last; i >= 1; i--) { if (log.get(i).getLogEntryBodyCase() == CONFIGURATIONENTRY) { return i; } @@ -571,7 +572,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { // the old leader should have truncated the setConf from the log JavaUtils.attempt(() -> log.getLastCommittedIndex() >= confIndex, 10, 500L, "COMMIT", LOG); - Assert.assertEquals(NOOP, log.get(confIndex).getLogEntryBodyCase()); + Assert.assertEquals(CONFIGURATIONENTRY, log.get(confIndex).getLogEntryBodyCase()); log2 = null; } finally { RaftStorageTestUtils.printLog(log2, s -> LOG.info(s)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/eca35312/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java index b5a35d2..391a6fa 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java @@ -17,30 +17,9 @@ */ package org.apache.ratis.server.simulation; -import org.apache.log4j.Level; import org.apache.ratis.RaftBasicTests; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.util.LogUtils; - -import java.io.IOException; - -public class TestRaftWithSimulatedRpc extends RaftBasicTests { - static { - LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - } - - private final MiniRaftClusterWithSimulatedRpc cluster; - - public TestRaftWithSimulatedRpc() throws IOException { - cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster( - NUM_SERVERS, getProperties()); - } - - @Override - public MiniRaftClusterWithSimulatedRpc getCluster() { - return cluster; - } +public class TestRaftWithSimulatedRpc + extends RaftBasicTests<MiniRaftClusterWithSimulatedRpc> + implements MiniRaftClusterWithSimulatedRpc.FactoryGet { }
