This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 96351eb RATIS-628. Simplify the code in LogSegment. Contributed by Tsz Wo Nicholas Sze.
96351eb is described below
commit 96351eb9e32bb5bb25dac0446854e5f45a308f15
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Jul 26 17:54:20 2019 +0530
RATIS-628. Simplify the code in LogSegment. Contributed by Tsz Wo Nicholas Sze.
---
.../ratis/server/raftlog/segmented/LogSegment.java | 114 ++++++---------------
.../server/raftlog/segmented/SegmentedRaftLog.java | 17 ++-
.../server/raftlog/segmented/TestLogSegment.java | 30 ++----
.../segmented/TestSegmentedRaftLogCache.java | 7 +-
4 files changed, 51 insertions(+), 117 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index e696247..2ffc907 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -46,7 +46,7 @@ import java.util.function.Consumer;
* In-memory cache for a log segment file. All the updates will be first written
* into LogSegment then into corresponding files in the same order.
*
- * This class will be protected by the RaftServer's lock.
+ * This class will be protected by the {@link SegmentedRaftLog}'s read-write lock.
*/
class LogSegment implements Comparable<Long> {
static final Logger LOG = LoggerFactory.getLogger(LogSegment.class);
@@ -63,7 +63,7 @@ class LogSegment implements Comparable<Long> {
LogRecord(long offset, LogEntryProto entry) {
this.offset = offset;
- termIndex = TermIndex.newTermIndex(entry.getTerm(), entry.getIndex());
+ this.termIndex = ServerProtoUtils.toTermIndex(entry);
}
TermIndex getTermIndex() {
@@ -75,28 +75,6 @@ class LogSegment implements Comparable<Long> {
}
}
- static class LogRecordWithEntry {
- private final LogRecord record;
- private final LogEntryProto entry;
-
- LogRecordWithEntry(LogRecord record, LogEntryProto entry) {
- this.record = record;
- this.entry = entry;
- }
-
- LogRecord getRecord() {
- return record;
- }
-
- LogEntryProto getEntry() {
- return entry;
- }
-
- boolean hasEntry() {
- return entry != null;
- }
- }
-
static LogSegment newOpenSegment(RaftStorage storage, long start) {
Preconditions.assertTrue(start >= 0);
return new LogSegment(storage, true, start, start - 1);
@@ -189,20 +167,6 @@ class LogSegment implements Comparable<Long> {
storage.getStorageDir().getClosedLogFile(startIndex, endIndex);
}
- public String toDebugString() {
- final StringBuilder b = new StringBuilder()
- .append("startIndex=").append(startIndex)
- .append(", endIndex=").append(endIndex)
- .append(", numOfEntries=").append(numOfEntries())
- .append(", isOpen? ").append(isOpen)
- .append(", file=").append(getSegmentFile());
- records.stream().map(LogRecord::getTermIndex).forEach(
- ti -> b.append(" ").append(ti).append(", cache=")
- .append(ServerProtoUtils.toLogEntryString(entryCache.get(ti)))
- );
- return b.toString();
- }
-
private volatile boolean isOpen;
private long totalSize;
private final long startIndex;
@@ -211,7 +175,6 @@ class LogSegment implements Comparable<Long> {
private final CacheLoader<LogRecord, LogEntryProto> cacheLoader = new LogEntryLoader();
/** later replace it with a metric */
private final AtomicInteger loadingTimes = new AtomicInteger();
- private volatile boolean hasEntryCache;
/**
* the list of records is more like the index of a segment
@@ -229,7 +192,6 @@ class LogSegment implements Comparable<Long> {
this.startIndex = start;
this.endIndex = end;
totalSize = SegmentedRaftLogFormat.getHeaderLength();
- hasEntryCache = isOpen;
}
long getStartIndex() {
@@ -248,52 +210,39 @@ class LogSegment implements Comparable<Long> {
return Math.toIntExact(endIndex - startIndex + 1);
}
- void appendToOpenSegment(LogEntryProto... entries) {
- Preconditions.assertTrue(isOpen(),
- "The log segment %s is not open for append", this.toString());
- append(true, entries);
+ void appendToOpenSegment(LogEntryProto entry) {
+ Preconditions.assertTrue(isOpen(), "The log segment %s is not open for append", this);
+ append(true, entry);
}
- private void append(boolean keepEntryInCache, LogEntryProto... entries) {
- Preconditions.assertTrue(entries != null && entries.length > 0);
- final long term = entries[0].getTerm();
+ private void append(boolean keepEntryInCache, LogEntryProto entry) {
+ Objects.requireNonNull(entry, "entry == null");
if (records.isEmpty()) {
- Preconditions.assertTrue(entries[0].getIndex() == startIndex,
+ Preconditions.assertTrue(entry.getIndex() == startIndex,
"gap between start index %s and first entry to append %s",
- startIndex, entries[0].getIndex());
+ startIndex, entry.getIndex());
}
- for (LogEntryProto entry : entries) {
- // all these entries should be of the same term
- Preconditions.assertTrue(entry.getTerm() == term,
- "expected term:%s, term of the entry:%s", term, entry.getTerm());
- final LogRecord currentLast = getLastRecord();
- if (currentLast != null) {
- Preconditions.assertTrue(
- entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
- "gap between entries %s and %s", entry.getIndex(),
- currentLast.getTermIndex().getIndex());
- }
- final LogRecord record = new LogRecord(totalSize, entry);
- records.add(record);
- if (keepEntryInCache) {
- hasEntryCache = true;
- entryCache.put(record.getTermIndex(), entry);
- }
- if (entry.hasConfigurationEntry()) {
- configEntries.add(record.getTermIndex());
- }
- totalSize += getEntrySize(entry);
- endIndex = entry.getIndex();
+ final LogRecord currentLast = getLastRecord();
+ if (currentLast != null) {
+ Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
+ "gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
}
- }
- LogRecordWithEntry getEntryWithoutLoading(long index) {
- LogRecord record = getLogRecord(index);
- if (record == null) {
- return null;
+ final LogRecord record = new LogRecord(totalSize, entry);
+ records.add(record);
+ if (keepEntryInCache) {
+ entryCache.put(record.getTermIndex(), entry);
+ }
+ if (entry.hasConfigurationEntry()) {
+ configEntries.add(record.getTermIndex());
}
- return new LogRecordWithEntry(record, entryCache.get(record.getTermIndex()));
+ totalSize += getEntrySize(entry);
+ endIndex = entry.getIndex();
+ }
+
+ LogEntryProto getEntryFromCache(TermIndex ti) {
+ return entryCache.get(ti);
}
/**
@@ -305,9 +254,7 @@ class LogSegment implements Comparable<Long> {
return entry;
}
try {
- hasEntryCache = true;
- entry = cacheLoader.load(record);
- return entry;
+ return cacheLoader.load(record);
} catch (Exception e) {
throw new RaftLogIOException(e);
}
@@ -342,13 +289,12 @@ class LogSegment implements Comparable<Long> {
*/
void truncate(long fromIndex) {
Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
- LogRecord record = records.get(Math.toIntExact(fromIndex - startIndex));
for (long index = endIndex; index >= fromIndex; index--) {
LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
entryCache.remove(removed.getTermIndex());
configEntries.remove(removed.getTermIndex());
+ totalSize = removed.offset;
}
- totalSize = record.offset;
isOpen = false;
this.endIndex = fromIndex - 1;
}
@@ -373,7 +319,6 @@ class LogSegment implements Comparable<Long> {
void clear() {
records.clear();
entryCache.clear();
- hasEntryCache = false;
configEntries.clear();
endIndex = startIndex - 1;
}
@@ -383,12 +328,11 @@ class LogSegment implements Comparable<Long> {
}
void evictCache() {
- hasEntryCache = false;
entryCache.clear();
}
boolean hasCache() {
- return hasEntryCache;
+ return isOpen || !entryCache.isEmpty(); // open segment always has cache.
}
boolean containsIndex(long index) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 6db41c7..96330d6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -27,7 +27,6 @@ import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
-import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecordWithEntry;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncateIndices;
import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -190,25 +189,25 @@ public class SegmentedRaftLog extends RaftLog {
public LogEntryProto get(long index) throws RaftLogIOException {
checkLogState();
final LogSegment segment;
- final LogRecordWithEntry recordAndEntry;
+ final LogRecord record;
try (AutoCloseableLock readLock = readLock()) {
segment = cache.getSegment(index);
if (segment == null) {
return null;
}
- recordAndEntry = segment.getEntryWithoutLoading(index);
- if (recordAndEntry == null) {
+ record = segment.getLogRecord(index);
+ if (record == null) {
return null;
}
- if (recordAndEntry.hasEntry()) {
- return recordAndEntry.getEntry();
+ final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex());
+ if (entry != null) {
+ return entry;
}
}
- // the entry is not in the segment's cache. Load the cache without holding
- // RaftLog's lock.
+ // the entry is not in the segment's cache. Load the cache without holding the lock.
checkAndEvictCache();
- return segment.loadCache(recordAndEntry.getRecord());
+ return segment.loadCache(record);
}
@Override
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 5ab6a6c..3086e78 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -23,7 +23,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
import org.apache.ratis.server.impl.ServerProtoUtils;
-import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecordWithEntry;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -42,7 +42,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -130,13 +129,15 @@ public class TestLogSegment extends BaseTest {
long offset = SegmentedRaftLogFormat.getHeaderLength();
for (long i = start; i <= end; i++) {
LogSegment.LogRecord record = segment.getLogRecord(i);
- LogRecordWithEntry lre = segment.getEntryWithoutLoading(i);
- Assert.assertEquals(i, lre.getRecord().getTermIndex().getIndex());
- Assert.assertEquals(term, lre.getRecord().getTermIndex().getTerm());
+ final TermIndex ti = record.getTermIndex();
+ Assert.assertEquals(i, ti.getIndex());
+ Assert.assertEquals(term, ti.getTerm());
Assert.assertEquals(offset, record.getOffset());
- LogEntryProto entry = lre.hasEntry() ?
- lre.getEntry() : segment.loadCache(lre.getRecord());
+ LogEntryProto entry = segment.getEntryFromCache(ti);
+ if (entry == null) {
+ entry = segment.loadCache(record);
+ }
offset += getEntrySize(entry);
}
}
@@ -193,15 +194,13 @@ public class TestLogSegment extends BaseTest {
// append till full
long term = 0;
int i = 0;
- List<LogEntryProto> list = new ArrayList<>();
while (size < max) {
SimpleOperation op = new SimpleOperation("m" + i);
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start);
size += getEntrySize(entry);
- list.add(entry);
+ segment.appendToOpenSegment(entry);
}
- segment.appendToOpenSegment(list.toArray(new LogEntryProto[list.size()]));
Assert.assertTrue(segment.getTotalSize() >= max);
checkLogSegment(segment, start, i - 1 + start, true, size, term);
}
@@ -229,17 +228,6 @@ public class TestLogSegment extends BaseTest {
} catch (IllegalStateException e) {
// the exception is expected.
}
-
- LogEntryProto[] entries = new LogEntryProto[2];
- for (int i = 0; i < 2; i++) {
- entries[i] = ServerProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2);
- }
- try {
- segment.appendToOpenSegment(entries);
- Assert.fail("should fail since there is gap between entries");
- } catch (IllegalStateException e) {
- // the exception is expected.
- }
}
@Test
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 0ce0821..660c2f8 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -26,6 +26,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
+import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.junit.Assert;
import org.junit.Before;
@@ -54,12 +55,14 @@ public class TestSegmentedRaftLogCache {
return s;
}
- private void checkCache(long start, long end, int segmentSize) throws IOException {
+ private void checkCache(long start, long end, int segmentSize) {
Assert.assertEquals(start, cache.getStartIndex());
Assert.assertEquals(end, cache.getEndIndex());
for (long index = start; index <= end; index++) {
- LogEntryProto entry = cache.getSegment(index).getEntryWithoutLoading(index).getEntry();
+ final LogSegment segment = cache.getSegment(index);
+ final LogRecord record = segment.getLogRecord(index);
+ final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex());
Assert.assertEquals(index, entry.getIndex());
}