Repository: incubator-ratis Updated Branches: refs/heads/master b2c691382 -> 13fdb9ee6
RATIS-76. Add loading policy for RaftLogCache. Contributed by Jing Zhao.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/13fdb9ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/13fdb9ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/13fdb9ee

Branch: refs/heads/master
Commit: 13fdb9ee6cb89a2bfa3533ab7955b33951de491b
Parents: b2c6913
Author: Jing Zhao <[email protected]>
Authored: Wed Apr 26 10:55:00 2017 -0700
Committer: Jing Zhao <[email protected]>
Committed: Wed Apr 26 10:55:16 2017 -0700

----------------------------------------------------------------------
 .../ratis/grpc/server/GRpcLogAppender.java | 10 +-
 .../org/apache/ratis/grpc/TestRaftStream.java | 2 +-
 .../apache/ratis/server/impl/LogAppender.java | 16 +-
 .../apache/ratis/server/storage/LogSegment.java | 168 ++++++++++++++-----
 .../ratis/server/storage/MemoryRaftLog.java | 1 -
 .../apache/ratis/server/storage/RaftLog.java | 2 +-
 .../ratis/server/storage/RaftLogCache.java | 42 +++--
 .../server/storage/RaftLogIOException.java | 29 ++++
 .../ratis/server/storage/SegmentedRaftLog.java | 41 +++--
 .../java/org/apache/ratis/RaftTestUtil.java | 20 ++-
 .../ratis/server/storage/TestRaftLogCache.java | 27 +--
 .../server/storage/TestRaftLogSegment.java | 39 +++--
 .../server/storage/TestSegmentedRaftLog.java | 21 ++-
 13 files changed, 300 insertions(+), 118 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index 656adc2..92dd257 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.grpc.server; +import org.apache.ratis.server.storage.RaftLogIOException; import org.apache.ratis.shaded.io.grpc.Status; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; @@ -79,7 +80,12 @@ public class GRpcLogAppender extends LogAppender { installSnapshot(snapshot, snapshotResponseHandler); } else { // keep appending log entries or sending heartbeats - appendLog(); + try { + appendLog(); + } catch (RaftLogIOException e) { + LOG.error(this + " hit IOException while loading raft log", e); + stopSender(); + } } } @@ -107,7 +113,7 @@ public class GRpcLogAppender extends LogAppender { shouldWaitForFirstResponse(); } - private void appendLog() { + private void appendLog() throws RaftLogIOException { if (appendLogRequestObserver == null) { appendLogRequestObserver = client.appendEntries(appendResponseHandler); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java index 83ca5ec..1efe4d3 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java @@ -104,7 +104,7 @@ public class TestRaftStream { } private void checkLog(RaftLog raftLog, long expectedCommittedIndex, - Supplier<byte[]> s) { + Supplier<byte[]> s) throws IOException { long committedIndex = raftLog.getLastCommittedIndex(); Assert.assertEquals(expectedCommittedIndex, committedIndex); // check the log content 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index ff12f4e..a48d236 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -23,6 +23,7 @@ import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.FileInfo; import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.server.storage.RaftLogIOException; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.statemachine.SnapshotInfo; @@ -87,6 +88,8 @@ public class LogAppender extends Daemon { checkAndSendAppendEntries(); } catch (InterruptedException | InterruptedIOException e) { LOG.info(this + " was interrupted: " + e); + } catch (RaftLogIOException e) { + LOG.error(this + " hit IOException while loading raft log", e); } } @@ -150,7 +153,7 @@ public class LogAppender extends Daemon { return previous; } - protected AppendEntriesRequestProto createRequest() { + protected AppendEntriesRequestProto createRequest() throws RaftLogIOException { final TermIndex previous = getPrevious(); final long leaderNext = raftLog.getNextIndex(); long next = follower.getNextIndex() + buffer.getPendingEntryNum(); @@ -178,7 +181,7 @@ public class LogAppender extends Daemon { /** Send an appendEntries RPC; retry indefinitely. 
*/ private AppendEntriesReplyProto sendAppendEntriesWithRetries() - throws InterruptedException, InterruptedIOException { + throws InterruptedException, InterruptedIOException, RaftLogIOException { int retry = 0; AppendEntriesRequestProto request = null; while (isAppenderRunning()) { // keep retrying for IOException @@ -202,9 +205,10 @@ public class LogAppender extends Daemon { follower.updateLastRpcResponseTime(); return r; - } catch (InterruptedIOException iioe) { - throw iioe; + } catch (InterruptedIOException | RaftLogIOException e) { + throw e; } catch (IOException ioe) { + // TODO should have more detailed retry policy here. LOG.trace(this + ": failed to send appendEntries; retry " + retry++, ioe); } if (isAppenderRunning()) { @@ -279,7 +283,7 @@ public class LogAppender extends Daemon { InstallSnapshotRequestProto request = server.createInstallSnapshotRequest(follower.getPeer().getId(), requestId, requestIndex++, snapshot, - Arrays.asList(chunk), done); + Collections.singletonList(chunk), done); currentOffset += targetLength; chunkIndex++; @@ -372,7 +376,7 @@ public class LogAppender extends Daemon { /** Check and send appendEntries RPC */ private void checkAndSendAppendEntries() - throws InterruptedException, InterruptedIOException { + throws InterruptedException, InterruptedIOException, RaftLogIOException { while (isAppenderRunning()) { if (shouldSendRequest()) { SnapshotInfo snapshot = shouldInstallSnapshot(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java index 3856585..6c478dd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java +++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java @@ -17,8 +17,10 @@ */ package org.apache.ratis.server.storage; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.ratis.shaded.com.google.common.cache.CacheLoader; import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.FileUtils; @@ -28,11 +30,15 @@ import org.apache.ratis.util.ProtoUtils; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; /** @@ -47,7 +53,6 @@ class LogSegment implements Comparable<Long> { return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4; } - @VisibleForTesting static class LogRecord { /** starting offset in the file */ private final long offset; @@ -67,44 +72,42 @@ class LogSegment implements Comparable<Long> { } } - private boolean isOpen; - private long totalSize; - private final long startIndex; - private long endIndex; - /** - * the list of records is more like the index of a segment - */ - private final List<LogRecord> records = new ArrayList<>(); - /** - * the entryCache caches the content of log entries. - * TODO: currently we cache all the log entries. will fix it soon. 
- */ - private final Map<TermIndex, LogEntryProto> entryCache = new HashMap<>(); - private final Set<TermIndex> configEntries = new HashSet<>(); + static class LogRecordWithEntry { + private final LogRecord record; + private final LogEntryProto entry; - private LogSegment(boolean isOpen, long start, long end) { - this.isOpen = isOpen; - this.startIndex = start; - this.endIndex = end; - totalSize = SegmentedRaftLog.HEADER_BYTES.length; + LogRecordWithEntry(LogRecord record, LogEntryProto entry) { + this.record = record; + this.entry = entry; + } + + LogRecord getRecord() { + return record; + } + + LogEntryProto getEntry() { + return entry; + } + + boolean hasEntry() { + return entry != null; + } } - static LogSegment newOpenSegment(long start) { + static LogSegment newOpenSegment(RaftStorage storage, long start) { Preconditions.assertTrue(start >= 0); - return new LogSegment(true, start, start - 1); + return new LogSegment(storage, true, start, start - 1); } - private static LogSegment newCloseSegment(long start, long end) { + private static LogSegment newCloseSegment(RaftStorage storage, + long start, long end) { Preconditions.assertTrue(start >= 0 && end >= start); - return new LogSegment(false, start, end); + return new LogSegment(storage, false, start, end); } - static LogSegment loadSegment(File file, long start, long end, boolean isOpen, - Consumer<LogEntryProto> logConsumer) throws IOException { - final LogSegment segment; + private static void readSegmentFile(File file, long start, long end, + boolean isOpen, Consumer<LogEntryProto> entryConsumer) throws IOException { try (LogInputStream in = new LogInputStream(file, start, end, isOpen)) { - segment = isOpen ? 
LogSegment.newOpenSegment(start) : - LogSegment.newCloseSegment(start, end); LogEntryProto next; LogEntryProto prev = null; while ((next = in.nextEntry()) != null) { @@ -112,14 +115,29 @@ class LogSegment implements Comparable<Long> { Preconditions.assertTrue(next.getIndex() == prev.getIndex() + 1, "gap between entry %s and entry %s", prev, next); } - segment.append(next); - if (logConsumer != null) { - logConsumer.accept(next); + if (entryConsumer != null) { + entryConsumer.accept(next); } prev = next; } } + } + + static LogSegment loadSegment(RaftStorage storage, File file, + long start, long end, boolean isOpen, + boolean keptInCache, Consumer<LogEntryProto> logConsumer) + throws IOException { + final LogSegment segment = isOpen ? + LogSegment.newOpenSegment(storage, start) : + LogSegment.newCloseSegment(storage, start, end); + + readSegmentFile(file, start, end, isOpen, entry -> { + segment.append(keptInCache | isOpen, entry); + if (logConsumer != null) { + logConsumer.accept(entry); + } + }); // truncate padding if necessary if (file.length() > segment.getTotalSize()) { @@ -133,6 +151,57 @@ class LogSegment implements Comparable<Long> { return segment; } + /** + * The current log entry loader simply loads the whole segment into the memory. + * In most of the cases this may be good enough considering the main use case + * for load log entries is for leader appending to followers. + * + * In the future we can make the cache loader configurable if necessary. + */ + class LogEntryLoader extends CacheLoader<LogRecord, LogEntryProto> { + @Override + public LogEntryProto load(LogRecord key) throws IOException { + final File file = getSegmentFile(); + readSegmentFile(file, startIndex, endIndex, isOpen, + entry -> entryCache.put(ServerProtoUtils.toTermIndex(entry), entry)); + loadingTimes.incrementAndGet(); + return Objects.requireNonNull(entryCache.get(key.getTermIndex())); + } + } + + private File getSegmentFile() { + return isOpen ? 
+ storage.getStorageDir().getOpenLogFile(startIndex) : + storage.getStorageDir().getClosedLogFile(startIndex, endIndex); + } + + private boolean isOpen; + private long totalSize; + private final long startIndex; + private long endIndex; + private final RaftStorage storage; + private final CacheLoader<LogRecord, LogEntryProto> cacheLoader = new LogEntryLoader(); + /** later replace it with a metric */ + private final AtomicInteger loadingTimes = new AtomicInteger(); + + /** + * the list of records is more like the index of a segment + */ + private final List<LogRecord> records = new ArrayList<>(); + /** + * the entryCache caches the content of log entries. + */ + private final Map<TermIndex, LogEntryProto> entryCache = new ConcurrentHashMap<>(); + private final Set<TermIndex> configEntries = new HashSet<>(); + + private LogSegment(RaftStorage storage, boolean isOpen, long start, long end) { + this.storage = storage; + this.isOpen = isOpen; + this.startIndex = start; + this.endIndex = end; + totalSize = SegmentedRaftLog.HEADER_BYTES.length; + } + long getStartIndex() { return startIndex; } @@ -152,10 +221,10 @@ class LogSegment implements Comparable<Long> { void appendToOpenSegment(LogEntryProto... entries) { Preconditions.assertTrue(isOpen(), "The log segment %s is not open for append", this.toString()); - append(entries); + append(true, entries); } - private void append(LogEntryProto... entries) { + private void append(boolean keptInCache, LogEntryProto... 
entries) { Preconditions.assertTrue(entries != null && entries.length > 0); final long term = entries[0].getTerm(); if (records.isEmpty()) { @@ -177,7 +246,9 @@ class LogSegment implements Comparable<Long> { final LogRecord record = new LogRecord(totalSize, entry); records.add(record); - entryCache.put(record.getTermIndex(), entry); + if (keptInCache) { + entryCache.put(record.getTermIndex(), entry); + } if (ProtoUtils.isConfigurationLogEntry(entry)) { configEntries.add(record.getTermIndex()); } @@ -186,14 +257,27 @@ class LogSegment implements Comparable<Long> { } } - LogEntryProto getLogEntry(long index) { + LogRecordWithEntry getEntryWithoutLoading(long index) { LogRecord record = getLogRecord(index); - return record == null ? null : entryCache.get(record.getTermIndex()); + if (record == null) { + return null; + } + return new LogRecordWithEntry(record, entryCache.get(record.getTermIndex())); } - TermIndex getTermIndex(long index) { - LogRecord record = getLogRecord(index); - return record == null ? null : record.getTermIndex(); + /** + * Acquire LogSegment's monitor so that there is no concurrent loading. 
+ */ + synchronized LogEntryProto loadCache(LogRecord record) throws RaftLogIOException { + LogEntryProto entry = entryCache.get(record.getTermIndex()); + if (entry != null) { + return entry; + } + try { + return cacheLoader.load(record); + } catch (Exception e) { + throw new RaftLogIOException(e); + } } LogRecord getLogRecord(long index) { @@ -259,4 +343,8 @@ class LogSegment implements Comparable<Long> { configEntries.clear(); endIndex = startIndex - 1; } + + public int getLoadingTimes() { + return loadingTimes.get(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java index df71d08..ecbae2e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java @@ -154,7 +154,6 @@ public class MemoryRaftLog extends RaftLog { if (toTruncate) { truncate(truncateIndex); } - // Collections.addAll(this.entries, entries); for (int i = index; i < entries.length; i++) { this.entries.add(entries[i]); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index 5002f83..4d84a57 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -179,7 +179,7 @@ public abstract class RaftLog implements Closeable { * @return The 
log entry associated with the given index. * Null if there is no log entry with the index. */ - public abstract LogEntryProto get(long index); + public abstract LogEntryProto get(long index) throws RaftLogIOException; /** * Get the TermIndex information of the given index. http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java index 0142bee..7f13dd8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java @@ -27,6 +27,7 @@ import java.util.NoSuchElementException; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.LogSegment.LogRecord; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.Preconditions; @@ -68,8 +69,10 @@ class RaftLogCache { private LogSegment openSegment; private final List<LogSegment> closedSegments; + private final RaftStorage storage; - RaftLogCache() { + RaftLogCache(RaftStorage storage) { + this.storage = storage; closedSegments = new ArrayList<>(); } @@ -116,13 +119,13 @@ class RaftLogCache { openSegment.close(); closedSegments.add(openSegment); if (createNewOpen) { - openSegment = LogSegment.newOpenSegment(nextIndex); + openSegment = LogSegment.newOpenSegment(storage, nextIndex); } else { openSegment = null; } } - private LogSegment getSegment(long index) { + LogSegment getSegment(long index) { if (openSegment != null && index >= openSegment.getStartIndex()) { return openSegment; } else { @@ -131,21 +134,16 @@ class RaftLogCache { } } - LogEntryProto getEntry(long 
index) { + LogRecord getLogRecord(long index) { LogSegment segment = getSegment(index); - return segment == null ? null : segment.getLogEntry(index); - } - - TermIndex getTermIndex(long index) { - LogSegment segment = getSegment(index); - return segment == null ? null : segment.getTermIndex(index); + return segment == null ? null : segment.getLogRecord(index); } /** * @param startIndex inclusive * @param endIndex exclusive */ - TermIndex[] getEntries(final long startIndex, final long endIndex) { + TermIndex[] getTermIndices(final long startIndex, final long endIndex) { if (startIndex < 0 || startIndex < getStartIndex()) { throw new IndexOutOfBoundsException("startIndex = " + startIndex + ", log cache starts from index " + getStartIndex()); @@ -162,19 +160,19 @@ class RaftLogCache { TermIndex[] entries = new TermIndex[Math.toIntExact(realEnd - startIndex)]; int segmentIndex = Collections.binarySearch(closedSegments, startIndex); if (segmentIndex < 0) { - getEntriesFromSegment(openSegment, startIndex, entries, 0, entries.length); + getFromSegment(openSegment, startIndex, entries, 0, entries.length); } else { long index = startIndex; for (int i = segmentIndex; i < closedSegments.size() && index < realEnd; i++) { LogSegment s = closedSegments.get(i); int numberFromSegment = Math.toIntExact( Math.min(realEnd - index, s.getEndIndex() - index + 1)); - getEntriesFromSegment(s, index, entries, + getFromSegment(s, index, entries, Math.toIntExact(index - startIndex), numberFromSegment); index += numberFromSegment; } if (index < realEnd) { - getEntriesFromSegment(openSegment, index, entries, + getFromSegment(openSegment, index, entries, Math.toIntExact(index - startIndex), Math.toIntExact(realEnd - index)); } @@ -182,13 +180,13 @@ class RaftLogCache { return entries; } - private void getEntriesFromSegment(LogSegment segment, long startIndex, + private void getFromSegment(LogSegment segment, long startIndex, TermIndex[] entries, int offset, int size) { long endIndex = 
segment.getEndIndex(); endIndex = Math.min(endIndex, startIndex + size - 1); int index = offset; for (long i = startIndex; i <= endIndex; i++) { - LogSegment.LogRecord r = segment.getLogRecord(i); + LogRecord r = segment.getLogRecord(i); entries[index++] = r == null ? null : r.getTermIndex(); } } @@ -286,11 +284,11 @@ class RaftLogCache { return null; } - Iterator<LogEntryProto> iterator(long startIndex) { + Iterator<TermIndex> iterator(long startIndex) { return new EntryIterator(startIndex); } - private class EntryIterator implements Iterator<LogEntryProto> { + private class EntryIterator implements Iterator<TermIndex> { private long nextIndex; private LogSegment currentSegment; private int segmentIndex; @@ -321,10 +319,10 @@ class RaftLogCache { } @Override - public LogEntryProto next() { - LogEntryProto entry; + public TermIndex next() { + LogRecord record; if (currentSegment == null || - (entry = currentSegment.getLogEntry(nextIndex)) == null) { + (record = currentSegment.getLogRecord(nextIndex)) == null) { throw new NoSuchElementException(); } if (++nextIndex > currentSegment.getEndIndex()) { @@ -334,7 +332,7 @@ class RaftLogCache { openSegment : closedSegments.get(segmentIndex); } } - return entry; + return record.getTermIndex(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java new file mode 100644 index 0000000..b9fb5ff --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.protocol.RaftException; + +/** + * Exception while reading/writing RaftLog + */ +public class RaftLogIOException extends RaftException { + public RaftLogIOException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index c956f84..8eaa0ab 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -23,6 +23,8 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.LogSegment.LogRecord; +import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; import 
org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.AutoCloseableLock; @@ -105,7 +107,7 @@ public class SegmentedRaftLog extends RaftLog { super(selfId); this.storage = storage; this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); - cache = new RaftLogCache(); + cache = new RaftLogCache(storage); fileLogWorker = new RaftLogWorker(server, storage, properties); lastCommitted.set(lastIndexInSnapshot); } @@ -136,8 +138,8 @@ public class SegmentedRaftLog extends RaftLog { List<LogPathAndIndex> paths = storage.getStorageDir().getLogSegmentFiles(); for (LogPathAndIndex pi : paths) { boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX; - LogSegment logSegment = LogSegment.loadSegment(pi.path.toFile(), - pi.startIndex, pi.endIndex, isOpen, logConsumer); + LogSegment logSegment = LogSegment.loadSegment(storage, pi.path.toFile(), + pi.startIndex, pi.endIndex, isOpen, true, logConsumer); cache.addSegment(logSegment); } @@ -155,18 +157,35 @@ public class SegmentedRaftLog extends RaftLog { } @Override - public LogEntryProto get(long index) { + public LogEntryProto get(long index) throws RaftLogIOException { checkLogState(); - try(AutoCloseableLock readLock = readLock()) { - return cache.getEntry(index); + LogSegment segment; + LogRecordWithEntry recordAndEntry; + try (AutoCloseableLock readLock = readLock()) { + segment = cache.getSegment(index); + if (segment == null) { + return null; + } + recordAndEntry = segment.getEntryWithoutLoading(index); + if (recordAndEntry == null) { + return null; + } + if (recordAndEntry.hasEntry()) { + return recordAndEntry.getEntry(); + } } + + // the entry is not in the segment's cache. Load the cache without holding + // RaftLog's lock. 
+ return segment.loadCache(recordAndEntry.getRecord()); } @Override public TermIndex getTermIndex(long index) { checkLogState(); try(AutoCloseableLock readLock = readLock()) { - return cache.getTermIndex(index); + LogRecord record = cache.getLogRecord(index); + return record != null ? record.getTermIndex() : null; } } @@ -174,7 +193,7 @@ public class SegmentedRaftLog extends RaftLog { public TermIndex[] getEntries(long startIndex, long endIndex) { checkLogState(); try(AutoCloseableLock readLock = readLock()) { - return cache.getEntries(startIndex, endIndex); + return cache.getTermIndices(startIndex, endIndex); } } @@ -208,7 +227,7 @@ public class SegmentedRaftLog extends RaftLog { try(AutoCloseableLock writeLock = writeLock()) { final LogSegment currentOpenSegment = cache.getOpenSegment(); if (currentOpenSegment == null) { - cache.addSegment(LogSegment.newOpenSegment(entry.getIndex())); + cache.addSegment(LogSegment.newOpenSegment(storage, entry.getIndex())); fileLogWorker.startLogSegment(getNextIndex()); } else if (isSegmentFull(currentOpenSegment, entry)) { cache.rollOpenSegment(true); @@ -248,11 +267,11 @@ public class SegmentedRaftLog extends RaftLog { return; } try(AutoCloseableLock writeLock = writeLock()) { - Iterator<LogEntryProto> iter = cache.iterator(entries[0].getIndex()); + Iterator<TermIndex> iter = cache.iterator(entries[0].getIndex()); int index = 0; long truncateIndex = -1; for (; iter.hasNext() && index < entries.length; index++) { - LogEntryProto storedEntry = iter.next(); + TermIndex storedEntry = iter.next(); Preconditions.assertTrue( storedEntry.getIndex() == entries[index].getIndex(), "The stored entry's index %s is not consistent with" + http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 76258e5..0680df9 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -101,10 +101,13 @@ public class RaftTestUtil { TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE); while (idxEntries < termIndices.length && idxExpected < expectedMessages.length) { - if (Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(), - log.get(termIndices[idxEntries].getIndex()).getSmLogEntry() - .getData().toByteArray())) { - ++idxExpected; + try { + if (Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(), + log.get(termIndices[idxEntries].getIndex()).getSmLogEntry().getData().toByteArray())) { + ++idxExpected; + } + } catch (IOException e) { + throw new RuntimeException(e); } ++idxEntries; } @@ -112,7 +115,7 @@ public class RaftTestUtil { } public static void assertLogEntries(Collection<RaftServerImpl> servers, - SimpleMessage... expectedMessages) { + SimpleMessage... expectedMessages) { final int size = servers.size(); final long count = servers.stream() .filter(RaftServerImpl::isAlive) @@ -129,7 +132,12 @@ public class RaftTestUtil { long startIndex, long expertedTerm, SimpleMessage... 
expectedMessages) { Assert.assertEquals(expectedMessages.length, entries.length); for(int i = 0; i < entries.length; i++) { - final LogEntryProto e = log.get(entries[i].getIndex()); + final LogEntryProto e; + try { + e = log.get(entries[i].getIndex()); + } catch (IOException exception) { + throw new RuntimeException(exception); + } Assert.assertEquals(expertedTerm, e.getTerm()); Assert.assertEquals(startIndex + i, e.getIndex()); Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(), http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java index d9f7c10..38c879b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.server.storage; +import java.io.IOException; import java.util.Iterator; import org.apache.ratis.RaftTestUtil.SimpleOperation; @@ -37,11 +38,11 @@ public class TestRaftLogCache { @Before public void setup() { - cache = new RaftLogCache(); + cache = new RaftLogCache(null); } private LogSegment prepareLogSegment(long start, long end, boolean isOpen) { - LogSegment s = LogSegment.newOpenSegment(start); + LogSegment s = LogSegment.newOpenSegment(null, start); for (long i = start; i <= end; i++) { SimpleOperation m = new SimpleOperation("m" + i); LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), @@ -54,12 +55,12 @@ public class TestRaftLogCache { return s; } - private void checkCache(long start, long end, int segmentSize) { + private void checkCache(long start, long end, int segmentSize) throws IOException { 
Assert.assertEquals(start, cache.getStartIndex()); Assert.assertEquals(end, cache.getEndIndex()); for (long index = start; index <= end; index++) { - LogEntryProto entry = cache.getEntry(index); + LogEntryProto entry = cache.getSegment(index).getEntryWithoutLoading(index).getEntry(); Assert.assertEquals(index, entry.getIndex()); } @@ -75,7 +76,7 @@ public class TestRaftLogCache { } private void checkCacheEntries(long offset, int size, long end) { - TermIndex[] entries = cache.getEntries(offset, offset + size); + TermIndex[] entries = cache.getTermIndices(offset, offset + size); long realEnd = offset + size > end + 1 ? end + 1 : offset + size; Assert.assertEquals(realEnd - offset, entries.length); for (long i = offset; i < realEnd; i++) { @@ -220,16 +221,16 @@ public class TestRaftLogCache { Assert.assertEquals(249, ts.toTruncate.endIndex); } - private void testIterator(long startIndex) { - Iterator<LogEntryProto> iterator = cache.iterator(startIndex); - LogEntryProto prev = null; + private void testIterator(long startIndex) throws IOException { + Iterator<TermIndex> iterator = cache.iterator(startIndex); + TermIndex prev = null; while (iterator.hasNext()) { - LogEntryProto entry = iterator.next(); - Assert.assertEquals(cache.getEntry(entry.getIndex()), entry); + TermIndex termIndex = iterator.next(); + Assert.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex); if (prev != null) { - Assert.assertEquals(prev.getIndex() + 1, entry.getIndex()); + Assert.assertEquals(prev.getIndex() + 1, termIndex.getIndex()); } - prev = entry; + prev = termIndex; } if (startIndex <= cache.getEndIndex()) { Assert.assertNotNull(prev); @@ -254,7 +255,7 @@ public class TestRaftLogCache { } testIterator(299); - Iterator<LogEntryProto> iterator = cache.iterator(300); + Iterator<TermIndex> iterator = cache.iterator(300); Assert.assertFalse(iterator.hasNext()); } } 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java index 4e90d75..73709fc 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java @@ -20,6 +20,7 @@ package org.apache.ratis.server.storage; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.RaftServerConfigKeys; @@ -86,7 +87,7 @@ public class TestRaftLogSegment { } private void checkLogSegment(LogSegment segment, long start, long end, - boolean isOpen, long totalSize, long term) { + boolean isOpen, long totalSize, long term) throws Exception { Assert.assertEquals(start, segment.getStartIndex()); Assert.assertEquals(end, segment.getEndIndex()); Assert.assertEquals(isOpen, segment.isOpen()); @@ -95,35 +96,51 @@ public class TestRaftLogSegment { long offset = SegmentedRaftLog.HEADER_BYTES.length; for (long i = start; i <= end; i++) { LogSegment.LogRecord record = segment.getLogRecord(i); - LogEntryProto entry = segment.getLogEntry(i); - Assert.assertEquals(i, entry.getIndex()); - Assert.assertEquals(term, entry.getTerm()); + LogRecordWithEntry lre = segment.getEntryWithoutLoading(i); + Assert.assertEquals(i, lre.getRecord().getTermIndex().getIndex()); + Assert.assertEquals(term, lre.getRecord().getTermIndex().getTerm()); Assert.assertEquals(offset, record.getOffset()); + 
LogEntryProto entry = lre.hasEntry() ? + lre.getEntry() : segment.loadCache(lre.getRecord()); offset += getEntrySize(entry); } } @Test public void testLoadLogSegment() throws Exception { + testLoadSegment(true); + } + + @Test + public void testLoadCache() throws Exception { + testLoadSegment(false); + } + + private void testLoadSegment(boolean loadInitial) throws Exception { // load an open segment File openSegmentFile = prepareLog(true, 0, 100, 0); - LogSegment openSegment = LogSegment.loadSegment(openSegmentFile, 0, - INVALID_LOG_INDEX, true, null); + RaftStorage storage = new RaftStorage(properties, StartupOption.REGULAR); + LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 0, + INVALID_LOG_INDEX, true, loadInitial, null); checkLogSegment(openSegment, 0, 99, true, openSegmentFile.length(), 0); + storage.close(); + // for open segment we currently always keep log entries in the memory + Assert.assertEquals(0, openSegment.getLoadingTimes()); // load a closed segment (1000-1099) File closedSegmentFile = prepareLog(false, 1000, 100, 1); - LogSegment closedSegment = LogSegment.loadSegment(closedSegmentFile, 1000, - 1099, false, null); + LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile, + 1000, 1099, false, loadInitial, null); checkLogSegment(closedSegment, 1000, 1099, false, closedSegment.getTotalSize(), 1); + Assert.assertEquals(loadInitial ? 
0 : 1, closedSegment.getLoadingTimes()); } @Test public void testAppendEntries() throws Exception { final long start = 1000; - LogSegment segment = LogSegment.newOpenSegment(start); + LogSegment segment = LogSegment.newOpenSegment(null, start); long size = SegmentedRaftLog.HEADER_BYTES.length; final long max = 8 * 1024 * 1024; checkLogSegment(segment, start, start - 1, true, size, 0); @@ -147,7 +164,7 @@ public class TestRaftLogSegment { @Test public void testAppendWithGap() throws Exception { - LogSegment segment = LogSegment.newOpenSegment(1000); + LogSegment segment = LogSegment.newOpenSegment(null, 1000); SimpleOperation op = new SimpleOperation("m"); final SMLogEntryProto m = op.getLogEntryContent(); try { @@ -185,7 +202,7 @@ public class TestRaftLogSegment { public void testTruncate() throws Exception { final long term = 1; final long start = 1000; - LogSegment segment = LogSegment.newOpenSegment(start); + LogSegment segment = LogSegment.newOpenSegment(null, start); for (int i = 0; i < 100; i++) { LogEntryProto entry = ProtoUtils.toLogEntryProto( new SimpleOperation("m" + i).getLogEntryContent(), term, i + start, clientId, callId); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/13fdb9ee/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index afd7d29..1d38971 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -120,7 +120,8 @@ public class TestSegmentedRaftLog { return list; } - private LogEntryProto getLastEntry(SegmentedRaftLog raftLog) { + private LogEntryProto getLastEntry(SegmentedRaftLog raftLog) + throws IOException { 
return raftLog.get(raftLog.getLastEntryTermIndex().getIndex()); } @@ -142,7 +143,13 @@ public class TestSegmentedRaftLog { TermIndex[] termIndices = raftLog.getEntries(0, 500); LogEntryProto[] entriesFromLog = Arrays.stream(termIndices) - .map(ti -> raftLog.get(ti.getIndex())) + .map(ti -> { + try { + return raftLog.get(ti.getIndex()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) .collect(Collectors.toList()) .toArray(RaftTestUtil.EMPTY_LOGENTRY_ARRAY); Assert.assertArrayEquals(entries, entriesFromLog); @@ -265,7 +272,7 @@ public class TestSegmentedRaftLog { } private void checkEntries(RaftLog raftLog, List<LogEntryProto> expected, - int offset, int size) { + int offset, int size) throws IOException { if (size > 0) { for (int i = offset; i < size + offset; i++) { LogEntryProto entry = raftLog.get(expected.get(i).getIndex()); @@ -275,7 +282,13 @@ public class TestSegmentedRaftLog { expected.get(offset).getIndex(), expected.get(offset + size - 1).getIndex() + 1); LogEntryProto[] entriesFromLog = Arrays.stream(termIndices) - .map(ti -> raftLog.get(ti.getIndex())) + .map(ti -> { + try { + return raftLog.get(ti.getIndex()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) .collect(Collectors.toList()) .toArray(RaftTestUtil.EMPTY_LOGENTRY_ARRAY); LogEntryProto[] expectedArray = expected.subList(offset, offset + size)
