Repository: incubator-ratis
Updated Branches:
  refs/heads/master b600fc21d -> 0b22e0935


RATIS-430. RaftLogCache#getCachedSegmentNum hits ConcurrentModificationException.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/0b22e093
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/0b22e093
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/0b22e093

Branch: refs/heads/master
Commit: 0b22e0935a95aea1eb70d38e713526d71d71df45
Parents: b600fc2
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Thu Nov 22 11:41:35 2018 -0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Thu Nov 22 11:42:07 2018 -0800

----------------------------------------------------------------------
 .../server/storage/CacheInvalidationPolicy.java |  15 +-
 .../ratis/server/storage/RaftLogCache.java      | 301 +++++++++++++------
 .../server/storage/RaftStorageDirectory.java    |   4 +-
 .../ratis/server/storage/SegmentedRaftLog.java  |  52 +---
 .../ratis/server/storage/TestCacheEviction.java |  11 +-
 5 files changed, 246 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b22e093/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java
index 12534cf..a794092 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/CacheInvalidationPolicy.java
@@ -21,6 +21,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.ratis.server.storage.RaftLogCache.LogSegmentList;
+import org.apache.ratis.util.AutoCloseableLock;
+
 public interface CacheInvalidationPolicy {
   /**
    * Determine which log segments should evict their log entry cache
@@ -34,13 +37,21 @@ public interface CacheInvalidationPolicy {
    * @return the log segments that should evict cache
    */
   List<LogSegment> evict(long[] followerNextIndices, long localFlushedIndex,
-      long lastAppliedIndex, List<LogSegment> segments, int maxCachedSegments);
+      long lastAppliedIndex, LogSegmentList segments, int maxCachedSegments);
 
   class CacheInvalidationPolicyDefault implements CacheInvalidationPolicy {
     @Override
     public List<LogSegment> evict(long[] followerNextIndices,
         long localFlushedIndex, long lastAppliedIndex,
-        List<LogSegment> segments, final int maxCachedSegments) {
+        LogSegmentList segments, final int maxCachedSegments) {
+      try(AutoCloseableLock readLock = segments.readLock()) {
+        return evictImpl(followerNextIndices, localFlushedIndex, 
lastAppliedIndex, segments, maxCachedSegments);
+      }
+    }
+
+    private List<LogSegment> evictImpl(long[] followerNextIndices,
+        long localFlushedIndex, long lastAppliedIndex,
+        LogSegmentList segments, final int maxCachedSegments) {
       List<LogSegment> result = new ArrayList<>();
       int safeIndex = segments.size() - 1;
       for (; safeIndex >= 0; safeIndex--) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b22e093/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
index 451e713..f0cab43 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
@@ -21,17 +21,20 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import 
org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
 import org.apache.ratis.server.storage.LogSegment.LogRecord;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 
 import static 
org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
@@ -86,9 +89,159 @@ class RaftLogCache {
     }
   }
 
+  static class LogSegmentList {
+    private final List<LogSegment> segments = new ArrayList<>();
+    private final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(true);
+
+    AutoCloseableLock readLock() {
+      return AutoCloseableLock.acquire(lock.readLock());
+    }
+
+    AutoCloseableLock writeLock() {
+      return AutoCloseableLock.acquire(lock.writeLock());
+    }
+
+    boolean isEmpty() {
+      try(AutoCloseableLock readLock = readLock()) {
+        return segments.isEmpty();
+      }
+    }
+
+    int size() {
+      try(AutoCloseableLock readLock = readLock()) {
+        return segments.size();
+      }
+    }
+
+    long countCached() {
+      try(AutoCloseableLock readLock = readLock()) {
+        return segments.stream().filter(LogSegment::hasCache).count();
+      }
+    }
+
+    LogSegment getLast() {
+      try(AutoCloseableLock readLock = readLock()) {
+        return segments.isEmpty()? null: segments.get(segments.size() - 1);
+      }
+    }
+
+    LogSegment get(int i) {
+      try(AutoCloseableLock readLock = readLock()) {
+        return segments.get(i);
+      }
+    }
+
+    int binarySearch(long index) {
+      try(AutoCloseableLock readLock = readLock()) {
+        return Collections.binarySearch(segments, index);
+      }
+    }
+
+    LogSegment search(long index) {
+      try(AutoCloseableLock readLock = readLock()) {
+        final int i = Collections.binarySearch(segments, index);
+        return i < 0? null: segments.get(i);
+      }
+    }
+
+    TermIndex[] getTermIndex(long startIndex, long realEnd, LogSegment 
openSegment) {
+      final TermIndex[] entries = new TermIndex[Math.toIntExact(realEnd - 
startIndex)];
+      final int searchIndex;
+      long index = startIndex;
+
+      try(AutoCloseableLock readLock = readLock()) {
+        searchIndex = Collections.binarySearch(segments, startIndex);
+        if (searchIndex >= 0) {
+          for(int i = searchIndex; i < segments.size() && index < realEnd; 
i++) {
+            final LogSegment s = segments.get(i);
+            final int numberFromSegment = Math.toIntExact(Math.min(realEnd - 
index, s.getEndIndex() - index + 1));
+            getFromSegment(s, index, entries, Math.toIntExact(index - 
startIndex), numberFromSegment);
+            index += numberFromSegment;
+          }
+        }
+      }
+
+      // openSegment is read outside the lock.
+      if (searchIndex < 0) {
+        getFromSegment(openSegment, startIndex, entries, 0, entries.length);
+      } else if (index < realEnd) {
+        getFromSegment(openSegment, index, entries,
+            Math.toIntExact(index - startIndex), Math.toIntExact(realEnd - 
index));
+      }
+      return entries;
+    }
+
+    boolean add(LogSegment logSegment) {
+      try(AutoCloseableLock writeLock = writeLock()) {
+        return segments.add(logSegment);
+      }
+    }
+
+    void clear() {
+      try(AutoCloseableLock writeLock = writeLock()) {
+        segments.forEach(LogSegment::clear);
+        segments.clear();
+      }
+    }
+
+    TruncationSegments truncate(long index, LogSegment openSegment, Runnable 
clearOpenSegment) {
+      try(AutoCloseableLock writeLock = writeLock()) {
+        final int segmentIndex = binarySearch(index);
+        if (segmentIndex == -segments.size() - 1) {
+          if (openSegment != null && openSegment.getEndIndex() >= index) {
+            final long oldEnd = openSegment.getEndIndex();
+            if (index == openSegment.getStartIndex()) {
+              // the open segment should be deleted
+              final SegmentFileInfo deleted = deleteOpenSegment(openSegment, 
clearOpenSegment);
+              return new TruncationSegments(null, 
Collections.singletonList(deleted));
+            } else {
+              openSegment.truncate(index);
+              Preconditions.assertTrue(!openSegment.isOpen());
+              final SegmentFileInfo info = new 
SegmentFileInfo(openSegment.getStartIndex(),
+                  oldEnd, true, openSegment.getTotalSize(), 
openSegment.getEndIndex());
+              segments.add(openSegment);
+              clearOpenSegment.run();
+              return new TruncationSegments(info, Collections.emptyList());
+            }
+          }
+        } else if (segmentIndex >= 0) {
+          final LogSegment ts = segments.get(segmentIndex);
+          final long oldEnd = ts.getEndIndex();
+          final List<SegmentFileInfo> list = new ArrayList<>();
+          ts.truncate(index);
+          final int size = segments.size();
+          for(int i = size - 1;
+              i >= (ts.numOfEntries() == 0? segmentIndex: segmentIndex + 1);
+              i--) {
+            LogSegment s = segments.remove(i);
+            final long endOfS = i == segmentIndex? oldEnd: s.getEndIndex();
+            s.clear();
+            list.add(new SegmentFileInfo(s.getStartIndex(), endOfS, false, 0, 
s.getEndIndex()));
+          }
+          if (openSegment != null) {
+            list.add(deleteOpenSegment(openSegment, clearOpenSegment));
+          }
+          SegmentFileInfo t = ts.numOfEntries() == 0? null:
+              new SegmentFileInfo(ts.getStartIndex(), oldEnd, false, 
ts.getTotalSize(), ts.getEndIndex());
+          return new TruncationSegments(t, list);
+        }
+        return null;
+      }
+    }
+
+    static SegmentFileInfo deleteOpenSegment(LogSegment openSegment, Runnable 
clearOpenSegment) {
+      final long oldEnd = openSegment.getEndIndex();
+      openSegment.clear();
+      final SegmentFileInfo info = new 
SegmentFileInfo(openSegment.getStartIndex(), oldEnd, true,
+          0, openSegment.getEndIndex());
+      clearOpenSegment.run();
+      return info;
+    }
+  }
+
   private final String name;
   private volatile LogSegment openSegment;
-  private final List<LogSegment> closedSegments;
+  private final LogSegmentList closedSegments = new LogSegmentList();
   private final RaftStorage storage;
 
   private final int maxCachedSegments;
@@ -98,7 +251,6 @@ class RaftLogCache {
     this.name = selfId + "-" + getClass().getSimpleName();
     this.storage = storage;
     maxCachedSegments = 
RaftServerConfigKeys.Log.maxCachedSegmentNum(properties);
-    closedSegments = new ArrayList<>();
   }
 
   int getMaxCachedSegments() {
@@ -115,11 +267,11 @@ class RaftLogCache {
   }
 
   long getCachedSegmentNum() {
-    return closedSegments.stream().filter(LogSegment::hasCache).count();
+    return closedSegments.countCached();
   }
 
   boolean shouldEvict() {
-    return getCachedSegmentNum() > maxCachedSegments;
+    return closedSegments.countCached() > maxCachedSegments;
   }
 
   void evictCache(long[] followerIndices, long flushedIndex,
@@ -131,13 +283,9 @@ class RaftLogCache {
     }
   }
 
-  private LogSegment getLastClosedSegment() {
-    return closedSegments.isEmpty() ?
-        null : closedSegments.get(closedSegments.size() - 1);
-  }
 
   private void validateAdding(LogSegment segment) {
-    final LogSegment lastClosed = getLastClosedSegment();
+    final LogSegment lastClosed = closedSegments.getLast();
     if (lastClosed != null) {
       Preconditions.assertTrue(!lastClosed.isOpen());
       Preconditions.assertTrue(lastClosed.getEndIndex() + 1 == 
segment.getStartIndex());
@@ -192,8 +340,7 @@ class RaftLogCache {
     if (openSegment != null && index >= openSegment.getStartIndex()) {
       return openSegment;
     } else {
-      int segmentIndex = Collections.binarySearch(closedSegments, index);
-      return segmentIndex < 0 ? null : closedSegments.get(segmentIndex);
+      return closedSegments.search(index);
     }
   }
 
@@ -219,31 +366,10 @@ class RaftLogCache {
     if (startIndex >= realEnd) {
       return TermIndex.EMPTY_TERMINDEX_ARRAY;
     }
-
-    TermIndex[] entries = new TermIndex[Math.toIntExact(realEnd - startIndex)];
-    int segmentIndex = Collections.binarySearch(closedSegments, startIndex);
-    if (segmentIndex < 0) {
-      getFromSegment(openSegment, startIndex, entries, 0, entries.length);
-    } else {
-      long index = startIndex;
-      for (int i = segmentIndex; i < closedSegments.size() && index < realEnd; 
i++) {
-        LogSegment s = closedSegments.get(i);
-        int numberFromSegment = Math.toIntExact(
-            Math.min(realEnd - index, s.getEndIndex() - index + 1));
-        getFromSegment(s, index, entries,
-            Math.toIntExact(index - startIndex), numberFromSegment);
-        index += numberFromSegment;
-      }
-      if (index < realEnd) {
-        getFromSegment(openSegment, index, entries,
-            Math.toIntExact(index - startIndex),
-            Math.toIntExact(realEnd - index));
-      }
-    }
-    return entries;
+    return closedSegments.getTermIndex(startIndex, realEnd, openSegment);
   }
 
-  private void getFromSegment(LogSegment segment, long startIndex,
+  private static void getFromSegment(LogSegment segment, long startIndex,
       TermIndex[] entries, int offset, int size) {
     long endIndex = segment.getEndIndex();
     endIndex = Math.min(endIndex, startIndex + size - 1);
@@ -289,68 +415,68 @@ class RaftLogCache {
     openSegment.appendToOpenSegment(entry);
   }
 
-  private SegmentFileInfo deleteOpenSegment() {
-    final long oldEnd = openSegment.getEndIndex();
-    openSegment.clear();
-    SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(),
-        oldEnd, true, 0, openSegment.getEndIndex());
-    clearOpenSegment();
-    return info;
-  }
-
   /**
    * truncate log entries starting from the given index (inclusive)
    */
   TruncationSegments truncate(long index) {
-    int segmentIndex = Collections.binarySearch(closedSegments, index);
-    if (segmentIndex == -closedSegments.size() - 1) {
-      if (openSegment != null && openSegment.getEndIndex() >= index) {
-        final long oldEnd = openSegment.getEndIndex();
-        if (index == openSegment.getStartIndex()) {
-          // the open segment should be deleted
-          return new TruncationSegments(null,
-              Collections.singletonList(deleteOpenSegment()));
-        } else {
-          openSegment.truncate(index);
-          Preconditions.assertTrue(!openSegment.isOpen());
-          SegmentFileInfo info = new 
SegmentFileInfo(openSegment.getStartIndex(),
-              oldEnd, true, openSegment.getTotalSize(),
-              openSegment.getEndIndex());
-          closedSegments.add(openSegment);
-          clearOpenSegment();
-          return new TruncationSegments(info, Collections.emptyList());
-        }
-      }
-    } else if (segmentIndex >= 0) {
-      LogSegment ts = closedSegments.get(segmentIndex);
-      final long oldEnd = ts.getEndIndex();
-      List<SegmentFileInfo> list = new ArrayList<>();
-      ts.truncate(index);
-      final int size = closedSegments.size();
-      for (int i = size - 1;
-           i >= (ts.numOfEntries() == 0 ? segmentIndex : segmentIndex + 1);
-           i-- ) {
-        LogSegment s = closedSegments.remove(i);
-        final long endOfS = i == segmentIndex ? oldEnd : s.getEndIndex();
-        s.clear();
-        list.add(new SegmentFileInfo(s.getStartIndex(), endOfS, false, 0,
-            s.getEndIndex()));
-      }
-      if (openSegment != null) {
-        list.add(deleteOpenSegment());
-      }
-      SegmentFileInfo t = ts.numOfEntries() == 0 ? null :
-          new SegmentFileInfo(ts.getStartIndex(), oldEnd, false,
-              ts.getTotalSize(), ts.getEndIndex());
-      return new TruncationSegments(t, list);
-    }
-    return null;
+    return closedSegments.truncate(index, openSegment, this::clearOpenSegment);
   }
 
   Iterator<TermIndex> iterator(long startIndex) {
     return new EntryIterator(startIndex);
   }
 
+  static class TruncateIndices {
+    final int arrayIndex;
+    final long truncateIndex;
+
+    TruncateIndices(int arrayIndex, long truncateIndex) {
+      this.arrayIndex = arrayIndex;
+      this.truncateIndex = truncateIndex;
+    }
+
+    int getArrayIndex() {
+      return arrayIndex;
+    }
+
+    long getTruncateIndex() {
+      return truncateIndex;
+    }
+  }
+
+  TruncateIndices computeTruncateIndices(Consumer<TermIndex> 
failClientRequest, LogEntryProto... entries) {
+    int arrayIndex = 0;
+    long truncateIndex = -1;
+
+    try(AutoCloseableLock readLock = closedSegments.readLock()) {
+      final Iterator<TermIndex> i = iterator(entries[0].getIndex());
+      for(; i.hasNext() && arrayIndex < entries.length; arrayIndex++) {
+        final TermIndex storedEntry = i.next();
+        Preconditions.assertTrue(storedEntry.getIndex() == 
entries[arrayIndex].getIndex(),
+            "The stored entry's index %s is not consistent with the received 
entries[%s]'s index %s",
+            storedEntry.getIndex(), arrayIndex, 
entries[arrayIndex].getIndex());
+
+        if (storedEntry.getTerm() != entries[arrayIndex].getTerm()) {
+          // we should truncate from the storedEntry's arrayIndex
+          truncateIndex = storedEntry.getIndex();
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("{}: truncate to {}, arrayIndex={}, ti={}, 
storedEntry={}, entries={}",
+                name, truncateIndex, arrayIndex,
+                ServerProtoUtils.toTermIndex(entries[arrayIndex]), storedEntry,
+                ServerProtoUtils.toString(entries));
+          }
+
+          // fail all requests starting at truncateIndex
+          failClientRequest.accept(storedEntry);
+          for(; i.hasNext(); ) {
+            failClientRequest.accept(i.next());
+          }
+        }
+      }
+    }
+    return new TruncateIndices(arrayIndex, truncateIndex);
+  }
+
   private class EntryIterator implements Iterator<TermIndex> {
     private long nextIndex;
     private LogSegment currentSegment;
@@ -358,7 +484,7 @@ class RaftLogCache {
 
     EntryIterator(long start) {
       this.nextIndex = start;
-      segmentIndex = Collections.binarySearch(closedSegments, nextIndex);
+      segmentIndex = closedSegments.binarySearch(nextIndex);
       if (segmentIndex >= 0) {
         currentSegment = closedSegments.get(segmentIndex);
       } else {
@@ -412,7 +538,6 @@ class RaftLogCache {
       openSegment.clear();
       clearOpenSegment();
     }
-    closedSegments.forEach(LogSegment::clear);
     closedSegments.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b22e093/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
index 2242934..5127e74 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
@@ -210,9 +210,7 @@ public class RaftStorageDirectory {
         }
       }
     }
-    Collections.sort(list,
-        (o1, o2) -> o1.startIndex == o2.startIndex ?
-            0 : (o1.startIndex < o2.startIndex ? -1 : 1));
+    list.sort(Comparator.comparingLong(o -> o.startIndex));
     return list;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b22e093/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 7fb4200..b692957 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -25,6 +25,7 @@ import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.LogSegment.LogRecord;
 import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
+import org.apache.ratis.server.storage.RaftLogCache.TruncateIndices;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.StateMachine;
@@ -36,7 +37,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
@@ -322,51 +322,25 @@ public class SegmentedRaftLog extends RaftLog {
     }
   }
 
+  private void failClientRequest(TermIndex ti) {
+    try {
+      final LogEntryProto entry = get(ti.getIndex());
+      server.failClientRequest(entry);
+    } catch(RaftLogIOException e) {
+      LOG.error(getName() + ": Failed to read log " + ti, e);
+    }
+  }
+
   @Override
   public List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries) {
     checkLogState();
     if (entries == null || entries.length == 0) {
       return Collections.emptyList();
     }
-
     try(AutoCloseableLock writeLock = writeLock()) {
-      Iterator<TermIndex> iter = cache.iterator(entries[0].getIndex());
-      int index = 0;
-      long truncateIndex = -1;
-      for (; iter.hasNext() && index < entries.length; index++) {
-        TermIndex storedEntry = iter.next();
-        Preconditions.assertTrue(
-            storedEntry.getIndex() == entries[index].getIndex(),
-            "The stored entry's index %s is not consistent with" +
-                " the received entries[%s]'s index %s", storedEntry.getIndex(),
-            index, entries[index].getIndex());
-
-        if (storedEntry.getTerm() != entries[index].getTerm()) {
-          // we should truncate from the storedEntry's index
-          truncateIndex = storedEntry.getIndex();
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("{}: truncate to {}, index={}, ti={}, storedEntry={}, 
entries={}",
-                server.getId(), truncateIndex, index,
-                ServerProtoUtils.toTermIndex(entries[index]), storedEntry,
-                ServerProtoUtils.toString(entries));
-          }
-          while (true) {
-            try {
-              final LogEntryProto entry = get(storedEntry.getIndex());
-              server.failClientRequest(entry);
-            } catch (RaftLogIOException e) {
-              LOG.error("Failed to read log " + storedEntry, e);
-            }
-
-            if (iter.hasNext()) {
-              storedEntry = iter.next();
-            } else {
-              break;
-            }
-          }
-          break;
-        }
-      }
+      final TruncateIndices ti = 
cache.computeTruncateIndices(this::failClientRequest, entries);
+      final long truncateIndex = ti.getTruncateIndex();
+      final int index = ti.getArrayIndex();
 
       final List<CompletableFuture<Long>> futures;
       if (truncateIndex != -1) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b22e093/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
index 1cd41a5..b34d58f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
@@ -21,6 +21,7 @@ import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
@@ -28,8 +29,8 @@ import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.impl.ServerState;
 import 
org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
+import org.apache.ratis.server.storage.RaftLogCache.LogSegmentList;
 import org.apache.ratis.server.storage.TestSegmentedRaftLog.SegmentRange;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.SizeInBytes;
@@ -46,9 +47,9 @@ import java.util.concurrent.CompletableFuture;
 public class TestCacheEviction extends BaseTest {
   private static final CacheInvalidationPolicy policy = new 
CacheInvalidationPolicyDefault();
 
-  private List<LogSegment> prepareSegments(int numSegments, boolean[] cached, 
long start, long size) {
+  static LogSegmentList prepareSegments(int numSegments, boolean[] cached, 
long start, long size) {
     Assert.assertEquals(numSegments, cached.length);
-    List<LogSegment> segments = new ArrayList<>(numSegments);
+    final LogSegmentList segments = new LogSegmentList();
     for (int i = 0; i < numSegments; i++) {
       LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1);
       if (cached[i]) {
@@ -64,7 +65,7 @@ public class TestCacheEviction extends BaseTest {
   @Test
   public void testBasicEviction() throws Exception {
     final int maxCached = 5;
-    List<LogSegment> segments = prepareSegments(5,
+    final LogSegmentList segments = prepareSegments(5,
         new boolean[]{true, true, true, true, true}, 0, 10);
 
     // case 1, make sure we do not evict cache for segments behind local 
flushed index
@@ -103,7 +104,7 @@ public class TestCacheEviction extends BaseTest {
   @Test
   public void testEvictionWithFollowerIndices() throws Exception {
     final int maxCached = 6;
-    List<LogSegment> segments = prepareSegments(6,
+    final LogSegmentList segments = prepareSegments(6,
         new boolean[]{true, true, true, true, true, true}, 0, 10);
 
     // case 1, no matter where the followers are, we do not evict segments 
behind local

Reply via email to