Repository: incubator-ratis Updated Branches: refs/heads/master 291f51b42 -> 06002e67a
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index e9c6d65..9086289 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -31,6 +31,7 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.util.CheckedRunnable; +import org.apache.ratis.util.JavaUtils; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,14 +53,31 @@ public class RaftTestUtil { public static final LogEntryProto[] EMPTY_LOGENTRY_ARRAY = new LogEntryProto[0]; static final Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class); + + public static RaftServerImpl getImplAsUnchecked(RaftServerProxy proxy) { + return JavaUtils.callAsUnchecked(proxy::getImpl); + } + public static RaftServerImpl waitForLeader(MiniRaftCluster cluster) throws InterruptedException { + return waitForLeader(cluster, false); + } + + public static RaftServerImpl waitForLeader( + MiniRaftCluster cluster, boolean tolerateMultipleLeaders) + throws InterruptedException { final long sleepTime = (cluster.getMaxTimeout() * 3) >> 1; LOG.info(cluster.printServers()); RaftServerImpl leader = null; for(int i = 0; leader == null && i < 10; i++) { Thread.sleep(sleepTime); - leader = cluster.getLeader(); + try { + leader = cluster.getLeader(); + } catch(IllegalStateException e) { + if (!tolerateMultipleLeaders) { + throw e; + } + } } LOG.info(cluster.printServers()); return leader; @@ -116,8 +134,7 @@ public class RaftTestUtil { public static void assertLogEntries(Collection<RaftServerProxy> servers, SimpleMessage... expectedMessages) { final int size = servers.size(); - final long count = servers.stream() - .map(RaftServerProxy::getImpl) + final long count = MiniRaftCluster.getServerStream(servers) .filter(RaftServerImpl::isAlive) .map(s -> s.getState().getLog()) .filter(log -> logEntriesContains(log, expectedMessages)) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 8dd5ae8..20e66e6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -222,7 +222,7 @@ public abstract class RaftReconfigurationBaseTest { // check configuration manager's internal state // each reconf will generate two configurations: (old, new) and (new) - cluster.getServersAliveStream() + cluster.getServerAliveStream() .forEach(server -> { ConfigurationManager confManager = (ConfigurationManager) Whitebox.getInternalState(server.getState(), http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java new file mode 100644 index 0000000..fcb39dd --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.log4j.Level; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.util.LogUtils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public abstract class ReinitializationBaseTest { + static { + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftClient.LOG, Level.TRACE); + } + static final Logger LOG = LoggerFactory.getLogger(ReinitializationBaseTest.class); + + static final RaftProperties prop = new RaftProperties(); + + public abstract MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory(); + + public MiniRaftCluster getCluster(int peerNum) throws IOException { + return getClusterFactory().newCluster(peerNum, prop); + } + + @Test + public void testReinitialize() throws Exception { + final MiniRaftCluster cluster = getCluster(0); + LOG.info("Start testReinitialize" + cluster.printServers()); + + // Start server with an empty conf + final RaftConfiguration emptyConf = MiniRaftCluster.initConfiguration(Collections.emptyList()); + + final List<RaftPeerId> ids = Arrays.asList(MiniRaftCluster.generateIds(3, 0)) + .stream().map(RaftPeerId::valueOf).collect(Collectors.toList()); + ids.stream().forEach(id -> cluster.putNewServer(id, emptyConf, true)); + LOG.info("putNewServer: " + cluster.printServers()); + + cluster.start(); + LOG.info("start: " + cluster.printServers()); + + // Make sure that there are no leaders. + TimeUnit.SECONDS.sleep(1); + Assert.assertNull(cluster.getLeader()); + + // Reinitialize servers + final RaftPeer[] peers = cluster.getPeers().toArray(RaftPeer.EMPTY_PEERS); + for(RaftPeer p : peers) { + final RaftClient client = cluster.createClient(p.getId(), new ArrayList<>(Arrays.asList(p))); + client.reinitialize(peers, p.getId()); + } + Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java index 66004c6..f8a32d9 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java @@ -22,7 +22,6 @@ import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerFactory; import java.util.Objects; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java index 9551da8..44313dd 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java @@ -17,16 +17,15 @@ */ package org.apache.ratis.server.simulation; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.util.Daemon; +import org.apache.ratis.util.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +39,7 @@ import java.util.concurrent.TimeUnit; class SimulatedServerRpc implements RaftServerRpc { static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class); - private final RaftServerImpl server; + private final RaftServerProxy server; private final RequestHandler<RaftServerRequest, RaftServerReply> serverHandler; private final RequestHandler<RaftClientRequest, RaftClientReply> clientHandler; private final ExecutorService executor = Executors.newFixedThreadPool(3, Daemon::new); @@ -48,8 +47,7 @@ class SimulatedServerRpc implements RaftServerRpc { SimulatedServerRpc(RaftServer server, SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply, SimulatedRequestReply<RaftClientRequest, RaftClientReply> clientRequestReply) { - this.server = server instanceof RaftServerProxy? - ((RaftServerProxy)server).getImpl(): (RaftServerImpl)server; + this.server = (RaftServerProxy)server; this.serverHandler = new RequestHandler<>(server.getId().toString(), "serverHandler", serverRequestReply, serverHandlerImpl, 3); this.clientHandler = new RequestHandler<>(server.getId().toString(), @@ -122,7 +120,7 @@ class SimulatedServerRpc implements RaftServerRpc { = new RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply>() { @Override public boolean isAlive() { - return server.isAlive(); + return RaftTestUtil.getImplAsUnchecked(server).isAlive(); } @Override @@ -144,14 +142,17 @@ class SimulatedServerRpc implements RaftServerRpc { = new RequestHandler.HandlerInterface<RaftClientRequest, RaftClientReply>() { @Override public boolean isAlive() { - return server.isAlive(); + return RaftTestUtil.getImplAsUnchecked(server).isAlive(); } @Override public RaftClientReply handleRequest(RaftClientRequest request) throws IOException { final CompletableFuture<RaftClientReply> future; - if (request instanceof SetConfigurationRequest) { + if (request instanceof ReinitializeRequest) { + future = CompletableFuture.completedFuture( + server.reinitialize((ReinitializeRequest) request)); + } else if (request instanceof SetConfigurationRequest) { future = server.setConfigurationAsync((SetConfigurationRequest) request); } else { future = server.submitClientRequestAsync(request); @@ -159,11 +160,7 @@ class SimulatedServerRpc implements RaftServerRpc { future.whenCompleteAsync((reply, exception) -> { try { - IOException e = null; - if (exception != null) { - e = exception instanceof IOException ? - (IOException) exception : new IOException(exception); - } + final IOException e = IOUtils.asIOException(exception); clientHandler.getRpc().sendReply(request, reply, e); } catch (IOException e) { LOG.warn("Failed to send reply {} for request {} due to exception {}", http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java new file mode 100644 index 0000000..7fc0c6c --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.server.impl.ReinitializationBaseTest; + +public class TestReinitializationWithSimulatedRpc extends ReinitializationBaseTest { + @Override + public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { + return MiniRaftClusterWithSimulatedRpc.FACTORY; + } +}
