Repository: incubator-ratis Updated Branches: refs/heads/master ce783995f -> 8ef7b4852
RATIS-361. Fix the NPE bug in MemoryRaftLog. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8ef7b485 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8ef7b485 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8ef7b485 Branch: refs/heads/master Commit: 8ef7b4852163456eb5db6c5bbb8440d9c3cb36b0 Parents: ce78399 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Mon Oct 22 22:44:34 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Mon Oct 22 22:44:34 2018 +0800 ---------------------------------------------------------------------- .../ratis/server/storage/MemoryRaftLog.java | 68 +++++++++++++------- .../ratis/statemachine/TestStateMachine.java | 14 ++-- 2 files changed, 53 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ef7b485/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java index 5a09aef..2661ba8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java @@ -30,12 +30,40 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; /** * A simple RaftLog implementation in memory. Used only for testing. */ public class MemoryRaftLog extends RaftLog { - private final List<LogEntryProto> entries = new ArrayList<>(); + static class EntryList { + private final List<LogEntryProto> entries = new ArrayList<>(); + + LogEntryProto get(int i) { + return i >= 0 && i < entries.size() ? entries.get(i) : null; + } + + TermIndex getTermIndex(int i) { + return ServerProtoUtils.toTermIndex(get(i)); + } + + int size() { + return entries.size(); + } + + void truncate(int index) { + if (entries.size() > index) { + entries.subList(index, entries.size()).clear(); + } + } + + void add(LogEntryProto entry) { + entries.add(entry); + } + } + + private final EntryList entries = new EntryList(); + private final AtomicReference<Metadata> metadata = new AtomicReference<>(new Metadata(null, 0)); public MemoryRaftLog(RaftPeerId selfId, int maxBufferSize) { super(selfId, maxBufferSize); @@ -45,8 +73,7 @@ public class MemoryRaftLog extends RaftLog { public LogEntryProto get(long index) { checkLogState(); try(AutoCloseableLock readLock = readLock()) { - final int i = (int) index; - return i >= 0 && i < entries.size() ? entries.get(i) : null; + return entries.get(Math.toIntExact(index)); } } @@ -59,9 +86,7 @@ public class MemoryRaftLog extends RaftLog { public TermIndex getTermIndex(long index) { checkLogState(); try(AutoCloseableLock readLock = readLock()) { - final int i = (int) index; - return i >= 0 && i < entries.size() ? - ServerProtoUtils.toTermIndex(entries.get(i)) : null; + return entries.getTermIndex(Math.toIntExact(index)); } } @@ -69,15 +94,14 @@ public class MemoryRaftLog extends RaftLog { public TermIndex[] getEntries(long startIndex, long endIndex) { checkLogState(); try(AutoCloseableLock readLock = readLock()) { - final int from = (int) startIndex; if (startIndex >= entries.size()) { return null; } - final int to = (int) Math.min(entries.size(), endIndex); + final int from = Math.toIntExact(startIndex); + final int to = Math.toIntExact(Math.min(entries.size(), endIndex)); TermIndex[] ti = new TermIndex[to - from]; for (int i = 0; i < ti.length; i++) { - ti[i] = TermIndex.newTermIndex(entries.get(i).getTerm(), - entries.get(i).getIndex()); + ti[i] = entries.getTermIndex(i); } return ti; } @@ -88,10 +112,7 @@ public class MemoryRaftLog extends RaftLog { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { Preconditions.assertTrue(index >= 0); - final int truncateIndex = (int) index; - for (int i = entries.size() - 1; i >= truncateIndex; i--) { - entries.remove(i); - } + entries.truncate(Math.toIntExact(index)); } return CompletableFuture.completedFuture(index); } @@ -100,8 +121,7 @@ public class MemoryRaftLog extends RaftLog { public TermIndex getLastEntryTermIndex() { checkLogState(); try(AutoCloseableLock readLock = readLock()) { - final int size = entries.size(); - return size == 0 ? null : ServerProtoUtils.toTermIndex(entries.get(size - 1)); + return entries.getTermIndex(entries.size() - 1); } } @@ -120,8 +140,7 @@ public class MemoryRaftLog extends RaftLog { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { final long nextIndex = getNextIndex(); - final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term, - nextIndex); + final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term, nextIndex); entries.add(e); return nextIndex; } @@ -129,17 +148,16 @@ public class MemoryRaftLog extends RaftLog { @Override public long getStartIndex() { - return entries.isEmpty() ? RaftServerConstants.INVALID_LOG_INDEX : - entries.get(0).getIndex(); + return entries.size() == 0? RaftServerConstants.INVALID_LOG_INDEX: entries.getTermIndex(0).getIndex(); } @Override public List<CompletableFuture<Long>> append(LogEntryProto... entries) { checkLogState(); + if (entries == null || entries.length == 0) { + return Collections.emptyList(); + } try(AutoCloseableLock writeLock = writeLock()) { - if (entries == null || entries.length == 0) { - return Collections.emptyList(); - } // Before truncating the entries, we first need to check if some // entries are duplicated. If the leader sends entry 6, entry 7, then // entry 6 again, without this check the follower may truncate entry 7 @@ -191,12 +209,12 @@ public class MemoryRaftLog extends RaftLog { @Override public void writeMetadata(long term, RaftPeerId votedFor) { - // do nothing + metadata.set(new Metadata(votedFor, term)); } @Override public Metadata loadMetadata() { - return new Metadata(null, 0); + return metadata.get(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ef7b485/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java index 4bb3cb6..329b02b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java @@ -116,12 +116,18 @@ public class TestStateMachine extends BaseTest implements MiniRaftClusterWithSim @Test public void testTransactionContextIsPassedBack() throws Throwable { + runTestTransactionContextIsPassedBack(false); + } + + @Test + public void testTransactionContextIsPassedBackUseMemory() throws Throwable { + runTestTransactionContextIsPassedBack(true); + } + + void runTestTransactionContextIsPassedBack(boolean useMemory) throws Throwable { final RaftProperties properties = new RaftProperties(); properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SMTransactionContext.class, StateMachine.class); - - // TODO: fix and run with in-memory log. It fails with NPE - // TODO: if change setUseMemory to true - RaftServerConfigKeys.Log.setUseMemory(properties, false); + RaftServerConfigKeys.Log.setUseMemory(properties, useMemory); try(MiniRaftClusterWithSimulatedRpc cluster = getFactory().newCluster(NUM_SERVERS, properties)) { cluster.start();
