Repository: incubator-ratis Updated Branches: refs/heads/master b7d089a1a -> 42273af45
RATIS-117. Add test for situation when old leader can/cannot commit log. Contributed by Yubo Xu. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/42273af4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/42273af4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/42273af4 Branch: refs/heads/master Commit: 42273af454d368293e467518431c16f7c3c248aa Parents: b7d089a Author: Jing Zhao <[email protected]> Authored: Thu Nov 9 13:15:26 2017 -0800 Committer: Jing Zhao <[email protected]> Committed: Thu Nov 9 13:15:26 2017 -0800 ---------------------------------------------------------------------- .../ratis/server/storage/LogOutputStream.java | 25 ++++-- .../java/org/apache/ratis/RaftBasicTests.java | 94 +++++++++++++++++++- .../java/org/apache/ratis/RaftTestUtil.java | 39 +++++++- 3 files changed, 147 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42273af4/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java index 80e344c..dedfe7e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java @@ -17,11 +17,10 @@ */ package org.apache.ratis.server.storage; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.PureJavaCrc32C; import org.slf4j.Logger; @@ -68,11 +67,27 @@ public class LogOutputStream implements Closeable { fc = rp.getChannel(); fc.position(fc.size()); preallocatedPos = fc.size(); - out = new BufferedWriteChannel(fc, bufferSize); - if (!append) { - create(); + try { + fc = rp.getChannel(); + fc.position(fc.size()); + preallocatedPos = fc.size(); + + out = new BufferedWriteChannel(fc, bufferSize); + if (!append) { + create(); + } + } catch (IOException ioe) { + LOG.warn("Hit IOException while creating log segment " + file + + ", delete the partial file."); + // hit IOException, clean up the in-progress log file + try { + FileUtils.deleteFully(file); + } catch (IOException e) { + LOG.warn("Failed to delete the file " + file, e); + } + throw ioe; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42273af4/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 9875845..0cd9222 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -21,18 +21,27 @@ import org.apache.log4j.Level; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; + import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.storage.RaftStorageTestUtils; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.ExitUtils; + + import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.apache.ratis.server.storage.RaftLog; + + +import static org.apache.ratis.RaftTestUtil.*; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.List; @@ -42,11 +51,10 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.ratis.RaftTestUtil.waitAndKillLeader; -import static org.apache.ratis.RaftTestUtil.waitForLeader; public abstract class RaftBasicTests extends BaseTest { { @@ -129,6 +137,84 @@ public abstract class RaftBasicTests extends BaseTest { } @Test + public void testOldLeaderCommit() throws Exception { + LOG.info("Running testOldLeaderCommit"); + final MiniRaftCluster cluster = getCluster(); + final RaftServerImpl leader = waitForLeader(cluster); + final RaftPeerId leaderId = leader.getId(); + final long term = leader.getState().getCurrentTerm(); + + List<RaftServerImpl> followers = cluster.getFollowers(); + final RaftServerImpl followerToSendLog = followers.get(0); + for (int i = 1; i < NUM_SERVERS - 1; i++) { + RaftServerImpl follower = followers.get(i); + cluster.killServer(follower.getId()); + } + + SimpleMessage[] messages = SimpleMessage.create(1); + RaftTestUtil.sendMessageInNewThread(cluster, messages); + + Thread.sleep(cluster.getMaxTimeout() + 100); + RaftLog followerLog = followerToSendLog.getState().getLog(); + assertTrue(logEntriesContains(followerLog, messages)); + + LOG.info(String.format("killing old leader: %s", leaderId.toString())); + cluster.killServer(leaderId); + + for (int i = 1; i < 3; i++) { + RaftServerImpl follower = followers.get(i); + LOG.info(String.format("restarting follower: %s", follower.getId().toString())); + cluster.restartServer(follower.getId(), false ); + } + + Thread.sleep(cluster.getMaxTimeout() * 5); + // confirm the server with log is elected as new leader. + final RaftPeerId newLeaderId = waitForLeader(cluster).getId(); + Assert.assertEquals(followerToSendLog.getId(), newLeaderId); + + cluster.getServerAliveStream() + .map(s -> s.getState().getLog()) + .forEach(log -> RaftTestUtil.assertLogEntries(log, + log.getEntries(1, 2), 1, term, messages)); + LOG.info("terminating testOldLeaderCommit test"); + } + + @Test + public void testOldLeaderNotCommit() throws Exception { + LOG.info("Running testOldLeaderNotCommit"); + final MiniRaftCluster cluster = getCluster(); + final RaftPeerId leaderId = waitForLeader(cluster).getId(); + + List<RaftServerImpl> followers = cluster.getFollowers(); + final RaftServerImpl followerToCommit = followers.get(0); + for (int i = 1; i < NUM_SERVERS - 1; i++) { + RaftServerImpl follower = followers.get(i); + cluster.killServer(follower.getId()); + } + + SimpleMessage[] messages = SimpleMessage.create(1); + sendMessageInNewThread(cluster, messages); + + Thread.sleep(cluster.getMaxTimeout() + 100); + logEntriesContains(followerToCommit.getState().getLog(), messages); + + cluster.killServer(leaderId); + cluster.killServer(followerToCommit.getId()); + + for (int i = 1; i < NUM_SERVERS - 1; i++) { + RaftServerImpl follower = followers.get(i); + cluster.restartServer(follower.getId(), false ); + } + waitForLeader(cluster); + Thread.sleep(cluster.getMaxTimeout() + 100); + + final Predicate<LogEntryProto> predicate = l -> l.getTerm() != 1; + cluster.getServerAliveStream() + .map(s -> s.getState().getLog()) + .forEach(log -> RaftTestUtil.checkLogEntries(log, messages, predicate)); + } + + @Test public void testEnforceLeader() throws Exception { LOG.info("Running testEnforceLeader"); final String leader = "s" + ThreadLocalRandom.current().nextInt(NUM_SERVERS); @@ -161,7 +247,7 @@ public abstract class RaftBasicTests extends BaseTest { try(RaftClient client = getCluster().createClient()) { for (; step.get() < messages.length; ) { final RaftClientReply reply = client.send(messages[step.getAndIncrement()]); - Assert.assertTrue(reply.isSuccess()); + assertTrue(reply.isSuccess()); } } catch(Throwable t) { if (exceptionInClientThread.compareAndSet(null, t)) { @@ -244,7 +330,7 @@ public abstract class RaftBasicTests extends BaseTest { } final int n = clients.stream().mapToInt(c -> c.step.get()).sum(); - Assert.assertTrue(n >= lastStep.get()); + assertTrue(n >= lastStep.get()); if (n - lastStep.get() < 50 * numClients) { // Change leader at least 50 steps. Thread.sleep(10); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42273af4/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 927ad88..62545fe 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -17,6 +17,7 @@ */ package org.apache.ratis; +import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeerId; @@ -44,6 +45,8 @@ import java.util.Collection; import java.util.Objects; import java.util.function.BooleanSupplier; import java.util.function.IntSupplier; +import java.util.function.Predicate; + public interface RaftTestUtil { @@ -131,6 +134,25 @@ public interface RaftTestUtil { return idxExpected == expectedMessages.length; } + static void checkLogEntries(RaftLog log, SimpleMessage[] expectedMessages, + Predicate<LogEntryProto> predicate) { + TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE); + for (int i = 0; i < termIndices.length; i++) { + for (int j = 0; j < expectedMessages.length; j++) { + final LogEntryProto e; + try { + e = log.get(termIndices[i].getIndex()); + if (Arrays.equals(expectedMessages[j].getContent().toByteArray(), + e.getSmLogEntry().getData().toByteArray())) { + Assert.assertTrue(predicate.test(e)); + } + } catch (IOException exception) { + exception.printStackTrace(); + } + } + } + } + static void assertLogEntries(Collection<RaftServerProxy> servers, SimpleMessage... expectedMessages) { final int size = servers.size(); @@ -146,7 +168,7 @@ public interface RaftTestUtil { } static void assertLogEntries(RaftLog log, TermIndex[] entries, - long startIndex, long expertedTerm, SimpleMessage... expectedMessages) { + long startIndex, long expectedTerm, SimpleMessage... expectedMessages) { Assert.assertEquals(expectedMessages.length, entries.length); for(int i = 0; i < entries.length; i++) { final LogEntryProto e; @@ -155,7 +177,7 @@ public interface RaftTestUtil { } catch (IOException exception) { throw new RuntimeException(exception); } - Assert.assertEquals(expertedTerm, e.getTerm()); + Assert.assertEquals(expectedTerm, e.getTerm()); Assert.assertEquals(startIndex + i, e.getIndex()); Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(), e.getSmLogEntry().getData().toByteArray()); @@ -298,4 +320,17 @@ public interface RaftTestUtil { Thread.sleep(3 * maxTimeout); } + + static void sendMessageInNewThread(MiniRaftCluster cluster, SimpleMessage... messages) { + RaftPeerId leaderId = cluster.getLeader().getId(); + new Thread(() -> { + try (final RaftClient client = cluster.createClient(leaderId)) { + for (SimpleMessage mssg: messages) { + client.send(mssg); + } + } catch (Exception e) { + e.printStackTrace(); + } + }).start(); + } }
