http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java new file mode 100644 index 0000000..a088578 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import java.io.IOException; + +import org.apache.log4j.Level; +import org.apache.ratis.RetryCacheTests; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.util.LogUtils; + +public class TestRetryCacheWithSimulatedRpc extends RetryCacheTests { + static { + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + + private final MiniRaftClusterWithSimulatedRpc cluster; + + public TestRetryCacheWithSimulatedRpc() throws IOException { + cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster( + NUM_SERVERS, getProperties()); + } + + @Override + public MiniRaftClusterWithSimulatedRpc getCluster() { + return cluster; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerInformationWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerInformationWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerInformationWithSimulatedRpc.java new file mode 100644 index 0000000..f7025a5 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerInformationWithSimulatedRpc.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.server.impl.ServerInformationBaseTest; + +public class TestServerInformationWithSimulatedRpc + extends ServerInformationBaseTest<MiniRaftClusterWithSimulatedRpc> + implements MiniRaftClusterWithSimulatedRpc.FactoryGet { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java new file mode 100644 index 0000000..306e5e7 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.server.ServerRestartTests; + +public class TestServerRestartWithSimulatedRpc + extends ServerRestartTests<MiniRaftClusterWithSimulatedRpc> + implements MiniRaftClusterWithSimulatedRpc.FactoryGet { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java new file mode 100644 index 0000000..1cd41a5 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil.SimpleOperation; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerProtoUtils; +import org.apache.ratis.server.impl.ServerState; +import org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault; +import org.apache.ratis.server.storage.TestSegmentedRaftLog.SegmentRange; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.SizeInBytes; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class TestCacheEviction extends BaseTest { + private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault(); + + private List<LogSegment> prepareSegments(int numSegments, boolean[] cached, long start, long size) { + Assert.assertEquals(numSegments, cached.length); + List<LogSegment> segments = new ArrayList<>(numSegments); + for (int i = 0; i < numSegments; i++) { + LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1); + if (cached[i]) { + s = Mockito.spy(s); + Mockito.when(s.hasCache()).thenReturn(true); + } + segments.add(s); + start += size; + } + return segments; + } + + @Test + public void testBasicEviction() throws Exception { + final int maxCached = 5; + List<LogSegment> segments = prepareSegments(5, + new boolean[]{true, true, true, true, true}, 0, 10); + + // case 1, make sure we do not evict cache for segments behind local flushed index + List<LogSegment> evicted = policy.evict(null, 5, 15, segments, maxCached); + Assert.assertEquals(0, evicted.size()); + + // case 2, suppose the local flushed index is in the 3rd segment, then we + // can evict the first two segment + evicted = policy.evict(null, 25, 30, segments, maxCached); + Assert.assertEquals(2, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(0)); + Assert.assertSame(evicted.get(1), segments.get(1)); + + // case 3, similar with case 2, but the local applied index is less than + // the local flushed index. + evicted = policy.evict(null, 25, 15, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(0)); + + // case 4, the local applied index is very small, then evict cache behind it + // first and let the state machine load the segments later + evicted = policy.evict(null, 35, 5, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(2)); + + Mockito.when(segments.get(2).hasCache()).thenReturn(false); + evicted = policy.evict(null, 35, 5, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(1)); + + Mockito.when(segments.get(1).hasCache()).thenReturn(false); + evicted = policy.evict(null, 35, 5, segments, maxCached); + Assert.assertEquals(0, evicted.size()); + } + + @Test + public void testEvictionWithFollowerIndices() throws Exception { + final int maxCached = 6; + List<LogSegment> segments = prepareSegments(6, + new boolean[]{true, true, true, true, true, true}, 0, 10); + + // case 1, no matter where the followers are, we do not evict segments behind local + // flushed index + List<LogSegment> evicted = policy.evict(new long[]{20, 40, 40}, 5, 15, segments, + maxCached); + Assert.assertEquals(0, evicted.size()); + + // case 2, the follower indices are behind the local flushed index + evicted = policy.evict(new long[]{30, 40, 45}, 25, 30, segments, maxCached); + Assert.assertEquals(2, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(0)); + Assert.assertSame(evicted.get(1), segments.get(1)); + + // case 3, similar with case 3 in basic eviction test + evicted = policy.evict(new long[]{30, 40, 45}, 25, 15, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(0)); + + // case 4, the followers are slower than local flush + evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(0)); + + Mockito.when(segments.get(0).hasCache()).thenReturn(false); + evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(2)); + + Mockito.when(segments.get(2).hasCache()).thenReturn(false); + evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached); + Assert.assertEquals(1, evicted.size()); + Assert.assertSame(evicted.get(0), segments.get(3)); + + Mockito.when(segments.get(3).hasCache()).thenReturn(false); + evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached); + Assert.assertEquals(0, evicted.size()); + } + + @Test + public void testEvictionInSegmentedLog() throws Exception { + final RaftProperties prop = new RaftProperties(); + prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + SimpleStateMachine4Testing.class, StateMachine.class); + RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB")); + RaftServerConfigKeys.Log.setPreallocatedSize(prop, SizeInBytes.valueOf("8KB")); + final RaftPeerId peerId = RaftPeerId.valueOf("s0"); + final int maxCachedNum = RaftServerConfigKeys.Log.maxCachedSegmentNum(prop); + + File storageDir = getTestDir(); + RaftServerConfigKeys.setStorageDirs(prop, Collections.singletonList(storageDir)); + RaftStorage storage = new RaftStorage(storageDir, RaftServerConstants.StartupOption.REGULAR); + + RaftServerImpl server = Mockito.mock(RaftServerImpl.class); + ServerState state = Mockito.mock(ServerState.class); + Mockito.when(server.getState()).thenReturn(state); + Mockito.when(server.getFollowerNextIndices()).thenReturn(new long[]{}); + Mockito.when(state.getLastAppliedIndex()).thenReturn(0L); + + SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, storage, -1, prop); + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(0, maxCachedNum, 7, 0); + LogEntryProto[] entries = generateEntries(slist); + raftLog.append(entries).forEach(CompletableFuture::join); + + // check the current cached segment number: the last segment is still open + Assert.assertEquals(maxCachedNum - 1, + raftLog.getRaftLogCache().getCachedSegmentNum()); + + Mockito.when(server.getFollowerNextIndices()).thenReturn(new long[]{21, 40, 40}); + Mockito.when(state.getLastAppliedIndex()).thenReturn(35L); + slist = TestSegmentedRaftLog.prepareRanges(maxCachedNum, maxCachedNum + 2, 7, 7 * maxCachedNum); + entries = generateEntries(slist); + raftLog.append(entries).forEach(CompletableFuture::join); + + // check the cached segment number again. since the slowest follower is on + // index 21, the eviction should happen and evict 3 segments + Assert.assertEquals(maxCachedNum + 1 - 3, + raftLog.getRaftLogCache().getCachedSegmentNum()); + } + + private LogEntryProto[] generateEntries(List<SegmentRange> slist) { + List<LogEntryProto> eList = new ArrayList<>(); + for (SegmentRange range : slist) { + for (long index = range.start; index <= range.end; index++) { + SimpleOperation m = new SimpleOperation(new String(new byte[1024])); + eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, index)); + } + } + return eList.toArray(new LogEntryProto[eList.size()]); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java new file mode 100644 index 0000000..d3c216d --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.ratis.RaftTestUtil.SimpleOperation; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.impl.ServerProtoUtils; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestRaftLogCache { + private static final RaftProperties prop = new RaftProperties(); + + private RaftLogCache cache; + + @Before + public void setup() { + cache = new RaftLogCache(null, null, prop); + } + + private LogSegment prepareLogSegment(long start, long end, boolean isOpen) { + LogSegment s = LogSegment.newOpenSegment(null, start); + for (long i = start; i <= end; i++) { + SimpleOperation m = new SimpleOperation("m" + i); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); + s.appendToOpenSegment(entry); + } + if (!isOpen) { + s.close(); + } + return s; + } + + private void checkCache(long start, long end, int segmentSize) throws IOException { + Assert.assertEquals(start, cache.getStartIndex()); + Assert.assertEquals(end, cache.getEndIndex()); + + for (long index = start; index <= end; index++) { + LogEntryProto entry = cache.getSegment(index).getEntryWithoutLoading(index).getEntry(); + Assert.assertEquals(index, entry.getIndex()); + } + + long[] offsets = new long[]{start, start + 1, start + (end - start) / 2, + end - 1, end}; + for (long offset : offsets) { + checkCacheEntries(offset, (int) (end - offset + 1), end); + checkCacheEntries(offset, 1, end); + checkCacheEntries(offset, 20, end); + checkCacheEntries(offset, segmentSize, end); + checkCacheEntries(offset, segmentSize - 1, end); + } + } + + private void checkCacheEntries(long offset, int size, long end) { + TermIndex[] entries = cache.getTermIndices(offset, offset + size); + long realEnd = offset + size > end + 1 ? end + 1 : offset + size; + Assert.assertEquals(realEnd - offset, entries.length); + for (long i = offset; i < realEnd; i++) { + Assert.assertEquals(i, entries[(int) (i - offset)].getIndex()); + } + } + + @Test + public void testAddSegments() throws Exception { + LogSegment s1 = prepareLogSegment(1, 100, false); + cache.addSegment(s1); + checkCache(1, 100, 100); + + try { + LogSegment s = prepareLogSegment(102, 103, true); + cache.addSegment(s); + Assert.fail("should fail since there is gap between two segments"); + } catch (IllegalStateException ignored) { + } + + LogSegment s2 = prepareLogSegment(101, 200, true); + cache.addSegment(s2); + checkCache(1, 200, 100); + + try { + LogSegment s = prepareLogSegment(201, 202, true); + cache.addSegment(s); + Assert.fail("should fail since there is still an open segment in cache"); + } catch (IllegalStateException ignored) { + } + + cache.rollOpenSegment(false); + checkCache(1, 200, 100); + + try { + LogSegment s = prepareLogSegment(202, 203, true); + cache.addSegment(s); + Assert.fail("should fail since there is gap between two segments"); + } catch (IllegalStateException ignored) { + } + + LogSegment s3 = prepareLogSegment(201, 300, true); + cache.addSegment(s3); + Assert.assertNotNull(cache.getOpenSegment()); + checkCache(1, 300, 100); + + cache.rollOpenSegment(true); + Assert.assertNotNull(cache.getOpenSegment()); + checkCache(1, 300, 100); + } + + @Test + public void testAppendEntry() throws Exception { + LogSegment closedSegment = prepareLogSegment(0, 99, false); + cache.addSegment(closedSegment); + + final SimpleOperation m = new SimpleOperation("m"); + try { + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, 0); + cache.appendEntry(entry); + Assert.fail("the open segment is null"); + } catch (IllegalStateException ignored) { + } + + LogSegment openSegment = prepareLogSegment(100, 100, true); + cache.addSegment(openSegment); + for (long index = 101; index < 200; index++) { + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, index); + cache.appendEntry(entry); + } + + Assert.assertNotNull(cache.getOpenSegment()); + checkCache(0, 199, 100); + } + + @Test + public void testTruncate() throws Exception { + long start = 0; + for (int i = 0; i < 5; i++) { // 5 closed segments + LogSegment s = prepareLogSegment(start, start + 99, false); + cache.addSegment(s); + start += 100; + } + // add another open segment + LogSegment s = prepareLogSegment(start, start + 99, true); + cache.addSegment(s); + + long end = cache.getEndIndex(); + Assert.assertEquals(599, end); + int numOfSegments = 6; + // start truncation + for (int i = 0; i < 10; i++) { // truncate 10 times + // each time truncate 37 entries + end -= 37; + TruncationSegments ts = cache.truncate(end + 1); + checkCache(0, end, 100); + + // check TruncationSegments + int currentNum= (int) (end / 100 + 1); + if (currentNum < numOfSegments) { + Assert.assertEquals(1, ts.toDelete.length); + numOfSegments = currentNum; + } else { + Assert.assertEquals(0, ts.toDelete.length); + } + } + + // 230 entries remaining. truncate at the segment boundary + TruncationSegments ts = cache.truncate(200); + checkCache(0, 199, 100); + Assert.assertEquals(1, ts.toDelete.length); + Assert.assertEquals(200, ts.toDelete[0].startIndex); + Assert.assertEquals(229, ts.toDelete[0].endIndex); + Assert.assertEquals(0, ts.toDelete[0].targetLength); + Assert.assertFalse(ts.toDelete[0].isOpen); + Assert.assertNull(ts.toTruncate); + + // add another open segment and truncate it as a whole + LogSegment newOpen = prepareLogSegment(200, 249, true); + cache.addSegment(newOpen); + ts = cache.truncate(200); + checkCache(0, 199, 100); + Assert.assertEquals(1, ts.toDelete.length); + Assert.assertEquals(200, ts.toDelete[0].startIndex); + Assert.assertEquals(249, ts.toDelete[0].endIndex); + Assert.assertEquals(0, ts.toDelete[0].targetLength); + Assert.assertTrue(ts.toDelete[0].isOpen); + Assert.assertNull(ts.toTruncate); + + // add another open segment and truncate part of it + newOpen = prepareLogSegment(200, 249, true); + cache.addSegment(newOpen); + ts = cache.truncate(220); + checkCache(0, 219, 100); + Assert.assertNull(cache.getOpenSegment()); + Assert.assertEquals(0, ts.toDelete.length); + Assert.assertTrue(ts.toTruncate.isOpen); + Assert.assertEquals(219, ts.toTruncate.newEndIndex); + Assert.assertEquals(200, ts.toTruncate.startIndex); + Assert.assertEquals(249, ts.toTruncate.endIndex); + } + + private void testIterator(long startIndex) throws IOException { + Iterator<TermIndex> iterator = cache.iterator(startIndex); + TermIndex prev = null; + while (iterator.hasNext()) { + TermIndex termIndex = iterator.next(); + Assert.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex); + if (prev != null) { + Assert.assertEquals(prev.getIndex() + 1, termIndex.getIndex()); + } + prev = termIndex; + } + if (startIndex <= cache.getEndIndex()) { + Assert.assertNotNull(prev); + Assert.assertEquals(cache.getEndIndex(), prev.getIndex()); + } + } + + @Test + public void testIterator() throws Exception { + long start = 0; + for (int i = 0; i < 2; i++) { // 2 closed segments + LogSegment s = prepareLogSegment(start, start + 99, false); + cache.addSegment(s); + start += 100; + } + // add another open segment + LogSegment s = prepareLogSegment(start, start + 99, true); + cache.addSegment(s); + + for (long startIndex = 0; startIndex < 300; startIndex += 50) { + testIterator(startIndex); + } + testIterator(299); + + Iterator<TermIndex> iterator = cache.iterator(300); + Assert.assertFalse(iterator.hasNext()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java new file mode 100644 index 0000000..7d9fdf5 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.RaftTestUtil.SimpleOperation; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ChecksumException; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; +import org.apache.ratis.server.impl.ServerProtoUtils; +import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.util.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Test basic functionality of LogReader, LogInputStream, and LogOutputStream. + */ +public class TestRaftLogReadWrite extends BaseTest { + private File storageDir; + private long segmentMaxSize; + private long preallocatedSize; + private int bufferSize; + + @Before + public void setup() throws Exception { + storageDir = getTestDir(); + RaftProperties properties = new RaftProperties(); + RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(storageDir)); + this.segmentMaxSize = + RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); + this.preallocatedSize = + RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); + this.bufferSize = + RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); + } + + @After + public void tearDown() throws Exception { + if (storageDir != null) { + FileUtils.deleteFully(storageDir.getParentFile()); + } + } + + private LogEntryProto[] readLog(File file, long startIndex, long endIndex, + boolean isOpen) throws IOException { + List<LogEntryProto> list = new ArrayList<>(); + try (LogInputStream in = + new LogInputStream(file, startIndex, endIndex, isOpen)) { + LogEntryProto entry; + while ((entry = in.nextEntry()) != null) { + list.add(entry); + } + } + return list.toArray(new LogEntryProto[list.size()]); + } + + private long writeMessages(LogEntryProto[] entries, LogOutputStream out) + throws IOException { + long size = 0; + for (int i = 0; i < entries.length; i++) { + SimpleOperation m = new SimpleOperation("m" + i); + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); + final int s = entries[i].getSerializedSize(); + size += CodedOutputStream.computeUInt32SizeNoTag(s) + s + 4; + out.write(entries[i]); + } + return size; + } + + /** + * Test basic functionality: write several log entries, then read + */ + @Test + public void testReadWriteLog() throws IOException { + final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); + File openSegment = storage.getStorageDir().getOpenLogFile(0); + long size = SegmentedRaftLogFormat.getHeaderLength(); + + final LogEntryProto[] entries = new LogEntryProto[100]; + try (LogOutputStream out = + new LogOutputStream(openSegment, false, segmentMaxSize, + preallocatedSize, bufferSize)) { + size += writeMessages(entries, out); + } finally { + storage.close(); + } + + Assert.assertEquals(size, openSegment.length()); + + LogEntryProto[] readEntries = readLog(openSegment, 0, + RaftServerConstants.INVALID_LOG_INDEX, true); + Assert.assertArrayEquals(entries, readEntries); + } + + @Test + public void testAppendLog() throws IOException { + final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); + File openSegment = storage.getStorageDir().getOpenLogFile(0); + LogEntryProto[] entries = new LogEntryProto[200]; + try (LogOutputStream out = + new LogOutputStream(openSegment, false, segmentMaxSize, + preallocatedSize, bufferSize)) { + for (int i = 0; i < 100; i++) { + SimpleOperation m = new SimpleOperation("m" + i); + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); + out.write(entries[i]); + } + } + + try (LogOutputStream out = + new LogOutputStream(openSegment, true, segmentMaxSize, + preallocatedSize, bufferSize)) { + for (int i = 100; i < 200; i++) { + SimpleOperation m = new SimpleOperation("m" + i); + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); + out.write(entries[i]); + } + } + + LogEntryProto[] readEntries = readLog(openSegment, 0, + RaftServerConstants.INVALID_LOG_INDEX, true); + Assert.assertArrayEquals(entries, readEntries); + + storage.close(); + } + + /** + * Simulate the scenario that the peer is shutdown without truncating + * log segment file padding. Make sure the reader can correctly handle this. + */ + @Test + public void testReadWithPadding() throws IOException { + final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); + File openSegment = storage.getStorageDir().getOpenLogFile(0); + long size = SegmentedRaftLogFormat.getHeaderLength(); + + LogEntryProto[] entries = new LogEntryProto[100]; + LogOutputStream out = new LogOutputStream(openSegment, false, + segmentMaxSize, preallocatedSize, bufferSize); + size += writeMessages(entries, out); + out.flush(); + + // make sure the file contains padding + Assert.assertEquals( + RaftServerConfigKeys.Log.PREALLOCATED_SIZE_DEFAULT.getSize(), + openSegment.length()); + + // check if the reader can correctly read the log file + LogEntryProto[] readEntries = readLog(openSegment, 0, + RaftServerConstants.INVALID_LOG_INDEX, true); + Assert.assertArrayEquals(entries, readEntries); + + out.close(); + Assert.assertEquals(size, openSegment.length()); + } + + /** + * corrupt the padding by inserting non-zero bytes. Make sure the reader + * throws exception. + */ + @Test + public void testReadWithCorruptPadding() throws IOException { + final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); + File openSegment = storage.getStorageDir().getOpenLogFile(0); + + LogEntryProto[] entries = new LogEntryProto[10]; + LogOutputStream out = new LogOutputStream(openSegment, false, + 16 * 1024 * 1024, 4 * 1024 * 1024, bufferSize); + for (int i = 0; i < 10; i++) { + SimpleOperation m = new SimpleOperation("m" + i); + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); + out.write(entries[i]); + } + out.flush(); + + // make sure the file contains padding + Assert.assertEquals(4 * 1024 * 1024, openSegment.length()); + + try (FileOutputStream fout = new FileOutputStream(openSegment, true)) { + ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{-1, 1}); + fout.getChannel() + .write(byteBuffer, 16 * 1024 * 1024 - 10); + } + + List<LogEntryProto> list = new ArrayList<>(); + try (LogInputStream in = new LogInputStream(openSegment, 0, + RaftServerConstants.INVALID_LOG_INDEX, true)) { + LogEntryProto entry; + while ((entry = in.nextEntry()) != null) { + list.add(entry); + } + Assert.fail("should fail since we corrupt the padding"); + } catch (IOException e) { + boolean findVerifyTerminator = false; + for (StackTraceElement s : e.getStackTrace()) { + if (s.getMethodName().equals("verifyTerminator")) { + findVerifyTerminator = true; + break; + } + } + Assert.assertTrue(findVerifyTerminator); + } + Assert.assertArrayEquals(entries, + list.toArray(new LogEntryProto[list.size()])); + } + + /** + * Test the log reader to make sure it can detect the checksum mismatch. + */ + @Test + public void testReadWithEntryCorruption() throws IOException { + RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); + File openSegment = storage.getStorageDir().getOpenLogFile(0); + try (LogOutputStream out = + new LogOutputStream(openSegment, false, segmentMaxSize, + preallocatedSize, bufferSize)) { + for (int i = 0; i < 100; i++) { + LogEntryProto entry = ServerProtoUtils.toLogEntryProto( + new SimpleOperation("m" + i).getLogEntryContent(), 0, i); + out.write(entry); + } + } finally { + storage.close(); + } + + // corrupt the log file + try (RandomAccessFile raf = new RandomAccessFile(openSegment.getCanonicalFile(), + "rw")) { + raf.seek(100); + int correctValue = raf.read(); + raf.seek(100); + raf.write(correctValue + 1); + } + + try { + readLog(openSegment, 0, RaftServerConstants.INVALID_LOG_INDEX, true); + Assert.fail("The read of corrupted log file should fail"); + } catch (ChecksumException e) { + LOG.info("Caught ChecksumException as expected", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java new file mode 100644 index 0000000..270e279 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java @@ -0,0 +1,372 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.RaftTestUtil.SimpleOperation; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; +import org.apache.ratis.server.impl.ServerProtoUtils; +import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; +import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TraditionalBinaryPrefix; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; +import static org.apache.ratis.server.storage.LogSegment.getEntrySize; + +/** + * Test basic functionality of {@link LogSegment} + */ +public class TestRaftLogSegment extends BaseTest { + private File storageDir; + private long segmentMaxSize; + private long preallocatedSize; + private int bufferSize; + + @Before + public void setup() throws Exception { + RaftProperties properties = new RaftProperties(); + storageDir = getTestDir(); + RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(storageDir)); + this.segmentMaxSize = + RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); + this.preallocatedSize = + RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); + this.bufferSize = + RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); + } + + @After + public void tearDown() throws Exception { + if (storageDir != null) { + FileUtils.deleteFully(storageDir.getParentFile()); + } + } + + File prepareLog(boolean isOpen, long startIndex, int numEntries, long term, boolean isLastEntryPartiallyWritten) + throws IOException { + if (!isOpen) { + Preconditions.assertTrue(!isLastEntryPartiallyWritten, "For closed log, the last entry cannot be partially written."); + } + RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); + final File file = isOpen ? + storage.getStorageDir().getOpenLogFile(startIndex) : + storage.getStorageDir().getClosedLogFile(startIndex, startIndex + numEntries - 1); + + final LogEntryProto[] entries = new LogEntryProto[numEntries]; + try (LogOutputStream out = new LogOutputStream(file, false, + segmentMaxSize, preallocatedSize, bufferSize)) { + for (int i = 0; i < entries.length; i++) { + SimpleOperation op = new SimpleOperation("m" + i); + entries[i] = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i + startIndex); + out.write(entries[i]); + } + } + + if (isLastEntryPartiallyWritten) { + final int entrySize = size(entries[entries.length - 1]); + final int truncatedEntrySize = ThreadLocalRandom.current().nextInt(entrySize - 1) + 1; + // 0 < truncatedEntrySize < entrySize + final long fileLength = file.length(); + final long truncatedFileLength = fileLength - (entrySize - truncatedEntrySize); + LOG.info("truncate last entry: entry(size={}, truncated={}), file(length={}, truncated={})", + entrySize, truncatedEntrySize, fileLength, truncatedFileLength); + FileUtils.truncateFile(file, truncatedFileLength); + } + + storage.close(); + return file; + } + + static int size(LogEntryProto entry) { + final int n = entry.getSerializedSize(); + return CodedOutputStream.computeUInt32SizeNoTag(n) + n + 4; + } + + static void checkLogSegment(LogSegment segment, long start, long end, + boolean isOpen, long totalSize, long term) throws Exception { + Assert.assertEquals(start, segment.getStartIndex()); + Assert.assertEquals(end, segment.getEndIndex()); + Assert.assertEquals(isOpen, segment.isOpen()); + Assert.assertEquals(totalSize, segment.getTotalSize()); + + long offset = SegmentedRaftLogFormat.getHeaderLength(); + for (long i = start; i <= end; i++) { + LogSegment.LogRecord record = segment.getLogRecord(i); + LogRecordWithEntry lre = segment.getEntryWithoutLoading(i); + Assert.assertEquals(i, lre.getRecord().getTermIndex().getIndex()); + Assert.assertEquals(term, lre.getRecord().getTermIndex().getTerm()); + Assert.assertEquals(offset, record.getOffset()); + + LogEntryProto entry = lre.hasEntry() ? + lre.getEntry() : segment.loadCache(lre.getRecord()); + offset += getEntrySize(entry); + } + } + + @Test + public void testLoadLogSegment() throws Exception { + testLoadSegment(true, false); + } + + @Test + public void testLoadLogSegmentLastEntryPartiallyWritten() throws Exception { + testLoadSegment(true, true); + } + + @Test + public void testLoadCache() throws Exception { + testLoadSegment(false, false); + } + + @Test + public void testLoadCacheLastEntryPartiallyWritten() throws Exception { + testLoadSegment(false, true); + } + + private void testLoadSegment(boolean loadInitial, boolean isLastEntryPartiallyWritten) throws Exception { + // load an open segment + final File openSegmentFile = prepareLog(true, 0, 100, 0, isLastEntryPartiallyWritten); + RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); + LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 0, + INVALID_LOG_INDEX, true, loadInitial, null); + final int delta = isLastEntryPartiallyWritten? 1: 0; + checkLogSegment(openSegment, 0, 99 - delta, true, openSegmentFile.length(), 0); + storage.close(); + // for open segment we currently always keep log entries in the memory + Assert.assertEquals(0, openSegment.getLoadingTimes()); + + // load a closed segment (1000-1099) + final File closedSegmentFile = prepareLog(false, 1000, 100, 1, false); + LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile, + 1000, 1099, false, loadInitial, null); + checkLogSegment(closedSegment, 1000, 1099, false, + closedSegment.getTotalSize(), 1); + Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes()); + } + + @Test + public void testAppendEntries() throws Exception { + final long start = 1000; + LogSegment segment = LogSegment.newOpenSegment(null, start); + long size = SegmentedRaftLogFormat.getHeaderLength(); + final long max = 8 * 1024 * 1024; + checkLogSegment(segment, start, start - 1, true, size, 0); + + // append till full + long term = 0; + int i = 0; + List<LogEntryProto> list = new ArrayList<>(); + while (size < max) { + SimpleOperation op = new SimpleOperation("m" + i); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start); + size += getEntrySize(entry); + list.add(entry); + } + + segment.appendToOpenSegment(list.toArray(new LogEntryProto[list.size()])); + Assert.assertTrue(segment.getTotalSize() >= max); + checkLogSegment(segment, start, i - 1 + start, true, size, term); + } + + @Test + public void testAppendWithGap() throws Exception { + LogSegment segment = LogSegment.newOpenSegment(null, 1000); + SimpleOperation op = new SimpleOperation("m"); + final StateMachineLogEntryProto m = op.getLogEntryContent(); + try { + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1001); + segment.appendToOpenSegment(entry); + Assert.fail("should fail since the entry's index needs to be 1000"); + } catch (IllegalStateException e) { + // the exception is expected. + } + + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1000); + segment.appendToOpenSegment(entry); + + try { + entry = ServerProtoUtils.toLogEntryProto(m, 0, 1002); + segment.appendToOpenSegment(entry); + Assert.fail("should fail since the entry's index needs to be 1001"); + } catch (IllegalStateException e) { + // the exception is expected. + } + + LogEntryProto[] entries = new LogEntryProto[2]; + for (int i = 0; i < 2; i++) { + entries[i] = ServerProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2); + } + try { + segment.appendToOpenSegment(entries); + Assert.fail("should fail since there is gap between entries"); + } catch (IllegalStateException e) { + // the exception is expected. + } + } + + @Test + public void testTruncate() throws Exception { + final long term = 1; + final long start = 1000; + LogSegment segment = LogSegment.newOpenSegment(null, start); + for (int i = 0; i < 100; i++) { + LogEntryProto entry = ServerProtoUtils.toLogEntryProto( + new SimpleOperation("m" + i).getLogEntryContent(), term, i + start); + segment.appendToOpenSegment(entry); + } + + // truncate an open segment (remove 1080~1099) + long newSize = segment.getLogRecord(start + 80).getOffset(); + segment.truncate(start + 80); + Assert.assertEquals(80, segment.numOfEntries()); + checkLogSegment(segment, start, start + 79, false, newSize, term); + + // truncate a closed segment (remove 1050~1079) + newSize = segment.getLogRecord(start + 50).getOffset(); + segment.truncate(start + 50); + Assert.assertEquals(50, segment.numOfEntries()); + checkLogSegment(segment, start, start + 49, false, newSize, term); + + // truncate all the remaining entries + segment.truncate(start); + Assert.assertEquals(0, segment.numOfEntries()); + checkLogSegment(segment, start, start - 1, false, + SegmentedRaftLogFormat.getHeaderLength(), term); + } + + @Test + public void testPreallocateSegment() throws Exception { + RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); + final File file = storage.getStorageDir().getOpenLogFile(0); + final int[] maxSizes = new int[]{1024, 1025, 1024 * 1024 - 1, 1024 * 1024, + 1024 * 1024 + 1, 2 * 1024 * 1024 - 1, 2 * 1024 * 1024, + 2 * 1024 * 1024 + 1, 8 * 1024 * 1024}; + final int[] preallocated = new int[]{512, 1024, 1025, 1024 * 1024, + 1024 * 1024 + 1, 2 * 1024 * 1024}; + + // make sure preallocation is correct with different max/pre-allocated size + for (int max : maxSizes) { + for (int a : preallocated) { + try (LogOutputStream ignored = + new LogOutputStream(file, false, max, a, bufferSize)) { + Assert.assertEquals("max=" + max + ", a=" + a, file.length(), Math.min(max, a)); + } + try (LogInputStream in = + new LogInputStream(file, 0, INVALID_LOG_INDEX, true)) { + LogEntryProto entry = in.nextEntry(); + Assert.assertNull(entry); + } + } + } + + // test the scenario where an entry's size is larger than the max size + final byte[] content = new byte[1024 * 2]; + Arrays.fill(content, (byte) 1); + final long size; + try (LogOutputStream out = new LogOutputStream(file, false, + 1024, 1024, bufferSize)) { + SimpleOperation op = new SimpleOperation(new String(content)); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0); + size = LogSegment.getEntrySize(entry); + out.write(entry); + } + Assert.assertEquals(file.length(), + size + SegmentedRaftLogFormat.getHeaderLength()); + try (LogInputStream in = new LogInputStream(file, 0, + INVALID_LOG_INDEX, true)) { + LogEntryProto entry = in.nextEntry(); + Assert.assertArrayEquals(content, + entry.getStateMachineLogEntry().getLogData().toByteArray()); + Assert.assertNull(in.nextEntry()); + } + } + + /** + * Keep appending and check if pre-allocation is correct + */ + @Test + public void testPreallocationAndAppend() throws Exception { + final SizeInBytes max = SizeInBytes.valueOf(2, TraditionalBinaryPrefix.MEGA); + RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); + final File file = storage.getStorageDir().getOpenLogFile(0); + + final byte[] content = new byte[1024]; + Arrays.fill(content, (byte) 1); + SimpleOperation op = new SimpleOperation(new String(content)); + LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0); + final long entrySize = LogSegment.getEntrySize(entry); + + long totalSize = SegmentedRaftLogFormat.getHeaderLength(); + long preallocated = 16 * 1024; + try (LogOutputStream out = new LogOutputStream(file, false, + max.getSize(), 16 * 1024, 10 * 1024)) { + Assert.assertEquals(preallocated, file.length()); + while (totalSize + entrySize < max.getSize()) { + totalSize += entrySize; + out.write(entry); + if (totalSize > preallocated) { + Assert.assertEquals("totalSize==" + totalSize, + preallocated + 16 * 1024, file.length()); + preallocated += 16 * 1024; + } + } + } + + Assert.assertEquals(totalSize, file.length()); + } + + @Test + public void testZeroSizeInProgressFile() throws Exception { + final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); + final File file = storage.getStorageDir().getOpenLogFile(0); + storage.close(); + + // create zero size in-progress file + LOG.info("file: " + file); + Assert.assertTrue(file.createNewFile()); + final Path path = file.toPath(); + Assert.assertTrue(Files.exists(path)); + Assert.assertEquals(0, Files.size(path)); + + // getLogSegmentFiles should remove it. + final List<RaftStorageDirectory.LogPathAndIndex> logs = storage.getStorageDir().getLogSegmentFiles(); + Assert.assertEquals(0, logs.size()); + Assert.assertFalse(Files.exists(path)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java new file mode 100644 index 0000000..4a26f8c --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.server.impl.RaftServerConstants.StartupOption; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; +import org.apache.ratis.util.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Test RaftStorage and RaftStorageDirectory + */ +public class TestRaftStorage extends BaseTest { + private File storageDir; + + @Before + public void setup() throws Exception { + storageDir = getTestDir(); + } + + @After + public void tearDown() throws Exception { + if (storageDir != null) { + FileUtils.deleteFully(storageDir.getParentFile()); + } + } + + @Test + public void testNotExistent() throws IOException { + FileUtils.deleteFully(storageDir); + + // we will format the empty directory + RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); + Assert.assertEquals(StorageState.NORMAL, storage.getState()); + + try { + new RaftStorage(storageDir, StartupOption.FORMAT).close(); + Assert.fail("the format should fail since the storage is still locked"); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("directory is already locked")); + } + + storage.close(); + FileUtils.deleteFully(storageDir); + Assert.assertTrue(storageDir.createNewFile()); + try { + new RaftStorage(storageDir, StartupOption.REGULAR); + Assert.fail(); + } catch (IOException e) { + Assert.assertTrue( + e.getMessage().contains(StorageState.NON_EXISTENT.name())); + } + } + + /** + * make sure the RaftStorage format works + */ + @Test + public void testStorage() throws Exception { + RaftStorageDirectory sd = new RaftStorageDirectory(storageDir); + try { + StorageState state = sd.analyzeStorage(true); + Assert.assertEquals(StorageState.NOT_FORMATTED, state); + Assert.assertTrue(sd.isCurrentEmpty()); + } finally { + sd.unlock(); + } + + RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); + Assert.assertEquals(StorageState.NORMAL, storage.getState()); + storage.close(); + + Assert.assertEquals(StorageState.NORMAL, sd.analyzeStorage(false)); + File m = sd.getMetaFile(); + Assert.assertTrue(m.exists()); + MetaFile metaFile = new MetaFile(m); + Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm()); + Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor()); + + metaFile.set(123, "peer1"); + metaFile.readFile(); + Assert.assertEquals(123, metaFile.getTerm()); + Assert.assertEquals("peer1", metaFile.getVotedFor()); + + MetaFile metaFile2 = new MetaFile(m); + Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded")); + Assert.assertEquals(123, metaFile.getTerm()); + Assert.assertEquals("peer1", metaFile.getVotedFor()); + + // test format + storage = new RaftStorage(storageDir, StartupOption.FORMAT); + Assert.assertEquals(StorageState.NORMAL, storage.getState()); + metaFile = new MetaFile(sd.getMetaFile()); + Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm()); + Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor()); + storage.close(); + } + + @Test + public void testMetaFile() throws Exception { + RaftStorage storage = new RaftStorage(storageDir, StartupOption.FORMAT); + File m = storage.getStorageDir().getMetaFile(); + Assert.assertTrue(m.exists()); + MetaFile metaFile = new MetaFile(m); + Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm()); + Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor()); + + metaFile.set(123, "peer1"); + metaFile.readFile(); + Assert.assertEquals(123, metaFile.getTerm()); + Assert.assertEquals("peer1", metaFile.getVotedFor()); + + MetaFile metaFile2 = new MetaFile(m); + Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded")); + Assert.assertEquals(123, metaFile.getTerm()); + Assert.assertEquals("peer1", metaFile.getVotedFor()); + + storage.close(); + } + + /** + * check if RaftStorage deletes tmp metafile when startup + */ + @Test + public void testCleanMetaTmpFile() throws Exception { + RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR); + Assert.assertEquals(StorageState.NORMAL, storage.getState()); + storage.close(); + + RaftStorageDirectory sd = new RaftStorageDirectory(storageDir); + File metaFile = sd.getMetaFile(); + FileUtils.move(metaFile, sd.getMetaTmpFile()); + + Assert.assertEquals(StorageState.NOT_FORMATTED, sd.analyzeStorage(false)); + + try { + new RaftStorage(storageDir, StartupOption.REGULAR); + Assert.fail("should throw IOException since storage dir is not formatted"); + } catch (IOException e) { + Assert.assertTrue( + e.getMessage().contains(StorageState.NOT_FORMATTED.name())); + } + + // let the storage dir contain both raft-meta and raft-meta.tmp + new RaftStorage(storageDir, StartupOption.FORMAT).close(); + Assert.assertTrue(sd.getMetaFile().exists()); + Assert.assertTrue(sd.getMetaTmpFile().createNewFile()); + Assert.assertTrue(sd.getMetaTmpFile().exists()); + try { + storage = new RaftStorage(storageDir, StartupOption.REGULAR); + Assert.assertEquals(StorageState.NORMAL, storage.getState()); + Assert.assertFalse(sd.getMetaTmpFile().exists()); + Assert.assertTrue(sd.getMetaFile().exists()); + } finally { + storage.close(); + } + } + + @Test + public void testSnapshotFileName() throws Exception { + final long term = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + final long index = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + final String name = SimpleStateMachineStorage.getSnapshotFileName(term, index); + System.out.println("name = " + name); + final File file = new File(storageDir, name); + final TermIndex ti = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(file); + System.out.println("file = " + file); + Assert.assertEquals(term, ti.getTerm()); + Assert.assertEquals(index, ti.getIndex()); + System.out.println("ti = " + ti); + + final File foo = new File(storageDir, "foo"); + try { + SimpleStateMachineStorage.getTermIndexFromSnapshotFile(foo); + Assert.fail(); + } catch(IllegalArgumentException iae) { + System.out.println("Good " + iae); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java new file mode 100644 index 0000000..bcbfa73 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -0,0 +1,524 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import org.apache.log4j.Level; +import org.apache.ratis.BaseTest; +import org.apache.ratis.RaftTestUtil.SimpleOperation; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.TimeoutIOException; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.impl.RetryCacheTestUtil; +import org.apache.ratis.server.impl.RetryCache; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerProtoUtils; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.util.ExitUtils; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestSegmentedRaftLog extends BaseTest { + static { + LogUtils.setLogLevel(RaftLogWorker.LOG, Level.DEBUG); + } + + private static final RaftPeerId peerId = RaftPeerId.valueOf("s0"); + + static class SegmentRange { + final long start; + final long end; + final long term; + final boolean isOpen; + + SegmentRange(long s, long e, long term, boolean isOpen) { + this.start = s; + this.end = e; + this.term = term; + this.isOpen = isOpen; + } + } + + private File storageDir; + private RaftProperties properties; + private RaftStorage storage; + private long segmentMaxSize; + private long preallocatedSize; + private int bufferSize; + + @Before + public void setup() throws Exception { + storageDir = getTestDir(); + properties = new RaftProperties(); + RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(storageDir)); + storage = new RaftStorage(storageDir, RaftServerConstants.StartupOption.REGULAR); + this.segmentMaxSize = + RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); + this.preallocatedSize = + RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); + this.bufferSize = + RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); + } + + @After + public void tearDown() throws Exception { + if (storageDir != null) { + FileUtils.deleteFully(storageDir.getParentFile()); + } + } + + private LogEntryProto[] prepareLog(List<SegmentRange> list) throws IOException { + List<LogEntryProto> entryList = new ArrayList<>(); + for (SegmentRange range : list) { + File file = range.isOpen ? + storage.getStorageDir().getOpenLogFile(range.start) : + storage.getStorageDir().getClosedLogFile(range.start, range.end); + + final int size = (int) (range.end - range.start + 1); + LogEntryProto[] entries = new LogEntryProto[size]; + try (LogOutputStream out = new LogOutputStream(file, false, + segmentMaxSize, preallocatedSize, bufferSize)) { + for (int i = 0; i < size; i++) { + SimpleOperation m = new SimpleOperation("m" + (i + range.start)); + entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, i + range.start); + out.write(entries[i]); + } + } + Collections.addAll(entryList, entries); + } + return entryList.toArray(new LogEntryProto[entryList.size()]); + } + + static List<SegmentRange> prepareRanges(int startTerm, int endTerm, int segmentSize, + long startIndex) { + List<SegmentRange> list = new ArrayList<>(endTerm - startTerm); + for (int i = startTerm; i < endTerm; i++) { + list.add(new SegmentRange(startIndex, startIndex + segmentSize - 1, i, + i == endTerm - 1)); + startIndex += segmentSize; + } + return list; + } + + private LogEntryProto getLastEntry(SegmentedRaftLog raftLog) + throws IOException { + return raftLog.get(raftLog.getLastEntryTermIndex().getIndex()); + } + + @Test + public void testLoadLogSegments() throws Exception { + // first generate log files + List<SegmentRange> ranges = prepareRanges(0, 5, 100, 0); + LogEntryProto[] entries = prepareLog(ranges); + + // create RaftLog object and load log file + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + // check if log entries are loaded correctly + for (LogEntryProto e : entries) { + LogEntryProto entry = raftLog.get(e.getIndex()); + Assert.assertEquals(e, entry); + } + + TermIndex[] termIndices = raftLog.getEntries(0, 500); + LogEntryProto[] entriesFromLog = Arrays.stream(termIndices) + .map(ti -> { + try { + return raftLog.get(ti.getIndex()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .toArray(LogEntryProto[]::new); + Assert.assertArrayEquals(entries, entriesFromLog); + Assert.assertEquals(entries[entries.length - 1], getLastEntry(raftLog)); + } + } + + static List<LogEntryProto> prepareLogEntries(List<SegmentRange> slist, + Supplier<String> stringSupplier) { + List<LogEntryProto> eList = new ArrayList<>(); + for (SegmentRange range : slist) { + prepareLogEntries(range, stringSupplier, false, eList); + } + return eList; + } + + static List<LogEntryProto> prepareLogEntries(SegmentRange range, + Supplier<String> stringSupplier, boolean hasStataMachineData, List<LogEntryProto> eList) { + for(long index = range.start; index <= range.end; index++) { + eList.add(prepareLogEntry(range.term, index, stringSupplier, hasStataMachineData)); + } + return eList; + } + + static LogEntryProto prepareLogEntry(long term, long index, Supplier<String> stringSupplier, boolean hasStataMachineData) { + final SimpleOperation m = stringSupplier == null? + new SimpleOperation("m" + index, hasStataMachineData): + new SimpleOperation(stringSupplier.get(), hasStataMachineData); + return ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), term, index); + } + + /** + * Append entry one by one and check if log state is correct. + */ + @Test + public void testAppendEntry() throws Exception { + List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0); + List<LogEntryProto> entries = prepareLogEntries(ranges, null); + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + // append entries to the raftlog + entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); + } + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + // check if the raft log is correct + checkEntries(raftLog, entries, 0, entries.size()); + } + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + TermIndex lastTermIndex = raftLog.getLastEntryTermIndex(); + IllegalStateException ex = null; + try { + // append entry fails if append entry term is lower than log's last entry term + raftLog.appendEntry(LogEntryProto.newBuilder(entries.get(0)) + .setTerm(lastTermIndex.getTerm() - 1) + .setIndex(lastTermIndex.getIndex() + 1).build()); + } catch (IllegalStateException e) { + ex = e; + } + Assert.assertTrue(ex.getMessage().contains("term less than RaftLog's last term")); + try { + // append entry fails if difference between append entry index and log's last entry index is greater than 1 + raftLog.appendEntry(LogEntryProto.newBuilder(entries.get(0)) + .setTerm(lastTermIndex.getTerm()) + .setIndex(lastTermIndex.getIndex() + 2).build()); + } catch (IllegalStateException e) { + ex = e; + } + Assert.assertTrue(ex.getMessage().contains("and RaftLog's last index " + lastTermIndex.getIndex() + " greater than 1")); + } + } + + /** + * Keep appending entries, make sure the rolling is correct. + */ + @Test + public void testAppendAndRoll() throws Exception { + RaftServerConfigKeys.Log.setPreallocatedSize(properties, SizeInBytes.valueOf("16KB")); + RaftServerConfigKeys.Log.setSegmentSizeMax(properties, SizeInBytes.valueOf("128KB")); + + List<SegmentRange> ranges = prepareRanges(0, 1, 1024, 0); + final byte[] content = new byte[1024]; + List<LogEntryProto> entries = prepareLogEntries(ranges, + () -> new String(content)); + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + // append entries to the raftlog + entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); + } + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + // check if the raft log is correct + checkEntries(raftLog, entries, 0, entries.size()); + Assert.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments()); + } + } + + @Test + public void testTruncate() throws Exception { + // prepare the log for truncation + List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0); + List<LogEntryProto> entries = prepareLogEntries(ranges, null); + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + // append entries to the raftlog + entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); + } + + for (long fromIndex = 900; fromIndex >= 0; fromIndex -= 150) { + testTruncate(entries, fromIndex); + } + } + + private void testTruncate(List<LogEntryProto> entries, long fromIndex) + throws Exception { + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + // truncate the log + raftLog.truncate(fromIndex).join(); + + + checkEntries(raftLog, entries, 0, (int) fromIndex); + } + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, null, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + // check if the raft log is correct + if (fromIndex > 0) { + Assert.assertEquals(entries.get((int) (fromIndex - 1)), + getLastEntry(raftLog)); + } else { + Assert.assertNull(raftLog.getLastEntryTermIndex()); + } + checkEntries(raftLog, entries, 0, (int) fromIndex); + } + } + + private void checkEntries(RaftLog raftLog, List<LogEntryProto> expected, + int offset, int size) throws IOException { + if (size > 0) { + for (int i = offset; i < size + offset; i++) { + LogEntryProto entry = raftLog.get(expected.get(i).getIndex()); + Assert.assertEquals(expected.get(i), entry); + } + TermIndex[] termIndices = raftLog.getEntries( + expected.get(offset).getIndex(), + expected.get(offset + size - 1).getIndex() + 1); + LogEntryProto[] entriesFromLog = Arrays.stream(termIndices) + .map(ti -> { + try { + return raftLog.get(ti.getIndex()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .toArray(LogEntryProto[]::new); + LogEntryProto[] expectedArray = expected.subList(offset, offset + size) + .stream().toArray(LogEntryProto[]::new); + Assert.assertArrayEquals(expectedArray, entriesFromLog); + } + } + + private void checkFailedEntries(List<LogEntryProto> entries, long fromIndex, RetryCache retryCache) { + for (int i = 0; i < entries.size(); i++) { + if (i < fromIndex) { + RetryCacheTestUtil.assertFailure(retryCache, entries.get(i), false); + } else { + RetryCacheTestUtil.assertFailure(retryCache, entries.get(i), true); + } + } + } + + /** + * Test append with inconsistent entries + */ + @Test + public void testAppendEntriesWithInconsistency() throws Exception { + // prepare the log for truncation + List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0); + List<LogEntryProto> entries = prepareLogEntries(ranges, null); + + RaftServerImpl server = mock(RaftServerImpl.class); + RetryCache retryCache = RetryCacheTestUtil.createRetryCache(); + when(server.getRetryCache()).thenReturn(retryCache); + doCallRealMethod().when(server).failClientRequest(any(LogEntryProto.class)); + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, server, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + entries.stream().forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, entry)); + // append entries to the raftlog + entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); + } + + // append entries whose first 100 entries are the same with existing log, + // and the next 100 are with different term + SegmentRange r1 = new SegmentRange(550, 599, 2, false); + SegmentRange r2 = new SegmentRange(600, 649, 3, false); + SegmentRange r3 = new SegmentRange(650, 749, 10, false); + List<LogEntryProto> newEntries = prepareLogEntries( + Arrays.asList(r1, r2, r3), null); + + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, server, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()])).forEach(CompletableFuture::join); + + checkFailedEntries(entries, 650, retryCache); + checkEntries(raftLog, entries, 0, 650); + checkEntries(raftLog, newEntries, 100, 100); + Assert.assertEquals(newEntries.get(newEntries.size() - 1), + getLastEntry(raftLog)); + Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), + raftLog.getLatestFlushedIndex()); + } + + // load the raftlog again and check + try (SegmentedRaftLog raftLog = + new SegmentedRaftLog(peerId, server, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + checkEntries(raftLog, entries, 0, 650); + checkEntries(raftLog, newEntries, 100, 100); + Assert.assertEquals(newEntries.get(newEntries.size() - 1), + getLastEntry(raftLog)); + Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(), + raftLog.getLatestFlushedIndex()); + + RaftLogCache cache = raftLog.getRaftLogCache(); + Assert.assertEquals(5, cache.getNumOfSegments()); + } + } + + @Test + public void testSegmentedRaftLogStateMachineData() throws Exception { + final SegmentRange range = new SegmentRange(0, 10, 1, true); + final List<LogEntryProto> entries = prepareLogEntries(range, null, true, new ArrayList<>()); + + final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing(); + try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, null, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + + int next = 0; + long flush = -1; + assertIndices(raftLog, flush, next); + raftLog.appendEntry(entries.get(next++)); + assertIndices(raftLog, flush, next); + raftLog.appendEntry(entries.get(next++)); + assertIndices(raftLog, flush, next); + raftLog.appendEntry(entries.get(next++)); + assertIndicesMultipleAttempts(raftLog, flush += 3, next); + + sm.blockFlushStateMachineData(); + raftLog.appendEntry(entries.get(next++)); + { + sm.blockWriteStateMachineData(); + final Thread t = startAppendEntryThread(raftLog, entries.get(next++)); + TimeUnit.SECONDS.sleep(1); + Assert.assertTrue(t.isAlive()); + sm.unblockWriteStateMachineData(); + t.join(); + } + assertIndices(raftLog, flush, next); + TimeUnit.SECONDS.sleep(1); + assertIndices(raftLog, flush, next); + sm.unblockFlushStateMachineData(); + assertIndicesMultipleAttempts(raftLog, flush + 2, next); + } + } + + @Test + public void testSegmentedRaftLogStateMachineDataTimeoutIOException() throws Exception { + RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true); + final TimeDuration syncTimeout = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); + RaftServerConfigKeys.Log.StateMachineData.setSyncTimeout(properties, syncTimeout); + final int numRetries = 2; + RaftServerConfigKeys.Log.StateMachineData.setSyncTimeoutRetry(properties, numRetries); + ExitUtils.disableSystemExit(); + + final LogEntryProto entry = prepareLogEntry(0, 0, null, true); + final StateMachine sm = new BaseStateMachine() { + @Override + public CompletableFuture<?> writeStateMachineData(LogEntryProto entry) { + return new CompletableFuture<>(); // the future never completes + } + }; + + try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, null, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + raftLog.appendEntry(entry); // RaftLogWorker should catch TimeoutIOException + + JavaUtils.attempt(() -> { + final ExitUtils.ExitException exitException = ExitUtils.getFirstExitException(); + Objects.requireNonNull(exitException, "exitException == null"); + Assert.assertEquals(TimeoutIOException.class, exitException.getCause().getClass()); + }, 3*numRetries, syncTimeout, "RaftLogWorker should catch TimeoutIOException and exit", LOG); + } + } + + static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) { + final Thread t = new Thread(() -> raftLog.appendEntry(entry)); + t.start(); + return t; + } + + void assertIndices(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) { + LOG.info("assert expectedFlushIndex={}", expectedFlushIndex); + Assert.assertEquals(expectedFlushIndex, raftLog.getLatestFlushedIndex()); + LOG.info("assert expectedNextIndex={}", expectedNextIndex); + Assert.assertEquals(expectedNextIndex, raftLog.getNextIndex()); + } + + void assertIndicesMultipleAttempts(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) throws Exception { + JavaUtils.attempt(() -> assertIndices(raftLog, expectedFlushIndex, expectedNextIndex), + 10, 100, "assertIndices", LOG); + } + + @Test + public void testSegmentedRaftLogFormatInternalHeader() throws Exception { + testFailureCase("testSegmentedRaftLogFormatInternalHeader", + () -> SegmentedRaftLogFormat.applyHeaderTo(header -> { + LOG.info("header = " + new String(header, StandardCharsets.UTF_8)); + header[0] += 1; // try changing the internal header + LOG.info("header' = " + new String(header, StandardCharsets.UTF_8)); + return null; + }), IllegalStateException.class); + + // reset the header + SegmentedRaftLogFormat.applyHeaderTo(header -> { + LOG.info("header' = " + new String(header, StandardCharsets.UTF_8)); + header[0] -= 1; // try changing the internal header + LOG.info("header'' = " + new String(header, StandardCharsets.UTF_8)); + return null; + }); + } +}
