This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 96351eb  RATIS-628. Simplify the code in LogSegment. Contributed by 
Tsz Wo Nicholas Sze.
96351eb is described below

commit 96351eb9e32bb5bb25dac0446854e5f45a308f15
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Jul 26 17:54:20 2019 +0530

    RATIS-628. Simplify the code in LogSegment. Contributed by Tsz Wo Nicholas 
Sze.
---
 .../ratis/server/raftlog/segmented/LogSegment.java | 114 ++++++---------------
 .../server/raftlog/segmented/SegmentedRaftLog.java |  17 ++-
 .../server/raftlog/segmented/TestLogSegment.java   |  30 ++----
 .../segmented/TestSegmentedRaftLogCache.java       |   7 +-
 4 files changed, 51 insertions(+), 117 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index e696247..2ffc907 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -46,7 +46,7 @@ import java.util.function.Consumer;
  * In-memory cache for a log segment file. All the updates will be first 
written
  * into LogSegment then into corresponding files in the same order.
  *
- * This class will be protected by the RaftServer's lock.
+ * This class will be protected by the {@link SegmentedRaftLog}'s read-write 
lock.
  */
 class LogSegment implements Comparable<Long> {
   static final Logger LOG = LoggerFactory.getLogger(LogSegment.class);
@@ -63,7 +63,7 @@ class LogSegment implements Comparable<Long> {
 
     LogRecord(long offset, LogEntryProto entry) {
       this.offset = offset;
-      termIndex = TermIndex.newTermIndex(entry.getTerm(), entry.getIndex());
+      this.termIndex = ServerProtoUtils.toTermIndex(entry);
     }
 
     TermIndex getTermIndex() {
@@ -75,28 +75,6 @@ class LogSegment implements Comparable<Long> {
     }
   }
 
-  static class LogRecordWithEntry {
-    private final LogRecord record;
-    private final LogEntryProto entry;
-
-    LogRecordWithEntry(LogRecord record, LogEntryProto entry) {
-      this.record = record;
-      this.entry = entry;
-    }
-
-    LogRecord getRecord() {
-      return record;
-    }
-
-    LogEntryProto getEntry() {
-      return entry;
-    }
-
-    boolean hasEntry() {
-      return entry != null;
-    }
-  }
-
   static LogSegment newOpenSegment(RaftStorage storage, long start) {
     Preconditions.assertTrue(start >= 0);
     return new LogSegment(storage, true, start, start - 1);
@@ -189,20 +167,6 @@ class LogSegment implements Comparable<Long> {
         storage.getStorageDir().getClosedLogFile(startIndex, endIndex);
   }
 
-  public String toDebugString() {
-    final StringBuilder b = new StringBuilder()
-        .append("startIndex=").append(startIndex)
-        .append(", endIndex=").append(endIndex)
-        .append(", numOfEntries=").append(numOfEntries())
-        .append(", isOpen? ").append(isOpen)
-        .append(", file=").append(getSegmentFile());
-    records.stream().map(LogRecord::getTermIndex).forEach(
-        ti -> b.append("  ").append(ti).append(", cache=")
-            .append(ServerProtoUtils.toLogEntryString(entryCache.get(ti)))
-    );
-    return b.toString();
-  }
-
   private volatile boolean isOpen;
   private long totalSize;
   private final long startIndex;
@@ -211,7 +175,6 @@ class LogSegment implements Comparable<Long> {
   private final CacheLoader<LogRecord, LogEntryProto> cacheLoader = new 
LogEntryLoader();
   /** later replace it with a metric */
   private final AtomicInteger loadingTimes = new AtomicInteger();
-  private volatile boolean hasEntryCache;
 
   /**
    * the list of records is more like the index of a segment
@@ -229,7 +192,6 @@ class LogSegment implements Comparable<Long> {
     this.startIndex = start;
     this.endIndex = end;
     totalSize = SegmentedRaftLogFormat.getHeaderLength();
-    hasEntryCache = isOpen;
   }
 
   long getStartIndex() {
@@ -248,52 +210,39 @@ class LogSegment implements Comparable<Long> {
     return Math.toIntExact(endIndex - startIndex + 1);
   }
 
-  void appendToOpenSegment(LogEntryProto... entries) {
-    Preconditions.assertTrue(isOpen(),
-        "The log segment %s is not open for append", this.toString());
-    append(true, entries);
+  void appendToOpenSegment(LogEntryProto entry) {
+    Preconditions.assertTrue(isOpen(), "The log segment %s is not open for 
append", this);
+    append(true, entry);
   }
 
-  private void append(boolean keepEntryInCache, LogEntryProto... entries) {
-    Preconditions.assertTrue(entries != null && entries.length > 0);
-    final long term = entries[0].getTerm();
+  private void append(boolean keepEntryInCache, LogEntryProto entry) {
+    Objects.requireNonNull(entry, "entry == null");
     if (records.isEmpty()) {
-      Preconditions.assertTrue(entries[0].getIndex() == startIndex,
+      Preconditions.assertTrue(entry.getIndex() == startIndex,
           "gap between start index %s and first entry to append %s",
-          startIndex, entries[0].getIndex());
+          startIndex, entry.getIndex());
     }
-    for (LogEntryProto entry : entries) {
-      // all these entries should be of the same term
-      Preconditions.assertTrue(entry.getTerm() == term,
-          "expected term:%s, term of the entry:%s", term, entry.getTerm());
-      final LogRecord currentLast = getLastRecord();
-      if (currentLast != null) {
-        Preconditions.assertTrue(
-            entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
-            "gap between entries %s and %s", entry.getIndex(),
-            currentLast.getTermIndex().getIndex());
-      }
 
-      final LogRecord record = new LogRecord(totalSize, entry);
-      records.add(record);
-      if (keepEntryInCache) {
-        hasEntryCache = true;
-        entryCache.put(record.getTermIndex(), entry);
-      }
-      if (entry.hasConfigurationEntry()) {
-        configEntries.add(record.getTermIndex());
-      }
-      totalSize += getEntrySize(entry);
-      endIndex = entry.getIndex();
+    final LogRecord currentLast = getLastRecord();
+    if (currentLast != null) {
+      Preconditions.assertTrue(entry.getIndex() == 
currentLast.getTermIndex().getIndex() + 1,
+          "gap between entries %s and %s", entry.getIndex(), 
currentLast.getTermIndex().getIndex());
     }
-  }
 
-  LogRecordWithEntry getEntryWithoutLoading(long index) {
-    LogRecord record = getLogRecord(index);
-    if (record == null) {
-      return null;
+    final LogRecord record = new LogRecord(totalSize, entry);
+    records.add(record);
+    if (keepEntryInCache) {
+      entryCache.put(record.getTermIndex(), entry);
+    }
+    if (entry.hasConfigurationEntry()) {
+      configEntries.add(record.getTermIndex());
     }
-    return new LogRecordWithEntry(record, 
entryCache.get(record.getTermIndex()));
+    totalSize += getEntrySize(entry);
+    endIndex = entry.getIndex();
+  }
+
+  LogEntryProto getEntryFromCache(TermIndex ti) {
+    return entryCache.get(ti);
   }
 
   /**
@@ -305,9 +254,7 @@ class LogSegment implements Comparable<Long> {
       return entry;
     }
     try {
-      hasEntryCache = true;
-      entry = cacheLoader.load(record);
-      return entry;
+      return cacheLoader.load(record);
     } catch (Exception e) {
       throw new RaftLogIOException(e);
     }
@@ -342,13 +289,12 @@ class LogSegment implements Comparable<Long> {
    */
   void truncate(long fromIndex) {
     Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
-    LogRecord record = records.get(Math.toIntExact(fromIndex - startIndex));
     for (long index = endIndex; index >= fromIndex; index--) {
       LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
       entryCache.remove(removed.getTermIndex());
       configEntries.remove(removed.getTermIndex());
+      totalSize = removed.offset;
     }
-    totalSize = record.offset;
     isOpen = false;
     this.endIndex = fromIndex - 1;
   }
@@ -373,7 +319,6 @@ class LogSegment implements Comparable<Long> {
   void clear() {
     records.clear();
     entryCache.clear();
-    hasEntryCache = false;
     configEntries.clear();
     endIndex = startIndex - 1;
   }
@@ -383,12 +328,11 @@ class LogSegment implements Comparable<Long> {
   }
 
   void evictCache() {
-    hasEntryCache = false;
     entryCache.clear();
   }
 
   boolean hasCache() {
-    return hasEntryCache;
+    return isOpen || !entryCache.isEmpty(); // open segment always has cache.
   }
 
   boolean containsIndex(long index) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 6db41c7..96330d6 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -27,7 +27,6 @@ import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
-import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecordWithEntry;
 import 
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncateIndices;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -190,25 +189,25 @@ public class SegmentedRaftLog extends RaftLog {
   public LogEntryProto get(long index) throws RaftLogIOException {
     checkLogState();
     final LogSegment segment;
-    final LogRecordWithEntry recordAndEntry;
+    final LogRecord record;
     try (AutoCloseableLock readLock = readLock()) {
       segment = cache.getSegment(index);
       if (segment == null) {
         return null;
       }
-      recordAndEntry = segment.getEntryWithoutLoading(index);
-      if (recordAndEntry == null) {
+      record = segment.getLogRecord(index);
+      if (record == null) {
         return null;
       }
-      if (recordAndEntry.hasEntry()) {
-        return recordAndEntry.getEntry();
+      final LogEntryProto entry = 
segment.getEntryFromCache(record.getTermIndex());
+      if (entry != null) {
+        return entry;
       }
     }
 
-    // the entry is not in the segment's cache. Load the cache without holding
-    // RaftLog's lock.
+    // the entry is not in the segment's cache. Load the cache without holding 
the lock.
     checkAndEvictCache();
-    return segment.loadCache(recordAndEntry.getRecord());
+    return segment.loadCache(record);
   }
 
   @Override
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 5ab6a6c..3086e78 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -23,7 +23,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
 import org.apache.ratis.server.impl.ServerProtoUtils;
-import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecordWithEntry;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.server.storage.RaftStorageDirectory;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -42,7 +42,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -130,13 +129,15 @@ public class TestLogSegment extends BaseTest {
     long offset = SegmentedRaftLogFormat.getHeaderLength();
     for (long i = start; i <= end; i++) {
       LogSegment.LogRecord record = segment.getLogRecord(i);
-      LogRecordWithEntry lre = segment.getEntryWithoutLoading(i);
-      Assert.assertEquals(i, lre.getRecord().getTermIndex().getIndex());
-      Assert.assertEquals(term, lre.getRecord().getTermIndex().getTerm());
+      final TermIndex ti = record.getTermIndex();
+      Assert.assertEquals(i, ti.getIndex());
+      Assert.assertEquals(term, ti.getTerm());
       Assert.assertEquals(offset, record.getOffset());
 
-      LogEntryProto entry = lre.hasEntry() ?
-          lre.getEntry() : segment.loadCache(lre.getRecord());
+      LogEntryProto entry = segment.getEntryFromCache(ti);
+      if (entry == null) {
+        entry = segment.loadCache(record);
+      }
       offset += getEntrySize(entry);
     }
   }
@@ -193,15 +194,13 @@ public class TestLogSegment extends BaseTest {
     // append till full
     long term = 0;
     int i = 0;
-    List<LogEntryProto> list = new ArrayList<>();
     while (size < max) {
       SimpleOperation op = new SimpleOperation("m" + i);
       LogEntryProto entry = 
ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start);
       size += getEntrySize(entry);
-      list.add(entry);
+      segment.appendToOpenSegment(entry);
     }
 
-    segment.appendToOpenSegment(list.toArray(new LogEntryProto[list.size()]));
     Assert.assertTrue(segment.getTotalSize() >= max);
     checkLogSegment(segment, start, i - 1 + start, true, size, term);
   }
@@ -229,17 +228,6 @@ public class TestLogSegment extends BaseTest {
     } catch (IllegalStateException e) {
       // the exception is expected.
     }
-
-    LogEntryProto[] entries = new LogEntryProto[2];
-    for (int i = 0; i < 2; i++) {
-      entries[i] = ServerProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2);
-    }
-    try {
-      segment.appendToOpenSegment(entries);
-      Assert.fail("should fail since there is gap between entries");
-    } catch (IllegalStateException e) {
-      // the exception is expected.
-    }
   }
 
   @Test
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 0ce0821..660c2f8 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -26,6 +26,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import 
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
+import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,12 +55,14 @@ public class TestSegmentedRaftLogCache {
     return s;
   }
 
-  private void checkCache(long start, long end, int segmentSize) throws 
IOException {
+  private void checkCache(long start, long end, int segmentSize) {
     Assert.assertEquals(start, cache.getStartIndex());
     Assert.assertEquals(end, cache.getEndIndex());
 
     for (long index = start; index <= end; index++) {
-      LogEntryProto entry = 
cache.getSegment(index).getEntryWithoutLoading(index).getEntry();
+      final LogSegment segment = cache.getSegment(index);
+      final LogRecord record = segment.getLogRecord(index);
+      final LogEntryProto entry = 
segment.getEntryFromCache(record.getTermIndex());
       Assert.assertEquals(index, entry.getIndex());
     }
 

Reply via email to