http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java b/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java deleted file mode 100644 index 461dd15..0000000 --- a/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java +++ /dev/null @@ -1,305 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.raft;

import com.google.common.base.Preconditions;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.raft.protocol.Message;
import org.apache.raft.server.RaftServerConfigKeys;
import org.apache.raft.server.impl.BlockRequestHandlingInjection;
import org.apache.raft.server.impl.DelayLocalExecutionInjection;
import org.apache.raft.server.impl.RaftServerImpl;
import org.apache.raft.shaded.com.google.protobuf.ByteString;
import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto;
import org.apache.raft.util.CheckedRunnable;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

import static org.apache.raft.util.ProtoUtils.toByteString;

/** Static helpers shared by the Raft tests (leader waiting, log assertions, retry, blocking). */
public class RaftTestUtil {
  static final Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class);

  /**
   * Polls the cluster for a leader up to 10 times, sleeping 1.5 times the cluster's
   * max timeout before each poll.
   *
   * @param cluster the cluster to poll
   * @return the leader, or {@code null} if no leader was reported in any of the polls
   * @throws InterruptedException if interrupted while sleeping
   */
  public static RaftServerImpl waitForLeader(MiniRaftCluster cluster)
      throws InterruptedException {
    final long sleepTime = (cluster.getMaxTimeout() * 3) >> 1;
    LOG.info(cluster.printServers());
    RaftServerImpl leader = null;
    for (int i = 0; leader == null && i < 10; i++) {
      Thread.sleep(sleepTime);
      leader = cluster.getLeader();
    }
    LOG.info(cluster.printServers());
    return leader;
  }

  /**
   * Calls {@code cluster.tryEnforceLeader(leaderId)} until it returns true (at most 11
   * calls) and then asserts that the cluster's leader is {@code leaderId}.
   *
   * @param cluster the cluster to operate on
   * @param leaderId the id of the server expected to become the leader
   * @return the leader, whose id equals {@code leaderId}
   * @throws AssertionError if there is no leader afterwards or its id differs
   * @throws InterruptedException declared for callers; presumably thrown by
   *         {@code tryEnforceLeader}
   */
  public static RaftServerImpl waitForLeader(MiniRaftCluster cluster,
      final String leaderId) throws InterruptedException {
    LOG.info(cluster.printServers());
    for (int i = 0; !cluster.tryEnforceLeader(leaderId) && i < 10; i++) {
      final RaftServerImpl currLeader = cluster.getLeader();
      if (LOG.isDebugEnabled()) {
        LOG.debug("try enforcing leader to " + leaderId + " but "
            + (currLeader == null ? "no leader for this round"
                : "new leader is " + currLeader.getId()));
      }
    }
    LOG.info(cluster.printServers());

    final RaftServerImpl leader = cluster.getLeader();
    // Fail with a readable assertion instead of a NullPointerException
    // when the cluster has no leader at all.
    Assert.assertNotNull("no leader found while expecting " + leaderId, leader);
    Assert.assertEquals(leaderId, leader.getId());
    return leader;
  }

  /**
   * Waits for a leader and, when one is expected, kills it.
   *
   * @param cluster the cluster to operate on
   * @param expectLeader if true, asserts a leader exists and kills it;
   *        if false, asserts that no leader exists
   * @return the id of the killed leader, or {@code null} when there was none
   * @throws InterruptedException if interrupted while waiting for the leader
   */
  public static String waitAndKillLeader(MiniRaftCluster cluster,
      boolean expectLeader) throws InterruptedException {
    final RaftServerImpl leader = waitForLeader(cluster);
    if (!expectLeader) {
      Assert.assertNull(leader);
    } else {
      Assert.assertNotNull(leader);
      LOG.info("killing leader = " + leader);
      cluster.killServer(leader.getId());
    }
    return leader != null ? leader.getId() : null;
  }

  /**
   * Checks whether the expected messages occur in {@code entries} as a subsequence,
   * i.e. in the given order but not necessarily contiguously.
   *
   * @return true if every expected message content was matched in order
   */
  public static boolean logEntriesContains(LogEntryProto[] entries,
      SimpleMessage... expectedMessages) {
    int idxEntries = 0;
    int idxExpected = 0;
    while (idxEntries < entries.length
        && idxExpected < expectedMessages.length) {
      if (Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(),
          entries[idxEntries].getSmLogEntry().getData().toByteArray())) {
        ++idxExpected;
      }
      ++idxEntries;
    }
    return idxExpected == expectedMessages.length;
  }

  /**
   * Asserts that a strict majority of {@code servers} (counted against all servers,
   * alive or not) are alive and have logs containing the expected messages in order.
   *
   * @throws AssertionError if the number of matching servers is not a majority
   */
  public static void assertLogEntries(Collection<RaftServerImpl> servers,
      SimpleMessage... expectedMessages) {
    final int size = servers.size();
    final long count = servers.stream()
        .filter(RaftServerImpl::isAlive)
        .map(s -> s.getState().getLog().getEntries(0, Long.MAX_VALUE))
        .filter(e -> logEntriesContains(e, expectedMessages))
        .count();
    if (2*count <= size) {
      throw new AssertionError("Not in majority: size=" + size
          + " but count=" + count);
    }
  }

  /**
   * Asserts that {@code entries} matches {@code expectedMessages} one-to-one: same
   * length, every entry has the expected term, indices are consecutive starting at
   * {@code startIndex}, and the entry data equals the message content.
   */
  public static void assertLogEntries(LogEntryProto[] entries, long startIndex,
      long expertedTerm, SimpleMessage... expectedMessages) {
    Assert.assertEquals(expectedMessages.length, entries.length);
    for (int i = 0; i < entries.length; i++) {
      final LogEntryProto e = entries[i];
      Assert.assertEquals(expertedTerm, e.getTerm());
      Assert.assertEquals(startIndex + i, e.getIndex());
      Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(),
          e.getSmLogEntry().getData().toByteArray());
    }
  }

  /** A {@link Message} whose content is the UTF-8 encoding of its string id. */
  public static class SimpleMessage implements Message {
    /** Creates {@code numMessages} messages with ids "m0", "m1", ... */
    public static SimpleMessage[] create(int numMessages) {
      return create(numMessages, "m");
    }

    /** Creates {@code numMessages} messages with ids {@code prefix + index}. */
    public static SimpleMessage[] create(int numMessages, String prefix) {
      final SimpleMessage[] messages = new SimpleMessage[numMessages];
      for (int i = 0; i < messages.length; i++) {
        messages[i] = new SimpleMessage(prefix + i);
      }
      return messages;
    }

    final String messageId;

    public SimpleMessage(final String messageId) {
      this.messageId = messageId;
    }

    @Override
    public String toString() {
      return messageId;
    }

    @Override
    public boolean equals(Object obj) {
      if (obj == this) {
        return true;
      } else if (!(obj instanceof SimpleMessage)) {
        // instanceof is false for null, so no separate null check is needed
        return false;
      } else {
        final SimpleMessage that = (SimpleMessage)obj;
        return this.messageId.equals(that.messageId);
      }
    }

    @Override
    public int hashCode() {
      return messageId.hashCode();
    }

    @Override
    public ByteString getContent() {
      return toByteString(messageId.getBytes(StandardCharsets.UTF_8));
    }
  }

  /** A non-null string operation that can be converted to a state machine log entry. */
  public static class SimpleOperation {
    private final String op;

    /**
     * @param op the operation string
     * @throws IllegalArgumentException if {@code op} is null
     */
    public SimpleOperation(String op) {
      Preconditions.checkArgument(op != null);
      this.op = op;
    }

    @Override
    public String toString() {
      return op;
    }

    @Override
    public boolean equals(Object obj) {
      return obj == this ||
          (obj instanceof SimpleOperation &&
              ((SimpleOperation) obj).op.equals(op));
    }

    @Override
    public int hashCode() {
      return op.hashCode();
    }

    /** @return a log entry whose data is the UTF-8 encoding of the operation string */
    public SMLogEntryProto getLogEntryContent() {
      // StandardCharsets.UTF_8 cannot fail, unlike the charset-name overload
      // which forces a dead UnsupportedEncodingException handler.
      return SMLogEntryProto.newBuilder()
          .setData(toByteString(op.getBytes(StandardCharsets.UTF_8))).build();
    }
  }

  /**
   * Creates (if needed) and returns a test directory named after the caller's simple
   * class name, under a random 10-character sub-directory of the
   * {@code test.build.data} system property (default {@code target/test/data}).
   *
   * @throws IOException if the path exists but is not a directory, or cannot be created
   */
  public static File getTestDir(Class<?> caller) throws IOException {
    File dir = new File(System.getProperty("test.build.data", "target/test/data")
        + "/" + RandomStringUtils.randomAlphanumeric(10),
        caller.getSimpleName());
    if (dir.exists() && !dir.isDirectory()) {
      throw new IOException(dir + " already exists and is not a directory");
    } else if (!dir.exists() && !dir.mkdirs()) {
      throw new IOException("Cannot create directory " + dir);
    }
    return dir;
  }

  /**
   * Sleeps in steps of {@code RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT} for as long as
   * {@code isBlocked} returns true.
   */
  public static void block(BooleanSupplier isBlocked) throws InterruptedException {
    while (isBlocked.getAsBoolean()) {
      Thread.sleep(RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
    }
  }

  /** Sleeps for the supplied number of milliseconds; does nothing if it is not positive. */
  public static void delay(IntSupplier getDelayMs) throws InterruptedException {
    final int t = getDelayMs.getAsInt();
    if (t > 0) {
      Thread.sleep(t);
    }
  }

  /**
   * Runs {@code runnable} up to {@code n} times, sleeping {@code sleepMs} before each
   * attempt, and returns after the first attempt that completes normally.
   * Failures of all but the last attempt are logged and ignored.
   * Nothing is run when {@code n <= 0}.
   *
   * @throws T whatever the last attempt throws (unchecked throwables included)
   * @throws InterruptedException if interrupted while sleeping
   */
  public static <T extends Throwable> void attempt(
      int n, long sleepMs, CheckedRunnable<T> runnable)
      throws T, InterruptedException {
    for (int i = 1; i <= n; i++) {
      LOG.info("Attempt #" + i + "/" + n + ": sleep " + sleepMs + "ms");
      if (sleepMs > 0) {
        Thread.sleep(sleepMs);
      }
      try {
        runnable.run();
        return;
      } catch (Throwable t) {
        if (i == n) {
          throw t;
        }
        LOG.warn("Attempt #" + i + "/" + n + ": Ignoring " + t + " and retry.");
      }
    }
  }

  /**
   * Blocks requests from {@code oldLeader} and waits (up to 10 rounds of
   * {@link #waitForLeader(MiniRaftCluster)}) for a different leader to show up.
   * The block is always lifted before returning, even if waiting fails.
   *
   * @return the id of the new leader, or {@code oldLeader} if no other leader
   *         was observed within the 10 rounds
   */
  public static String changeLeader(MiniRaftCluster cluster, String oldLeader)
      throws InterruptedException {
    cluster.setBlockRequestsFrom(oldLeader, true);
    try {
      String newLeader = oldLeader;
      for (int i = 0; i < 10 && newLeader.equals(oldLeader); i++) {
        // waitForLeader returns null when no leader was elected in this round;
        // keep waiting instead of failing with a NullPointerException.
        final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
        if (leader != null) {
          newLeader = leader.getId();
        }
      }
      return newLeader;
    } finally {
      cluster.setBlockRequestsFrom(oldLeader, false);
    }
  }

  /**
   * When {@code delayMs > 0}: blocks the leader as a replier and sets a delay on every
   * other server. Otherwise: unblocks the leader and removes the delays.
   * Finally sleeps for {@code 3 * maxTimeout} milliseconds.
   */
  public static void blockQueueAndSetDelay(Collection<RaftServerImpl> servers,
      DelayLocalExecutionInjection injection, String leaderId, int delayMs,
      long maxTimeout) throws InterruptedException {
    // block requests sent to leader if delayMs > 0
    final boolean block = delayMs > 0;
    LOG.debug("{} requests sent to leader {} and set {}ms delay for the others",
        block? "Block": "Unblock", leaderId, delayMs);
    if (block) {
      BlockRequestHandlingInjection.getInstance().blockReplier(leaderId);
    } else {
      BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId);
    }

    // delay RaftServerRequest for other servers
    servers.stream().filter(s -> !s.getId().equals(leaderId))
        .forEach(s -> {
          if (block) {
            injection.setDelayMs(s.getId(), delayMs);
          } else {
            injection.removeDelay(s.getId());
          }
        });

    Thread.sleep(3 * maxTimeout);
  }

  /** Blocks or unblocks {@code src} as a requestor in {@link BlockRequestHandlingInjection}. */
  public static void setBlockRequestsFrom(String src, boolean block) {
    if (block) {
      BlockRequestHandlingInjection.getInstance().blockRequestor(src);
    } else {
      BlockRequestHandlingInjection.getInstance().unblockRequestor(src);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java b/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java deleted file mode 100644 index 7f7de9a..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.impl; - -import org.apache.raft.RaftTestUtil; -import org.apache.raft.util.CodeInjectionForTesting; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** Inject code to block a server from handling incoming requests. 
*/
public class BlockRequestHandlingInjection implements CodeInjectionForTesting.Code {
  private static final BlockRequestHandlingInjection INSTANCE =
      new BlockRequestHandlingInjection();

  static {
    // the single instance intercepts all three server-to-server request types
    CodeInjectionForTesting.put(RaftServerImpl.REQUEST_VOTE, INSTANCE);
    CodeInjectionForTesting.put(RaftServerImpl.APPEND_ENTRIES, INSTANCE);
    CodeInjectionForTesting.put(RaftServerImpl.INSTALL_SNAPSHOT, INSTANCE);
  }

  public static BlockRequestHandlingInjection getInstance() {
    return INSTANCE;
  }

  // Both maps are used as concurrent sets: only the presence of a key matters.
  private final Map<String, Boolean> requestors = new ConcurrentHashMap<>();
  private final Map<String, Boolean> repliers = new ConcurrentHashMap<>();

  private BlockRequestHandlingInjection() {}

  /** Starts blocking the handling of requests whose remote id is {@code requestor}. */
  public void blockRequestor(String requestor) {
    requestors.put(requestor, true);
  }

  /** Stops blocking requests whose remote id is {@code requestor}. */
  public void unblockRequestor(String requestor) {
    requestors.remove(requestor);
  }

  /** Starts blocking the handling of requests whose local id is {@code replier}. */
  public void blockReplier(String replier) {
    repliers.put(replier, true);
  }

  /** Stops blocking requests whose local id is {@code replier}. */
  public void unblockReplier(String replier) {
    repliers.remove(replier);
  }

  /** Removes every requestor and replier block. */
  public void unblockAll() {
    requestors.clear();
    repliers.clear();
  }

  /**
   * Holds the calling thread for as long as the (localId, remoteId) pair is blocked.
   *
   * @return true if the call was blocked and later released;
   *         false if it was not blocked at all or the wait was interrupted
   */
  @Override
  public boolean execute(String localId, String remoteId, Object... args) {
    if (shouldBlock(localId, remoteId)) {
      try {
        RaftTestUtil.block(() -> shouldBlock(localId, remoteId));
        return true;
      } catch (InterruptedException e) {
        LOG.debug("Interrupted while blocking request handling from " + remoteId
            + " to " + localId);
        // Restore the interrupt status so the caller's thread can still
        // observe the interruption (e.g. during server shutdown).
        Thread.currentThread().interrupt();
      }
    }
    return false;
  }

  private boolean shouldBlock(String localId, String remoteId) {
    return repliers.containsKey(localId) || requestors.containsKey(remoteId);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/impl/DelayLocalExecutionInjection.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/impl/DelayLocalExecutionInjection.java b/raft-server/src/test/java/org/apache/raft/server/impl/DelayLocalExecutionInjection.java deleted file mode 100644 index 26b89d8..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/impl/DelayLocalExecutionInjection.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.raft.server.impl;

import org.apache.raft.RaftTestUtil;
import org.apache.raft.util.CodeInjectionForTesting;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Inject code to delay particular servers. */
public class DelayLocalExecutionInjection implements CodeInjectionForTesting.Code {
  /** Delay in milliseconds, keyed by server id. */
  private final Map<String, AtomicInteger> delays = new ConcurrentHashMap<>();

  /** Registers this injection for the given injection point. */
  public DelayLocalExecutionInjection(String method) {
    CodeInjectionForTesting.put(method, this);
  }

  /** Removes the delays of all servers. */
  public void clear() {
    delays.clear();
  }

  /** Sets the delay of server {@code id} to {@code delayMs} milliseconds. */
  public void setDelayMs(String id, int delayMs) {
    // computeIfAbsent is atomic; a separate get-then-put could let two threads
    // install different counters so that one of the updates is lost.
    delays.computeIfAbsent(id, key -> new AtomicInteger()).set(delayMs);
  }

  /** Removes the delay of server {@code id}, if any. */
  public void removeDelay(String id) {
    delays.remove(id);
  }

  /**
   * Delays the calling thread if a delay is registered for {@code localId}.
   *
   * @return false if no delay is registered for {@code localId}; true otherwise,
   *         including when the delay was cut short by an interrupt
   */
  @Override
  public boolean execute(String localId, String remoteId, Object... args) {
    final AtomicInteger d = delays.get(localId);
    if (d == null) {
      return false;
    }
    LOG.info("{} delay {} ms, args={}", localId, d.get(),
        Arrays.toString(args));
    try {
      RaftTestUtil.delay(d::get);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while delaying " + localId);
      // Restore the interrupt status so the caller's thread can still
      // observe the interruption.
      Thread.currentThread().interrupt();
    }
    return true;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java b/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java deleted file mode 100644 index 8a5af69..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java +++ /dev/null @@ -1,577 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.raft.server.impl;

import org.apache.log4j.Level;
import org.apache.raft.MiniRaftCluster;
import org.apache.raft.MiniRaftCluster.PeerChanges;
import org.apache.raft.RaftTestUtil;
import org.apache.raft.RaftTestUtil.SimpleMessage;
import org.apache.raft.client.RaftClient;
import org.apache.raft.client.RaftClientRequestSender;
import org.apache.raft.client.impl.RaftClientImpl;
import org.apache.raft.conf.RaftProperties;
import org.apache.raft.protocol.*;
import org.apache.raft.server.RaftServerConfigKeys;
import org.apache.raft.server.simulation.RequestHandler;
import org.apache.raft.server.storage.RaftLog;
import org.apache.raft.util.RaftUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Arrays.asList;
import static org.apache.raft.MiniRaftCluster.logSyncDelay;
import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
import static org.apache.raft.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;

/**
 * Reconfiguration (setConfiguration) tests that are independent of the RPC
 * implementation; subclasses provide the cluster through {@link #getCluster(int)}.
 */
public abstract class RaftReconfigurationBaseTest {
  static {
    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
    RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
  }
  static final Logger LOG = LoggerFactory.getLogger(RaftReconfigurationBaseTest.class);

  protected static final RaftProperties prop = new RaftProperties();

  @BeforeClass
  public static void setup() {
    // set a small gap for tests
    prop.setInt(RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_KEY, 10);
  }

  /** Creates (but does not start) a cluster with {@code peerNum} peers. */
  public abstract MiniRaftCluster getCluster(int peerNum) throws IOException;

  private static int getStagingGap() {
    return prop.getInt(RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_KEY,
        RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT);
  }

  /**
   * add 2 new peers (3 peers -> 5 peers), no leader change
   */
  @Test
  public void testAddPeers() throws Exception {
    LOG.info("Start testAddPeers");
    MiniRaftCluster cluster = getCluster(3);
    cluster.start();
    try {
      RaftTestUtil.waitForLeader(cluster);

      // add new peers
      RaftPeer[] allPeers = cluster.addNewPeers(2, true).allPeersInNewConf;

      // trigger setConfiguration
      SetConfigurationRequest request = new SetConfigurationRequest("client",
          cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers);
      LOG.info("Start changing the configuration: {}", request);
      cluster.getLeader().setConfiguration(request);

      // wait for the new configuration to take effect
      waitAndCheckNewConf(cluster, allPeers, 0, null);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * remove 2 peers (5 peers -> 3 peers), no leader change
   */
  @Test
  public void testRemovePeers() throws Exception {
    LOG.info("Start testRemovePeers");
    MiniRaftCluster cluster = getCluster(5);
    cluster.start();
    try {
      RaftTestUtil.waitForLeader(cluster);

      // remove peers, leader still included in the new conf
      RaftPeer[] allPeers = cluster
          .removePeers(2, false, Collections.emptyList()).allPeersInNewConf;

      // trigger setConfiguration
      SetConfigurationRequest request = new SetConfigurationRequest("client",
          cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers);
      LOG.info("Start changing the configuration: {}", request);
      cluster.getLeader().setConfiguration(request);

      // wait for the new configuration to take effect
      waitAndCheckNewConf(cluster, allPeers, 2, null);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * 5 peers -> 5 peers, remove 2 old, add 2 new, no leader change
   */
  @Test
  public void testAddRemovePeers() throws Exception {
    LOG.info("Start testAddRemovePeers");
    testAddRemovePeers(false);
  }

  /**
   * Same as {@link #testAddRemovePeers()} except that {@code true} is passed as the
   * second argument of {@code removePeers}, presumably removing the leader as well.
   */
  @Test
  public void testLeaderStepDown() throws Exception {
    LOG.info("Start testLeaderStepDown");
    testAddRemovePeers(true);
  }

  private void testAddRemovePeers(boolean leaderStepdown) throws Exception {
    MiniRaftCluster cluster = getCluster(5);
    cluster.start();
    try {
      RaftTestUtil.waitForLeader(cluster);

      PeerChanges change = cluster.addNewPeers(2, true);
      RaftPeer[] allPeers = cluster.removePeers(2, leaderStepdown,
          asList(change.newPeers)).allPeersInNewConf;

      // trigger setConfiguration
      SetConfigurationRequest request = new SetConfigurationRequest("client",
          cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers);
      LOG.info("Start changing the configuration: {}", request);
      cluster.getLeader().setConfiguration(request);

      // wait for the new configuration to take effect
      waitAndCheckNewConf(cluster, allPeers, 2, null);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Runs two reconfigurations back to back from a client thread and checks the
   * resulting configuration and the number of configurations kept by each server.
   */
  @Test(timeout = 30000)
  public void testReconfTwice() throws Exception {
    LOG.info("Start testReconfTwice");
    final MiniRaftCluster cluster = getCluster(3);
    cluster.start();
    try {
      RaftTestUtil.waitForLeader(cluster);
      final String leaderId = cluster.getLeader().getId();
      final RaftClient client = cluster.createClient("client", leaderId);

      // submit some msgs before reconf
      for (int i = 0; i < getStagingGap() * 2; i++) {
        RaftClientReply reply = client.send(new SimpleMessage("m" + i));
        Assert.assertTrue(reply.isSuccess());
      }

      final AtomicBoolean reconf1 = new AtomicBoolean(false);
      final AtomicBoolean reconf2 = new AtomicBoolean(false);
      final AtomicReference<RaftPeer[]> finalPeers = new AtomicReference<>(null);
      final AtomicReference<RaftPeer[]> deadPeers = new AtomicReference<>(null);
      CountDownLatch latch = new CountDownLatch(1);
      Thread clientThread = new Thread(() -> {
        try {
          PeerChanges c1 = cluster.addNewPeers(2, true);
          LOG.info("Start changing the configuration: {}",
              asList(c1.allPeersInNewConf));

          RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf);
          reconf1.set(reply.isSuccess());

          PeerChanges c2 = cluster.removePeers(2, true, asList(c1.newPeers));
          finalPeers.set(c2.allPeersInNewConf);
          deadPeers.set(c2.removedPeers);

          LOG.info("Start changing the configuration again: {}",
              asList(c2.allPeersInNewConf));
          reply = client.setConfiguration(c2.allPeersInNewConf);
          reconf2.set(reply.isSuccess());

          client.close();
        } catch (IOException e) {
          LOG.error("Failed to change the configuration", e);
        } finally {
          // Always release the main thread: on failure the assertions below
          // report the problem instead of the test hanging until its timeout.
          latch.countDown();
        }
      });
      clientThread.start();

      latch.await();
      Assert.assertTrue(reconf1.get());
      Assert.assertTrue(reconf2.get());
      waitAndCheckNewConf(cluster, finalPeers.get(), 2, null);

      // check configuration manager's internal state
      // each reconf will generate two configurations: (old, new) and (new)
      cluster.getServers().stream().filter(RaftServerImpl::isAlive)
          .forEach(server -> {
            ConfigurationManager confManager =
                (ConfigurationManager) Whitebox.getInternalState(server.getState(),
                    "configurationManager");
            Assert.assertEquals(5, confManager.numOfConf());
          });
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * A setConfiguration request that adds peers which have not been started must fail
   * with {@link ReconfigurationTimeoutException}, also when it is re-sent; it must
   * succeed once the new peers are started.
   */
  @Test
  public void testReconfTimeout() throws Exception {
    LOG.info("Start testReconfTimeout");
    // originally 3 peers
    final MiniRaftCluster cluster = getCluster(3);
    cluster.start();
    try {
      RaftTestUtil.waitForLeader(cluster);
      final String leaderId = cluster.getLeader().getId();
      final RaftClient client = cluster.createClient("client", leaderId);

      PeerChanges c1 = cluster.addNewPeers(2, false);

      LOG.info("Start changing the configuration: {}",
          asList(c1.allPeersInNewConf));
      Assert.assertFalse(cluster.getLeader().getRaftConf().isTransitional());

      final RaftClientRequestSender sender = ((RaftClientImpl)client).getRequestSender();
      final SetConfigurationRequest request = new SetConfigurationRequest(
          "client", leaderId, DEFAULT_SEQNUM, c1.allPeersInNewConf);
      try {
        sender.sendRequest(request);
        Assert.fail("did not get expected exception");
      } catch (IOException e) {
        Assert.assertTrue("Got exception " + e,
            e instanceof ReconfigurationTimeoutException);
      }

      // the two new peers have not started yet, the bootstrapping must timeout
      LOG.info(cluster.printServers());

      // resend the same request, make sure the server has correctly reset its
      // state so that we still get timeout instead of in-progress exception
      try {
        sender.sendRequest(request);
        Assert.fail("did not get expected exception");
      } catch (IOException e) {
        Assert.assertTrue("Got exception " + e,
            e instanceof ReconfigurationTimeoutException);
      }

      // start the two new peers
      LOG.info("Start new peers");
      for (RaftPeer np : c1.newPeers) {
        cluster.startServer(np.getId());
      }
      Assert.assertTrue(client.setConfiguration(c1.allPeersInNewConf).isSuccess());
      client.close();
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * After adding two started peers, the new peers' logs must equal the leader's log.
   */
  @Test
  public void testBootstrapReconf() throws Exception {
    LOG.info("Start testBootstrapReconf");
    // originally 3 peers
    final MiniRaftCluster cluster = getCluster(3);
    cluster.start();
    try {
      RaftTestUtil.waitForLeader(cluster);
      final String leaderId = cluster.getLeader().getId();
      final RaftClient client = cluster.createClient("client", leaderId);

      // submit some msgs before reconf
      for (int i = 0; i < getStagingGap() * 2; i++) {
        RaftClientReply reply = client.send(new SimpleMessage("m" + i));
        Assert.assertTrue(reply.isSuccess());
      }

      PeerChanges c1 = cluster.addNewPeers(2, true);
      LOG.info("Start changing the configuration: {}",
          asList(c1.allPeersInNewConf));
      final AtomicReference<Boolean> success = new AtomicReference<>();

      Thread clientThread = new Thread(() -> {
        try {
          RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf);
          success.set(reply.isSuccess());
          client.close();
        } catch (IOException ioe) {
          LOG.error("FAILED", ioe);
        }
      });
      clientThread.start();

      Thread.sleep(5000);
      LOG.info(cluster.printServers());
      assertSuccess(success);

      final RaftLog leaderLog = cluster.getLeader().getState().getLog();
      for (RaftPeer newPeer : c1.newPeers) {
        Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE),
            cluster.getServer(newPeer.getId()).getState().getLog()
                .getEntries(0, Long.MAX_VALUE));
      }
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * kill the leader before reconfiguration finishes. Make sure the client keeps
   * retrying.
   */
  @Test
  public void testKillLeaderDuringReconf() throws Exception {
    LOG.info("Start testKillLeaderDuringReconf");
    // originally 3 peers
    final MiniRaftCluster cluster = getCluster(3);
    cluster.start();
    try {
      RaftTestUtil.waitForLeader(cluster);
      final String leaderId = cluster.getLeader().getId();
      final RaftClient client = cluster.createClient("client", leaderId);

      PeerChanges c1 = cluster.addNewPeers(2, false);
      PeerChanges c2 = cluster.removePeers(2, false, asList(c1.newPeers));

      LOG.info("Start changing the configuration: {}",
          asList(c2.allPeersInNewConf));
      final AtomicReference<Boolean> success = new AtomicReference<>();
      final AtomicBoolean clientRunning = new AtomicBoolean(true);
      Thread clientThread = new Thread(() -> {
        try {
          boolean r = false;
          while (clientRunning.get() && !r) {
            r = client.setConfiguration(c2.allPeersInNewConf).isSuccess();
          }
          success.set(r);
          client.close();
        } catch (IOException e) {
          // Not asserted on (the cluster may already be shutting down),
          // but leave a trace for debugging.
          LOG.warn("Client failed to change the configuration", e);
        }
      });
      clientThread.start();

      // the leader cannot generate the (old, new) conf, and it will keep
      // bootstrapping the 2 new peers since they have not started yet
      LOG.info(cluster.printServers());
      Assert.assertFalse(cluster.getLeader().getRaftConf().isTransitional());

      // only the first empty entry got committed
      final long committedIndex = cluster.getLeader().getState().getLog()
          .getLastCommittedIndex();
      Assert.assertTrue("committedIndex is " + committedIndex,
          committedIndex <= 1);

      LOG.info("kill the current leader");
      final String oldLeaderId = RaftTestUtil.waitAndKillLeader(cluster, true);
      LOG.info("start the two new peers: {}", Arrays.asList(c1.newPeers));
      for (RaftPeer np : c1.newPeers) {
        cluster.startServer(np.getId());
      }

      Thread.sleep(3000);
      // the client should get the NotLeaderException from the first leader, and
      // will retry the same setConfiguration request
      waitAndCheckNewConf(cluster, c2.allPeersInNewConf, 2,
          Collections.singletonList(oldLeaderId));
      clientRunning.set(false);
      //Assert.assertTrue(success.get());
    } finally {
      cluster.shutdown();
    }
  }

  /** Asserts that {@code success} has been set and holds {@code true}. */
  static void assertSuccess(final AtomicReference<Boolean> success) {
    final String s = "success=" + success;
    Assert.assertNotNull(s, success.get());
    Assert.assertTrue(s, success.get());
  }

  /**
   * When a request's new configuration is the same with the current one, make
   * sure we return success immediately and no log entry is recorded.
   */
  @Test
  public void testNoChangeRequest() throws Exception {
    LOG.info("Start testNoChangeRequest");
    // originally 3 peers
    final MiniRaftCluster cluster = getCluster(3);
    try {
      cluster.start();
      RaftTestUtil.waitForLeader(cluster);

      final String leaderId = cluster.getLeader().getId();
      final RaftClient client = cluster.createClient("client", leaderId);
      client.send(new SimpleMessage("m"));

      final long committedIndex = cluster.getLeader().getState().getLog()
          .getLastCommittedIndex();
      final RaftConfiguration confBefore = cluster.getLeader().getRaftConf();

      // no real configuration change in the request
      RaftClientReply reply = client.setConfiguration(cluster.getPeers()
          .toArray(new RaftPeer[0]));
      Assert.assertTrue(reply.isSuccess());
      Assert.assertEquals(committedIndex, cluster.getLeader().getState()
          .getLog().getLastCommittedIndex());
      Assert.assertSame(confBefore, cluster.getLeader().getRaftConf());
      client.close();
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Make sure a setConfiguration request is rejected if a configuration change
   * is still in progress (i.e., has not been committed yet).
   */
  @Test
  public void testOverlappedSetConfRequests() throws Exception {
    LOG.info("Start testOverlappedSetConfRequests");
    // originally 3 peers
    final MiniRaftCluster cluster = getCluster(3);
    try {
      cluster.start();
      RaftTestUtil.waitForLeader(cluster);

      final String leaderId = cluster.getLeader().getId();

      RaftPeer[] newPeers = cluster.addNewPeers(2, true).allPeersInNewConf;

      // delay every peer's logSync so that the setConf request is delayed
      cluster.getPeers()
          .forEach(peer -> logSyncDelay.setDelayMs(peer.getId(), 1000));

      final CountDownLatch latch = new CountDownLatch(1);
      final RaftPeer[] peersInRequest2 = cluster.getPeers().toArray(new RaftPeer[0]);
      AtomicBoolean caughtException = new AtomicBoolean(false);
      new Thread(() -> {
        try(final RaftClient client2 = cluster.createClient("client2", leaderId)) {
          latch.await();
          LOG.info("client2 starts to change conf");
          final RaftClientRequestSender sender2 = ((RaftClientImpl)client2).getRequestSender();
          sender2.sendRequest(new SetConfigurationRequest(
              "client2", leaderId, DEFAULT_SEQNUM, peersInRequest2));
        } catch (ReconfigurationInProgressException e) {
          caughtException.set(true);
        } catch (Exception e) {
          LOG.warn("Got unexpected exception when client2 changes conf", e);
        }
      }).start();

      AtomicBoolean confChanged = new AtomicBoolean(false);
      new Thread(() -> {
        try(final RaftClient client1 = cluster.createClient("client1", leaderId)) {
          LOG.info("client1 starts to change conf");
          confChanged.set(client1.setConfiguration(newPeers).isSuccess());
        } catch (IOException e) {
          LOG.warn("Got unexpected exception when client1 changes conf", e);
        }
      }).start();
      // give client1 a head start before releasing client2
      Thread.sleep(100);
      latch.countDown();

      for (int i = 0; i < 10 && !confChanged.get(); i++) {
        Thread.sleep(1000);
      }
      Assert.assertTrue(confChanged.get());
      Assert.assertTrue(caughtException.get());
    } finally {
      logSyncDelay.clear();
      cluster.shutdown();
    }
  }

  /**
   * Test a scenario where the follower truncates its log entries which causes
   * configuration change.
   */
  @Test
  public void testRevertConfigurationChange() throws Exception {
    LOG.info("Start testRevertConfigurationChange");
    // originally 5 peers
    final MiniRaftCluster cluster = getCluster(5);
    try {
      cluster.start();
      RaftTestUtil.waitForLeader(cluster);

      final String leaderId = cluster.getLeader().getId();

      final RaftLog log = cluster.getServer(leaderId).getState().getLog();
      Thread.sleep(1000);
      Assert.assertEquals(0, log.getLatestFlushedIndex());

      // we block the incoming msg for the leader and block its requests to
      // followers, so that we force the leader change and the old leader will
      // not know
      LOG.info("start blocking the leader");
      BlockRequestHandlingInjection.getInstance().blockReplier(leaderId);
      cluster.setBlockRequestsFrom(leaderId, true);

      PeerChanges change = cluster.removePeers(1, false, new ArrayList<>());

      AtomicBoolean gotNotLeader = new AtomicBoolean(false);
      new Thread(() -> {
        try(final RaftClient client = cluster.createClient("client1", leaderId)) {
          LOG.info("client starts to change conf");
          final RaftClientRequestSender sender = ((RaftClientImpl)client).getRequestSender();
          RaftClientReply reply = sender.sendRequest(new SetConfigurationRequest(
              "client", leaderId, DEFAULT_SEQNUM, change.allPeersInNewConf));
          if (reply.isNotLeader()) {
            gotNotLeader.set(true);
          }
        } catch (IOException e) {
          LOG.warn("Got unexpected exception when client1 changes conf", e);
        }
      }).start();

      // wait till the old leader persist the new conf
      for (int i = 0; i < 10 && log.getLatestFlushedIndex() < 1; i++) {
        Thread.sleep(500);
      }
      Assert.assertEquals(1, log.getLatestFlushedIndex());
      Assert.assertEquals(CONFIGURATIONENTRY,
          log.getLastEntry().getLogEntryBodyCase());

      // unblock the old leader
      BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId);
      cluster.setBlockRequestsFrom(leaderId, false);

      // the client should get NotLeaderException
      for (int i = 0; i < 10 && !gotNotLeader.get(); i++) {
        Thread.sleep(500);
      }
      Assert.assertTrue(gotNotLeader.get());

      // the old leader should have truncated the setConf from the log
      boolean newState = false;
      for (int i = 0; i < 10 && !newState; i++) {
        Thread.sleep(500);
        newState = log.getLastCommittedIndex() == 1 &&
            log.getLastEntry().getLogEntryBodyCase() != CONFIGURATIONENTRY;
      }
      Assert.assertTrue(newState);
    } finally {
      cluster.shutdown();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java b/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java deleted file mode 100644 index bd1934f..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.impl; - -import org.apache.raft.MiniRaftCluster; -import org.apache.raft.RaftTestUtil; -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.RaftServer; -import org.apache.raft.statemachine.StateMachine; -import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; - -public class RaftServerTestUtil { - static final Logger LOG = LoggerFactory.getLogger(RaftServerTestUtil.class); - - public static void waitAndCheckNewConf(MiniRaftCluster cluster, - RaftPeer[] peers, int numOfRemovedPeers, Collection<String> deadPeers) - throws Exception { - final long sleepMs = cluster.getMaxTimeout() * (numOfRemovedPeers + 2); - RaftTestUtil.attempt(3, sleepMs, - () -> waitAndCheckNewConf(cluster, peers, deadPeers)); - } - private static void waitAndCheckNewConf(MiniRaftCluster cluster, - RaftPeer[] peers, Collection<String> deadPeers) - throws Exception { - LOG.info(cluster.printServers()); - Assert.assertNotNull(cluster.getLeader()); - - int numIncluded = 0; - int deadIncluded = 0; - final RaftConfiguration current = RaftConfiguration.newBuilder() - .setConf(peers).setLogEntryIndex(0).build(); - for (RaftServerImpl server : cluster.getServers()) { - if (deadPeers != null && deadPeers.contains(server.getId())) { - if (current.containsInConf(server.getId())) { - deadIncluded++; - } - continue; - } - if (current.containsInConf(server.getId())) { - numIncluded++; - Assert.assertTrue(server.getRaftConf().isStable()); - Assert.assertTrue(server.getRaftConf().hasNoChange(peers)); - } else { - Assert.assertFalse(server.getId() + " is still running: " + server, - server.isAlive()); - } - } - Assert.assertEquals(peers.length, numIncluded + deadIncluded); - } - - public static StateMachine getStateMachine(RaftServerImpl s) { - return s.getStateMachine(); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java deleted file mode 100644 index 7414872..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.simulation; - -import org.apache.raft.MiniRaftCluster; -import org.apache.raft.client.RaftClientRequestSender; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.impl.RaftServerImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; - -public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { - static final Logger LOG = LoggerFactory.getLogger(MiniRaftClusterWithSimulatedRpc.class); - - public static final Factory<MiniRaftClusterWithSimulatedRpc> FACTORY - = new Factory<MiniRaftClusterWithSimulatedRpc>() { - @Override - public MiniRaftClusterWithSimulatedRpc newCluster( - String[] ids, RaftProperties prop, boolean formatted) { - prop.setInt(SimulatedRequestReply.SIMULATE_LATENCY_KEY, 0); - return new MiniRaftClusterWithSimulatedRpc(ids, prop, formatted); - } - }; - - private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply; - private SimulatedClientRequestReply client2serverRequestReply; - - public MiniRaftClusterWithSimulatedRpc(int numServers, - RaftProperties properties) { - this(generateIds(numServers, 0), properties, true); - } - - public MiniRaftClusterWithSimulatedRpc(String[] ids, - RaftProperties properties, boolean formatted) { - super(ids, properties, formatted); - initRpc(); - } - - private void initRpc() { - final Collection<RaftPeer> peers = getConf().getPeers(); - final int simulateLatencyMs = properties.getInt( - SimulatedRequestReply.SIMULATE_LATENCY_KEY, - SimulatedRequestReply.SIMULATE_LATENCY_DEFAULT); - LOG.info(SimulatedRequestReply.SIMULATE_LATENCY_KEY + " = " - + simulateLatencyMs); - serverRequestReply = new SimulatedRequestReply<>(peers, simulateLatencyMs); - client2serverRequestReply = new SimulatedClientRequestReply(peers, - simulateLatencyMs); - - setRpcServers(getServers()); - } - - private 
void setRpcServers(Collection<RaftServerImpl> newServers) { - newServers.forEach(s -> s.setServerRpc( - new SimulatedServerRpc(s, serverRequestReply, client2serverRequestReply))); - } - - @Override - protected void setPeerRpc() { - initRpc(); - } - - private void addPeersToRpc(Collection<RaftPeer> peers) { - serverRequestReply.addPeers(peers); - client2serverRequestReply.addPeers(peers); - } - - @Override - public void restartServer(String id, boolean format) throws IOException { - super.restartServer(id, format); - RaftServerImpl s = getServer(id); - addPeersToRpc(Collections.singletonList(conf.getPeer(id))); - s.setServerRpc(new SimulatedServerRpc(s, serverRequestReply, - client2serverRequestReply)); - s.start(); - } - - @Override - public Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers, - Collection<RaftServerImpl> newServers, boolean startService) { - addPeersToRpc(newPeers); - setRpcServers(newServers); - if (startService) { - newServers.forEach(RaftServerImpl::start); - } - return newPeers; - } - - @Override - public RaftClientRequestSender getRaftClientRequestSender() { - return client2serverRequestReply; - } - - @Override - public void blockQueueAndSetDelay(String leaderId, int delayMs) - throws InterruptedException { - // block leader sendRequest if delayMs > 0 - final boolean block = delayMs > 0; - LOG.debug("{} leader queue {} and set {}ms delay for the other queues", - block? 
"Block": "Unblock", leaderId, delayMs); - serverRequestReply.getQueue(leaderId).blockSendRequestTo.set(block); - - // set delay takeRequest for the other queues - getServers().stream().filter(s -> !s.getId().equals(leaderId)) - .map(s -> serverRequestReply.getQueue(s.getId())) - .forEach(q -> q.delayTakeRequestTo.set(delayMs)); - - final long sleepMs = 3 * getMaxTimeout() / 2; - Thread.sleep(sleepMs); - } - - @Override - public void setBlockRequestsFrom(String src, boolean block) { - serverRequestReply.getQueue(src).blockTakeRequestFrom.set(block); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/simulation/RaftServerReply.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/RaftServerReply.java b/raft-server/src/test/java/org/apache/raft/server/simulation/RaftServerReply.java deleted file mode 100644 index 7a03d75..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/RaftServerReply.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.simulation; - -import com.google.common.base.Preconditions; -import org.apache.raft.protocol.RaftRpcMessage; -import org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto; -import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotReplyProto; -import org.apache.raft.shaded.proto.RaftProtos.RequestVoteReplyProto; - -public class RaftServerReply extends RaftRpcMessage { - private final AppendEntriesReplyProto appendEntries; - private final RequestVoteReplyProto requestVote; - private final InstallSnapshotReplyProto installSnapshot; - - RaftServerReply(AppendEntriesReplyProto a) { - appendEntries = Preconditions.checkNotNull(a); - requestVote = null; - installSnapshot = null; - } - - RaftServerReply(RequestVoteReplyProto r) { - appendEntries = null; - requestVote = Preconditions.checkNotNull(r); - installSnapshot = null; - } - - RaftServerReply(InstallSnapshotReplyProto i) { - appendEntries = null; - requestVote = null; - installSnapshot = Preconditions.checkNotNull(i); - } - - boolean isAppendEntries() { - return appendEntries != null; - } - - boolean isRequestVote() { - return requestVote != null; - } - - boolean isInstallSnapshot() { - return installSnapshot != null; - } - - AppendEntriesReplyProto getAppendEntries() { - return appendEntries; - } - - RequestVoteReplyProto getRequestVote() { - return requestVote; - } - - InstallSnapshotReplyProto getInstallSnapshot() { - return installSnapshot; - } - - @Override - public boolean isRequest() { - return false; - } - - @Override - public String getRequestorId() { - if (isAppendEntries()) { - return appendEntries.getServerReply().getRequestorId(); - } else if (isRequestVote()) { - return requestVote.getServerReply().getRequestorId(); - } else { - return installSnapshot.getServerReply().getRequestorId(); - } - } - - @Override - public String getReplierId() { - if (isAppendEntries()) { - return appendEntries.getServerReply().getReplyId(); - } else if (isRequestVote()) { - 
return requestVote.getServerReply().getReplyId(); - } else { - return installSnapshot.getServerReply().getReplyId(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/simulation/RaftServerRequest.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/RaftServerRequest.java b/raft-server/src/test/java/org/apache/raft/server/simulation/RaftServerRequest.java deleted file mode 100644 index 499b12d..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/RaftServerRequest.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.simulation; - -import org.apache.raft.protocol.RaftRpcMessage; -import org.apache.raft.shaded.proto.RaftProtos.AppendEntriesRequestProto; -import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotRequestProto; -import org.apache.raft.shaded.proto.RaftProtos.RequestVoteRequestProto; - -class RaftServerRequest extends RaftRpcMessage { - private final AppendEntriesRequestProto appendEntries; - private final RequestVoteRequestProto requestVote; - private final InstallSnapshotRequestProto installSnapshot; - - RaftServerRequest(AppendEntriesRequestProto a) { - appendEntries = a; - requestVote = null; - installSnapshot = null; - } - - RaftServerRequest(RequestVoteRequestProto r) { - appendEntries = null; - requestVote = r; - installSnapshot = null; - } - - RaftServerRequest(InstallSnapshotRequestProto i) { - appendEntries = null; - requestVote = null; - installSnapshot = i; - } - - boolean isAppendEntries() { - return appendEntries != null; - } - - boolean isRequestVote() { - return requestVote != null; - } - - boolean isInstallSnapshot() { - return installSnapshot != null; - } - - AppendEntriesRequestProto getAppendEntries() { - return appendEntries; - } - - RequestVoteRequestProto getRequestVote() { - return requestVote; - } - - InstallSnapshotRequestProto getInstallSnapshot() { - return installSnapshot; - } - - @Override - public boolean isRequest() { - return true; - } - - @Override - public String getRequestorId() { - if (isAppendEntries()) { - return appendEntries.getServerRequest().getRequestorId(); - } else if (isRequestVote()) { - return requestVote.getServerRequest().getRequestorId(); - } else { - return installSnapshot.getServerRequest().getRequestorId(); - } - } - - @Override - public String getReplierId() { - if (isAppendEntries()) { - return appendEntries.getServerRequest().getReplyId(); - } else if (isRequestVote()) { - return requestVote.getServerRequest().getReplyId(); - } else { - return 
installSnapshot.getServerRequest().getReplyId(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/simulation/RequestHandler.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/RequestHandler.java b/raft-server/src/test/java/org/apache/raft/server/simulation/RequestHandler.java deleted file mode 100644 index dee13a6..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/RequestHandler.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.simulation; - -import org.apache.raft.protocol.RaftRpcMessage; -import org.apache.raft.util.Daemon; -import org.apache.raft.util.ExitUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.List; - -public class RequestHandler<REQUEST extends RaftRpcMessage, - REPLY extends RaftRpcMessage> { - public static final Logger LOG = LoggerFactory.getLogger(RequestHandler.class); - - interface HandlerInterface<REQUEST extends RaftRpcMessage, - REPLY extends RaftRpcMessage> { - - boolean isAlive(); - - REPLY handleRequest(REQUEST r) throws IOException; - } - - private final String serverId; - private final String name; - private final SimulatedRequestReply<REQUEST, REPLY> rpc; - private final HandlerInterface<REQUEST, REPLY> handlerImpl; - private final List<HandlerDaemon> daemons; - - RequestHandler(String serverId, String name, - SimulatedRequestReply<REQUEST, REPLY> rpc, - HandlerInterface<REQUEST, REPLY> handlerImpl, - int numHandlers) { - this.serverId = serverId; - this.name = name; - this.rpc = rpc; - this.handlerImpl = handlerImpl; - - this.daemons = new ArrayList<>(numHandlers); - for(int i = 0; i < numHandlers; i++) { - daemons.add(new HandlerDaemon(i)); - } - } - - void startDaemon() { - daemons.forEach(Thread::start); - } - - void shutdown() { - rpc.shutdown(serverId); - } - - void interruptAndJoinDaemon() throws InterruptedException { - daemons.forEach(Thread::interrupt); - for (Daemon d : daemons) { - d.join(); - } - } - - SimulatedRequestReply<REQUEST, REPLY> getRpc() { - return rpc; - } - - void handleRequest(REQUEST request) throws IOException { - final REPLY reply; - try { - reply = handlerImpl.handleRequest(request); - } catch (IOException ioe) { - LOG.debug("IOException for " + request, ioe); - rpc.sendReply(request, null, ioe); - return; - } - if (reply != null) { - rpc.sendReply(request, 
reply, null); - } - } - - /** - * A thread keep polling requests from the request queue. Used for simulation. - */ - class HandlerDaemon extends Daemon { - private final int id; - - HandlerDaemon(int id) { - this.id = id; - } - - @Override - public String toString() { - return serverId + "." + name + id; - } - - @Override - public void run() { - while (handlerImpl.isAlive()) { - try { - handleRequest(rpc.takeRequest(serverId)); - } catch (InterruptedIOException e) { - LOG.info(this + " is interrupted by " + e); - LOG.trace("TRACE", e); - break; - } catch (IOException e) { - LOG.error(this + " has " + e); - LOG.trace("TRACE", e); - } catch(Throwable t) { - if (!handlerImpl.isAlive()) { - LOG.info(this + " is stopped."); - break; - } - ExitUtils.terminate(1, this + " is terminating.", t, LOG); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedClientRequestReply.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedClientRequestReply.java b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedClientRequestReply.java deleted file mode 100644 index 828e2a2..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedClientRequestReply.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.simulation; - -import org.apache.raft.client.RaftClientRequestSender; -import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.protocol.RaftClientRequest; -import org.apache.raft.protocol.RaftPeer; - -import java.io.IOException; -import java.util.Collection; - -public class SimulatedClientRequestReply - extends SimulatedRequestReply<RaftClientRequest, RaftClientReply> - implements RaftClientRequestSender { - SimulatedClientRequestReply(Collection<RaftPeer> allPeers, - int simulateLatencyMs) { - super(allPeers, simulateLatencyMs); - } - - @Override - public void addServers(Iterable<RaftPeer> servers) { - // do nothing - } - - @Override - public void close() { - // do nothing - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedRequestReply.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedRequestReply.java b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedRequestReply.java deleted file mode 100644 index 100fd60..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedRequestReply.java +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.simulation; - -import com.google.common.base.Preconditions; -import org.apache.raft.RaftTestUtil; -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.protocol.RaftRpcMessage; -import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.util.RaftUtils; -import org.apache.raft.util.Timestamp; - -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -public class SimulatedRequestReply<REQUEST extends RaftRpcMessage, - REPLY extends RaftRpcMessage> { - public static final String SIMULATE_LATENCY_KEY - = SimulatedRequestReply.class.getName() + ".simulateLatencyMs"; - public static final int SIMULATE_LATENCY_DEFAULT - = RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT; - public static final long TIMEOUT = 3000L; - - private static class ReplyOrException<REPLY> { - private final REPLY reply; - private final IOException ioe; - - ReplyOrException(REPLY reply, IOException ioe) { - Preconditions.checkArgument(reply == null ^ ioe == null); - this.reply = reply; - this.ioe = ioe; - } - } - - static class 
EventQueue<REQUEST, REPLY> { - private final BlockingQueue<REQUEST> requestQueue - = new LinkedBlockingQueue<>(); - private final Map<REQUEST, ReplyOrException<REPLY>> replyMap - = new ConcurrentHashMap<>(); - - /** Block takeRequest for the requests sent from this server. */ - final AtomicBoolean blockTakeRequestFrom = new AtomicBoolean(); - /** Block sendRequest for the requests sent to this server. */ - final AtomicBoolean blockSendRequestTo = new AtomicBoolean(); - /** Delay takeRequest for the requests sent to this server. */ - final AtomicInteger delayTakeRequestTo = new AtomicInteger(); - /** Delay takeRequest for the requests sent from this server. */ - final AtomicInteger delayTakeRequestFrom = new AtomicInteger(); - - REPLY request(REQUEST request) throws InterruptedException, IOException { - requestQueue.put(request); - synchronized (this) { - final Timestamp startTime = new Timestamp(); - while (startTime.elapsedTimeMs() < TIMEOUT && - !replyMap.containsKey(request)) { - this.wait(TIMEOUT); // no need to be precise here - } - } - - if (!replyMap.containsKey(request)) { - throw new IOException("Timeout while waiting for reply of request " - + request); - } - final ReplyOrException<REPLY> re = replyMap.remove(request); - if (re.ioe != null) { - throw re.ioe; - } - return re.reply; - } - - REQUEST takeRequest() throws InterruptedException { - return requestQueue.take(); - } - - void reply(REQUEST request, REPLY reply, IOException ioe) - throws IOException { - replyMap.put(request, new ReplyOrException<>(reply, ioe)); - synchronized (this) { - this.notifyAll(); - } - } - } - - private final Map<String, EventQueue<REQUEST, REPLY>> queues; - private final int simulateLatencyMs; - - SimulatedRequestReply(Collection<RaftPeer> allPeers, int simulateLatencyMs) { - queues = new ConcurrentHashMap<>(); - for (RaftPeer peer : allPeers) { - queues.put(peer.getId(), new EventQueue<>()); - } - - this.simulateLatencyMs = simulateLatencyMs; - } - - EventQueue<REQUEST, 
REPLY> getQueue(String qid) { - return queues.get(qid); - } - - public REPLY sendRequest(REQUEST request) throws IOException { - final String qid = request.getReplierId(); - final EventQueue<REQUEST, REPLY> q = queues.get(qid); - if (q == null) { - throw new IOException("The peer " + qid + " is not alive."); - } - try { - RaftTestUtil.block(q.blockSendRequestTo::get); - return q.request(request); - } catch (InterruptedException e) { - throw RaftUtils.toInterruptedIOException("", e); - } - } - - public REQUEST takeRequest(String qid) throws IOException { - final EventQueue<REQUEST, REPLY> q = queues.get(qid); - if (q == null) { - throw new IOException("The RPC of " + qid + " has already shutdown."); - } - - final REQUEST request; - try { - // delay request for testing - RaftTestUtil.delay(q.delayTakeRequestTo::get); - - request = q.takeRequest(); - Preconditions.checkState(qid.equals(request.getReplierId())); - - // block request for testing - final EventQueue<REQUEST, REPLY> reqQ = queues.get(request.getRequestorId()); - if (reqQ != null) { - RaftTestUtil.delay(reqQ.delayTakeRequestFrom::get); - RaftTestUtil.block(reqQ.blockTakeRequestFrom::get); - } - } catch (InterruptedException e) { - throw RaftUtils.toInterruptedIOException("", e); - } - return request; - } - - public void sendReply(REQUEST request, REPLY reply, IOException ioe) - throws IOException { - if (reply != null) { - Preconditions.checkArgument( - request.getRequestorId().equals(reply.getRequestorId())); - Preconditions.checkArgument( - request.getReplierId().equals(reply.getReplierId())); - } - simulateLatency(); - final String qid = request.getReplierId(); - EventQueue<REQUEST, REPLY> q = queues.get(qid); - if (q != null) { - q.reply(request, reply, ioe); - } - } - - public void shutdown(String id) { - queues.remove(id); - } - - public void addPeers(Collection<RaftPeer> newPeers) { - for (RaftPeer peer : newPeers) { - queues.put(peer.getId(), new EventQueue<>()); - } - } - - private void 
simulateLatency() throws IOException { - if (simulateLatencyMs > 0) { - int waitExpetation = simulateLatencyMs / 10; - int waitHalfRange = waitExpetation / 3; - int randomSleepMs = ThreadLocalRandom.current().nextInt(2 * waitHalfRange) - + waitExpetation - waitHalfRange; - try { - Thread.sleep(randomSleepMs); - } catch (InterruptedException ie) { - throw RaftUtils.toInterruptedIOException("", ie); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java deleted file mode 100644 index 799ee65..0000000 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.raft.server.simulation;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.raft.protocol.*;
-import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.protocol.RaftServerProtocol;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A {@link RaftServerRpc} used by the simulated-RPC tests.
- *
- * Server-to-server and client-to-server requests are exchanged through the two
- * {@link SimulatedRequestReply} objects passed to the constructor; this RPC has
- * no socket address (see {@link #getInetSocketAddress()}).
- */
-class SimulatedServerRpc implements RaftServerRpc {
-  static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class);
-
-  private final RaftServerImpl server;
-  private final RequestHandler<RaftServerRequest, RaftServerReply> serverHandler;
-  private final RequestHandler<RaftClientRequest, RaftClientReply> clientHandler;
-  /** Daemon-thread pool on which asynchronous client replies are sent. */
-  private final ExecutorService executor = Executors.newFixedThreadPool(3,
-      new ThreadFactoryBuilder().setDaemon(true).build());
-
-  /**
-   * @param server the server that the handlers dispatch requests to.
-   * @param serverRequestReply the channel used for server-to-server requests.
-   * @param clientRequestReply the channel used for client requests.
-   */
-  SimulatedServerRpc(RaftServerImpl server,
-      SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply,
-      SimulatedRequestReply<RaftClientRequest, RaftClientReply> clientRequestReply) {
-    this.server = server;
-    this.serverHandler = new RequestHandler<>(server.getId(),
-        "serverHandler", serverRequestReply, serverHandlerImpl, 3);
-    this.clientHandler = new RequestHandler<>(server.getId(),
-        "clientHandler", clientRequestReply, clientHandlerImpl, 3);
-  }
-
-  /** Starts the daemons of both request handlers. */
-  @Override
-  public void start() {
-    serverHandler.startDaemon();
-    clientHandler.startDaemon();
-  }
-
-  private void interruptAndJoin() throws InterruptedException {
-    clientHandler.interruptAndJoinDaemon();
-    serverHandler.interruptAndJoinDaemon();
-  }
-
-  /**
-   * Stops the handler daemons, shuts down the reply executor (waiting up to
-   * one second for it to terminate) and then shuts down both handlers.
-   *
-   * An interrupt received while waiting does not abort the shutdown sequence:
-   * the executor and the handlers are still shut down, and the interrupt
-   * status of the calling thread is restored before returning.
-   */
-  @Override
-  public void close() {
-    // Remember the interrupt instead of swallowing it, so that the remaining
-    // shutdown steps still run and the caller can still observe it afterwards.
-    boolean interrupted = false;
-    try {
-      interruptAndJoin();
-    } catch (InterruptedException e) {
-      interrupted = true;
-    }
-
-    // Always shut the executor down, even if joining the daemons was interrupted.
-    executor.shutdown();
-    try {
-      executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      interrupted = true;
-    }
-
-    clientHandler.shutdown();
-    serverHandler.shutdown();
-
-    if (interrupted) {
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /** @return always null; this RPC is not bound to a socket address. */
-  @Override
-  public InetSocketAddress getInetSocketAddress() {
-    return null;
-  }
-
-  @Override
-  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request)
-      throws IOException {
-    RaftServerReply reply = serverHandler.getRpc()
-        .sendRequest(new RaftServerRequest(request));
-    return reply.getAppendEntries();
-  }
-
-  @Override
-  public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request)
-      throws IOException {
-    RaftServerReply reply = serverHandler.getRpc()
-        .sendRequest(new RaftServerRequest(request));
-    return reply.getInstallSnapshot();
-  }
-
-  @Override
-  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
-      throws IOException {
-    RaftServerReply reply = serverHandler.getRpc()
-        .sendRequest(new RaftServerRequest(request));
-    return reply.getRequestVote();
-  }
-
-  @Override
-  public void addPeers(Iterable<RaftPeer> peers) {
-    // do nothing
-  }
-
-  /** Dispatches each server-to-server request to the matching server method. */
-  final RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply> serverHandlerImpl
-      = new RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply>() {
-    @Override
-    public boolean isAlive() {
-      return server.isAlive();
-    }
-
-    @Override
-    public RaftServerReply handleRequest(RaftServerRequest r)
-        throws IOException {
-      if (r.isAppendEntries()) {
-        return new RaftServerReply(server.appendEntries(r.getAppendEntries()));
-      } else if (r.isRequestVote()) {
-        return new RaftServerReply(server.requestVote(r.getRequestVote()));
-      } else if (r.isInstallSnapshot()) {
-        return new RaftServerReply(server.installSnapshot(r.getInstallSnapshot()));
-      } else {
-        throw new IllegalStateException("unexpected state");
-      }
-    }
-  };
-
-  /** Submits client requests to the server and sends the replies asynchronously. */
-  final RequestHandler.HandlerInterface<RaftClientRequest, RaftClientReply> clientHandlerImpl
-      = new RequestHandler.HandlerInterface<RaftClientRequest, RaftClientReply>() {
-    @Override
-    public boolean isAlive() {
-      return server.isAlive();
-    }
-
-    /**
-     * Hands the request to the server and returns immediately.
-     *
-     * @return always null; the reply (or the failure, wrapped in an
-     *         {@link IOException} when it is not one already) is sent from
-     *         the executor once the server's future completes.
-     */
-    @Override
-    public RaftClientReply handleRequest(RaftClientRequest request)
-        throws IOException {
-      final CompletableFuture<RaftClientReply> future;
-      if (request instanceof SetConfigurationRequest) {
-        future = server.setConfigurationAsync((SetConfigurationRequest) request);
-      } else {
-        future = server.submitClientRequestAsync(request);
-      }
-
-      future.whenCompleteAsync((reply, exception) -> {
-        try {
-          IOException e = null;
-          if (exception != null) {
-            e = exception instanceof IOException ?
-                (IOException) exception : new IOException(exception);
-          }
-          clientHandler.getRpc().sendReply(request, reply, e);
-        } catch (IOException e) {
-          LOG.warn("Failed to send reply {} for request {} due to exception {}",
-              reply, request, e);
-        }
-      }, executor);
-      return null;
-    }
-  };
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/simulation/TestNotLeaderExceptionWithSimulation.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/TestNotLeaderExceptionWithSimulation.java b/raft-server/src/test/java/org/apache/raft/server/simulation/TestNotLeaderExceptionWithSimulation.java
deleted file mode 100644
index d6f6acb..0000000
--- a/raft-server/src/test/java/org/apache/raft/server/simulation/TestNotLeaderExceptionWithSimulation.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.simulation;
-
-import org.apache.raft.MiniRaftCluster;
-import org.apache.raft.RaftNotLeaderExceptionBaseTest;
-import org.apache.raft.conf.RaftProperties;
-
-import java.io.IOException;
-
-/**
- * Runs the tests inherited from {@link RaftNotLeaderExceptionBaseTest} on a
- * {@link MiniRaftClusterWithSimulatedRpc}.
- */
-public class TestNotLeaderExceptionWithSimulation
-    extends RaftNotLeaderExceptionBaseTest {
-  /** Builds a simulated-RPC cluster with NUM_PEERS generated peer ids. */
-  @Override
-  public MiniRaftCluster initCluster() throws IOException {
-    final String[] peerIds = MiniRaftCluster.generateIds(NUM_PEERS, 0);
-    final RaftProperties properties = new RaftProperties();
-    return new MiniRaftClusterWithSimulatedRpc(peerIds, properties, true);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftReconfigurationWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftReconfigurationWithSimulatedRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftReconfigurationWithSimulatedRpc.java
deleted file mode 100644
index b0eb456..0000000
--- a/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftReconfigurationWithSimulatedRpc.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.simulation;
-
-import org.apache.raft.MiniRaftCluster;
-import org.apache.raft.server.impl.RaftReconfigurationBaseTest;
-
-import java.io.IOException;
-
-/**
- * Runs the tests inherited from {@link RaftReconfigurationBaseTest} on a
- * {@link MiniRaftClusterWithSimulatedRpc}.
- */
-public class TestRaftReconfigurationWithSimulatedRpc
-    extends RaftReconfigurationBaseTest {
-  /** Creates a simulated-RPC cluster for {@code peerNum}, using the inherited {@code prop}. */
-  @Override
-  public MiniRaftCluster getCluster(int peerNum) throws IOException {
-    final MiniRaftCluster simulated =
-        new MiniRaftClusterWithSimulatedRpc(peerNum, prop);
-    return simulated;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftSnapshotWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftSnapshotWithSimulatedRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftSnapshotWithSimulatedRpc.java
deleted file mode 100644
index 7c8f873..0000000
--- a/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftSnapshotWithSimulatedRpc.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.simulation;
-
-import org.apache.raft.MiniRaftCluster;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.statemachine.RaftSnapshotBaseTest;
-
-import java.io.IOException;
-
-/**
- * Runs the tests inherited from {@link RaftSnapshotBaseTest} on a cluster
- * created by {@code MiniRaftClusterWithSimulatedRpc.FACTORY}.
- */
-public class TestRaftSnapshotWithSimulatedRpc extends RaftSnapshotBaseTest {
-  /** Asks the simulated-RPC factory for a cluster of {@code numServer} servers. */
-  @Override
-  public MiniRaftCluster initCluster(int numServer, RaftProperties prop)
-      throws IOException {
-    final MiniRaftCluster simulated =
-        MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster(numServer, prop, true);
-    return simulated;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java
deleted file mode 100644
index faa9dd8..0000000
--- a/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.simulation;
-
-import org.apache.log4j.Level;
-import org.apache.raft.RaftBasicTests;
-import org.apache.raft.client.RaftClient;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.util.RaftUtils;
-
-import java.io.IOException;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * Runs the tests inherited from {@link RaftBasicTests} on a
- * {@link MiniRaftClusterWithSimulatedRpc}, with server and client logging at DEBUG.
- */
-public class TestRaftWithSimulatedRpc extends RaftBasicTests {
-  static {
-    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
-    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
-  }
-
-  private final MiniRaftClusterWithSimulatedRpc cluster;
-
-  /**
-   * Creates the cluster under test. On a random half of the runs the
-   * simulated latency property is set to 0 before the cluster is built.
-   */
-  public TestRaftWithSimulatedRpc() throws IOException {
-    final RaftProperties conf = getProperties();
-    // turn off simulate latency half of the times.
-    final boolean latencyOff = ThreadLocalRandom.current().nextBoolean();
-    if (latencyOff) {
-      conf.setInt(SimulatedRequestReply.SIMULATE_LATENCY_KEY, 0);
-    }
-    this.cluster = new MiniRaftClusterWithSimulatedRpc(NUM_SERVERS, conf);
-  }
-
-  /** @return the cluster created by the constructor. */
-  @Override
-  public MiniRaftClusterWithSimulatedRpc getCluster() {
-    return this.cluster;
-  }
-}