Repository: incubator-ratis
Updated Branches:
  refs/heads/master 3b9d50ded -> d16c7dd25
RATIS-321. RaftLog should validate appendEntries. Contributed by Lokesh Jain.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/d16c7dd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/d16c7dd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/d16c7dd2

Branch: refs/heads/master
Commit: d16c7dd2571fab11427c0b1ab7112363befe7f92
Parents: 3b9d50d
Author: Lokesh Jain <[email protected]>
Authored: Mon Sep 24 11:54:33 2018 +0530
Committer: Lokesh Jain <[email protected]>
Committed: Mon Sep 24 11:54:33 2018 +0530

----------------------------------------------------------------------
 .../ratis/server/storage/MemoryRaftLog.java | 1 +
 .../apache/ratis/server/storage/RaftLog.java | 13 ++++++
 .../ratis/server/storage/SegmentedRaftLog.java | 1 +
 .../ratis/server/storage/TestCacheEviction.java | 4 +-
 .../server/storage/TestSegmentedRaftLog.java | 43 ++++++++++++++++----
 5 files changed, 51 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d16c7dd2/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index 71bed5c..07c73ec 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -110,6 +110,7 @@ public class MemoryRaftLog extends RaftLog {
   CompletableFuture<Long> appendEntry(LogEntryProto entry) {
     checkLogState();
     try(AutoCloseableLock writeLock = writeLock()) {
+      validateLogEntry(entry);
       entries.add(entry);
     }
     return CompletableFuture.completedFuture(entry.getIndex());
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d16c7dd2/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index 30d5c8f..ff38879 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -227,6 +227,19 @@ public abstract class RaftLog implements Closeable { public abstract TermIndex getLastEntryTermIndex(); /** + * Validate the term and index of entry w.r.t RaftLog + */ + public void validateLogEntry(LogEntryProto entry) { + TermIndex lastTermIndex = getLastEntryTermIndex(); + if (lastTermIndex != null) { + Preconditions.assertTrue(entry.getTerm() >= lastTermIndex.getTerm(), + "Entry term less than RaftLog's last term: %d, entry: %s", lastTermIndex.getTerm(), entry); + Preconditions.assertTrue(entry.getIndex() == lastTermIndex.getIndex() + 1, + "Difference between entry index and RaftLog's last index %d greater than 1, entry: %s", lastTermIndex.getIndex(), entry); + } + } + + /** * Truncate the log entries till the given index. The log with the given index * will also be truncated (i.e., inclusive). 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d16c7dd2/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index b7cf920..4d59bf3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -272,6 +272,7 @@ public class SegmentedRaftLog extends RaftLog { ServerProtoUtils.toLogEntryString(entry)); } try(AutoCloseableLock writeLock = writeLock()) { + validateLogEntry(entry); final LogSegment currentOpenSegment = cache.getOpenSegment(); if (currentOpenSegment == null) { cache.addOpenSegment(entry.getIndex()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d16c7dd2/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java index 124f7b8..a8f5fab 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java @@ -167,7 +167,7 @@ public class TestCacheEviction extends BaseTest { SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, storage, -1, prop); raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(maxCachedNum, 7, 0); + List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(0, maxCachedNum, 7, 0); LogEntryProto[] entries = generateEntries(slist); 
raftLog.append(entries).forEach(CompletableFuture::join); @@ -177,7 +177,7 @@ public class TestCacheEviction extends BaseTest { Mockito.when(server.getFollowerNextIndices()).thenReturn(new long[]{21, 40, 40}); Mockito.when(state.getLastAppliedIndex()).thenReturn(35L); - slist = TestSegmentedRaftLog.prepareRanges(2, 7, 7 * maxCachedNum); + slist = TestSegmentedRaftLog.prepareRanges(maxCachedNum, maxCachedNum + 2, 7, 7 * maxCachedNum); entries = generateEntries(slist); raftLog.append(entries).forEach(CompletableFuture::join); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d16c7dd2/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index 7b26733..890f31b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -127,12 +127,12 @@ public class TestSegmentedRaftLog extends BaseTest { return entryList.toArray(new LogEntryProto[entryList.size()]); } - static List<SegmentRange> prepareRanges(int number, int segmentSize, + static List<SegmentRange> prepareRanges(int startTerm, int endTerm, int segmentSize, long startIndex) { - List<SegmentRange> list = new ArrayList<>(number); - for (int i = 0; i < number; i++) { + List<SegmentRange> list = new ArrayList<>(endTerm - startTerm); + for (int i = startTerm; i < endTerm; i++) { list.add(new SegmentRange(startIndex, startIndex + segmentSize - 1, i, - i == number - 1)); + i == endTerm - 1)); startIndex += segmentSize; } return list; @@ -146,7 +146,7 @@ public class TestSegmentedRaftLog extends BaseTest { @Test public void testLoadLogSegments() throws Exception { // first generate log files - 
List<SegmentRange> ranges = prepareRanges(5, 100, 0); + List<SegmentRange> ranges = prepareRanges(0, 5, 100, 0); LogEntryProto[] entries = prepareLog(ranges); // create RaftLog object and load log file @@ -194,7 +194,7 @@ public class TestSegmentedRaftLog extends BaseTest { */ @Test public void testAppendEntry() throws Exception { - List<SegmentRange> ranges = prepareRanges(5, 200, 0); + List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0); List<LogEntryProto> entries = prepareLogEntries(ranges, null); try (SegmentedRaftLog raftLog = @@ -210,6 +210,31 @@ public class TestSegmentedRaftLog extends BaseTest { // check if the raft log is correct checkEntries(raftLog, entries, 0, entries.size()); } + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + TermIndex lastTermIndex = raftLog.getLastEntryTermIndex(); + IllegalStateException ex = null; + try { + // append entry fails if append entry term is lower than log's last entry term + raftLog.appendEntry(LogEntryProto.newBuilder(entries.get(0)) + .setTerm(lastTermIndex.getTerm() - 1) + .setIndex(lastTermIndex.getIndex() + 1).build()); + } catch (IllegalStateException e) { + ex = e; + } + Assert.assertTrue(ex.getMessage().contains("term less than RaftLog's last term")); + try { + // append entry fails if difference between append entry index and log's last entry index is greater than 1 + raftLog.appendEntry(LogEntryProto.newBuilder(entries.get(0)) + .setTerm(lastTermIndex.getTerm()) + .setIndex(lastTermIndex.getIndex() + 2).build()); + } catch (IllegalStateException e) { + ex = e; + } + Assert.assertTrue(ex.getMessage().contains("and RaftLog's last index " + lastTermIndex.getIndex() + " greater than 1")); + } } /** @@ -220,7 +245,7 @@ public class TestSegmentedRaftLog extends BaseTest { RaftServerConfigKeys.Log.setPreallocatedSize(properties, SizeInBytes.valueOf("16KB")); 
RaftServerConfigKeys.Log.setSegmentSizeMax(properties, SizeInBytes.valueOf("128KB")); - List<SegmentRange> ranges = prepareRanges(1, 1024, 0); + List<SegmentRange> ranges = prepareRanges(0, 1, 1024, 0); final byte[] content = new byte[1024]; List<LogEntryProto> entries = prepareLogEntries(ranges, () -> new String(content)); @@ -244,7 +269,7 @@ public class TestSegmentedRaftLog extends BaseTest { @Test public void testTruncate() throws Exception { // prepare the log for truncation - List<SegmentRange> ranges = prepareRanges(5, 200, 0); + List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0); List<LogEntryProto> entries = prepareLogEntries(ranges, null); try (SegmentedRaftLog raftLog = @@ -326,7 +351,7 @@ public class TestSegmentedRaftLog extends BaseTest { @Test public void testAppendEntriesWithInconsistency() throws Exception { // prepare the log for truncation - List<SegmentRange> ranges = prepareRanges(5, 200, 0); + List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0); List<LogEntryProto> entries = prepareLogEntries(ranges, null); RaftServerImpl server = mock(RaftServerImpl.class);
