szetszwo commented on pull request #317:
URL: https://github.com/apache/incubator-ratis/pull/317#issuecomment-737819776
Here is more details:
```
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index fb58a4a8..3bd3076d 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -60,8 +60,25 @@ public class LogSegment implements Comparable<Long> {
static final Logger LOG = LoggerFactory.getLogger(LogSegment.class);
- static long getEntrySize(LogEntryProto entry) {
- final int serialized =
ServerProtoUtils.removeStateMachineData(entry).getSerializedSize();
+ enum Op {
+ LOAD_SEGMENT_FILE,
+ WRITE_SEGMENT_FILE,
+ WRITE_CACHE_WITH_STATE_MACHINE_CACHE,
+ WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE;
+ }
+
+ static long getEntrySize(LogEntryProto entry, Op op) {
+ LogEntryProto e = entry;
+ if (op == Op.WRITE_SEGMENT_FILE) {
+ e = ServerProtoUtils.removeStateMachineData(entry);
+ } else if (op == Op.LOAD_SEGMENT_FILE || op ==
Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE) {
+ Preconditions.assertTrue(entry ==
ServerProtoUtils.removeStateMachineData(entry),
+ () -> "Unexpected LogEntryProto with StateMachine data: op=" + op
+ ", entry=" + entry);
+ } else {
+ Preconditions.assertTrue(op ==
Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE,
+ () -> "Unexpected op " + op + ", entry=" + entry);
+ }
+ final int serialized = e.getSerializedSize();
return serialized +
CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4L;
}
@@ -139,7 +156,7 @@ public class LogSegment implements Comparable<Long> {
final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage,
RaftStorage::getLogCorruptionPolicy);
final int entryCount = readSegmentFile(file, start, end, isOpen,
corruptionPolicy, raftLogMetrics, entry -> {
- segment.append(keepEntryInCache || isOpen, entry);
+ segment.append(keepEntryInCache || isOpen, entry,
Op.LOAD_SEGMENT_FILE);
if (logConsumer != null) {
logConsumer.accept(entry);
}
@@ -273,12 +290,12 @@ public class LogSegment implements Comparable<Long> {
return CorruptionPolicy.get(storage,
RaftStorage::getLogCorruptionPolicy);
}
- void appendToOpenSegment(LogEntryProto entry) {
+ void appendToOpenSegment(LogEntryProto entry, Op op) {
Preconditions.assertTrue(isOpen(), "The log segment %s is not open for
append", this);
- append(true, entry);
+ append(true, entry, op);
}
- private void append(boolean keepEntryInCache, LogEntryProto entry) {
+ private void append(boolean keepEntryInCache, LogEntryProto entry, Op op)
{
Objects.requireNonNull(entry, "entry == null");
if (records.isEmpty()) {
Preconditions.assertTrue(entry.getIndex() == startIndex,
@@ -300,7 +317,7 @@ public class LogSegment implements Comparable<Long> {
if (entry.hasConfigurationEntry()) {
configEntries.add(record.getTermIndex());
}
- totalSize += getEntrySize(entry);
+ totalSize += getEntrySize(entry, op);
endIndex = entry.getIndex();
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index dbdbced9..ebbb00fa 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -414,9 +414,10 @@ public class SegmentedRaftLog extends RaftLog {
fileLogWorker.writeLogEntry(entry).getFuture();
if (stateMachineCachingEnabled) {
// The stateMachineData will be cached inside the StateMachine
itself.
- cache.appendEntry(ServerProtoUtils.removeStateMachineData(entry));
+ cache.appendEntry(ServerProtoUtils.removeStateMachineData(entry),
+ LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE);
} else {
- cache.appendEntry(entry);
+ cache.appendEntry(entry,
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
}
return writeFuture;
} catch (Exception e) {
@@ -431,7 +432,7 @@ public class SegmentedRaftLog extends RaftLog {
if (segment.getTotalSize() >= segmentMaxSize) {
return true;
} else {
- final long entrySize = LogSegment.getEntrySize(entry);
+ final long entrySize = LogSegment.getEntrySize(entry,
LogSegment.Op.WRITE_SEGMENT_FILE);
// if entry size is greater than the max segment size, write it
directly
// into the current segment
return entrySize <= segmentMaxSize &&
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 260718f9..2177f890 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -521,11 +521,11 @@ public class SegmentedRaftLogCache {
closedSegments.get(closedSegments.size() -
1).getLastTermIndex());
}
- void appendEntry(LogEntryProto entry) {
+ void appendEntry(LogEntryProto entry, LogSegment.Op op) {
// SegmentedRaftLog does the segment creation/rolling work. Here we just
// simply append the entry into the open segment.
Preconditions.assertTrue(openSegment != null);
- openSegment.appendToOpenSegment(entry);
+ openSegment.appendToOpenSegment(entry, op);
}
/**
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]