This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c454d7839 RATIS-2228. Refactor the offered map in LogAppenderBase.nextAppendEntriesRequest (#1201)
c454d7839 is described below

commit c454d78393588d634aa50437ea8a0e7d6de80976
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Jan 3 07:11:05 2025 -0800

    RATIS-2228. Refactor the offered map in LogAppenderBase.nextAppendEntriesRequest (#1201)
---
 .../ratis/server/leader/LogAppenderBase.java       | 190 ++++++++++++++-------
 .../ratis/server/raftlog/segmented/LogSegment.java |  21 +--
 .../server/raftlog/segmented/SegmentedRaftLog.java |   5 +-
 .../server/raftlog/segmented/TestLogSegment.java   |   3 +-
 4 files changed, 142 insertions(+), 77 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 9d1861fb4..22cd16860 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -37,14 +37,13 @@ import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.LongUnaryOperator;
 
@@ -52,12 +51,90 @@ import java.util.function.LongUnaryOperator;
  * An abstract implementation of {@link LogAppender}.
  */
 public abstract class LogAppenderBase implements LogAppender {
+  /** For buffering log entries to create an {@link EntryList}. */
+  private static class EntryBuffer {
+    /** A queue for limiting the byte size, number of elements and poll time. */
+    private final DataQueue<EntryWithData> queue;
+    /** A map for releasing {@link ReferenceCountedObject}s. */
+    private final Map<Long, ReferenceCountedObject<EntryWithData>> references = new HashMap<>();
+
+    EntryBuffer(Object name, RaftProperties properties) {
+      final SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
+      final int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
+      this.queue = new DataQueue<>(name, bufferByteLimit, bufferElementLimit, EntryWithData::getSerializedSize);
+    }
+
+    boolean putNew(long index, ReferenceCountedObject<EntryWithData> retained) {
+      if (!queue.offer(retained.get())) {
+        retained.release();
+        return false;
+      }
+      final ReferenceCountedObject<EntryWithData> previous = references.put(index, retained);
+      Preconditions.assertNull(previous, () -> "previous with index " + index);
+      return true;
+    }
+
+    void releaseAllAndClear() {
+      for (ReferenceCountedObject<EntryWithData> ref : references.values()) {
+        ref.release();
+      }
+      references.clear();
+      queue.clear();
+    }
+
+    EntryList pollList(long heartbeatWaitTimeMs) throws RaftLogIOException {
+      final List<LogEntryProto> protos;
+      try {
+        protos = queue.pollList(heartbeatWaitTimeMs, EntryWithData::getEntry, null);
+      } catch (Exception e) {
+        releaseAllAndClear();
+        throw e;
+      } finally {
+        for (EntryWithData entry : queue) {
+          // Remove and release remaining entries.
+          final ReferenceCountedObject<EntryWithData> removed = references.remove(entry.getIndex());
+          Objects.requireNonNull(removed, "removed == null");
+          removed.release();
+        }
+        queue.clear();
+      }
+      return new EntryList(protos, references);
+    }
+  }
+
+  /** Storing log entries and their references. */
+  private static class EntryList {
+    private final List<LogEntryProto> protos;
+    private final Collection<ReferenceCountedObject<EntryWithData>> references;
+
+    EntryList(List<LogEntryProto> protos, Map<Long, ReferenceCountedObject<EntryWithData>> references) {
+      Preconditions.assertSame(references.size(), protos.size(), "#entries");
+      this.protos = Collections.unmodifiableList(protos);
+      this.references = Collections.unmodifiableCollection(references.values());
+    }
+
+    List<LogEntryProto> getProtos() {
+      return protos;
+    }
+
+    void retain() {
+      for (ReferenceCountedObject<EntryWithData> ref : references) {
+        ref.retain();
+      }
+    }
+
+    void release() {
+      for (ReferenceCountedObject<EntryWithData> ref : references) {
+        ref.release();
+      }
+    }
+  }
+
   private final String name;
   private final RaftServer.Division server;
   private final LeaderState leaderState;
   private final FollowerInfo follower;
 
-  private final DataQueue<EntryWithData> buffer;
   private final int snapshotChunkMaxSize;
 
   private final LogAppenderDaemon daemon;
@@ -75,9 +152,6 @@ public abstract class LogAppenderBase implements LogAppender {
     final RaftProperties properties = server.getRaftServer().getProperties();
     this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
 
-    final SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
-    final int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
-    this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit, EntryWithData::getSerializedSize);
     this.daemon = new LogAppenderDaemon(this);
     this.eventAwaitForSignal = new AwaitForSignal(name);
 
@@ -210,13 +284,13 @@ public abstract class LogAppenderBase implements LogAppender {
       final long n = oldNextIndex <= 0L ? oldNextIndex : Math.min(oldNextIndex - 1, newNextIndex);
       if (m > n) {
         if (m > newNextIndex) {
-          LOG.info("Set nextIndex to matchIndex + 1 (= " + m + ")");
+          LOG.info("{}: Set nextIndex to matchIndex + 1 (= {})", name, m);
         }
         return m;
       } else if (oldNextIndex <= 0L) {
         return oldNextIndex; // no change.
       } else {
-        LOG.info("Decrease nextIndex to " + n);
+        LOG.info("{}: Decrease nextIndex to {}", name, n);
         return n;
       }
     };
@@ -227,18 +301,18 @@ public abstract class LogAppenderBase implements LogAppender {
     throw new UnsupportedOperationException("Use nextAppendEntriesRequest(" + callId + ", " + heartbeat +") instead.");
   }
 
-/**
- * Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
- * The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
- * When there is zero log entries, the {@link AppendEntriesRequestProto} object is a heartbeat.
- *
- * @param callId The call id of the returned request.
- * @param heartbeat the returned request must be a heartbeat.
- *
- * @return a retained reference of {@link AppendEntriesRequestProto} object.
- *         Since the returned reference is retained, the caller must call {@link ReferenceCountedObject#release()}}
- *         after use.
- */
+  /**
+   * Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
+   * The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
+   * When there is zero log entries, the {@link AppendEntriesRequestProto} object is a heartbeat.
+   *
+   * @param callId The call id of the returned request.
+   * @param heartbeat the returned request must be a heartbeat.
+   *
+   * @return a retained reference of {@link AppendEntriesRequestProto} object.
+   *         Since the returned reference is retained,
+   *         the caller must call {@link ReferenceCountedObject#release()}} after use.
+   */
   protected ReferenceCountedObject<AppendEntriesRequestProto> nextAppendEntriesRequest(long callId, boolean heartbeat)
       throws RaftLogIOException {
     final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs();
@@ -253,56 +327,23 @@ public abstract class LogAppenderBase implements LogAppender {
       return ref;
     }
 
-    Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");
-
     final long snapshotIndex = follower.getSnapshotIndex();
-    final long leaderNext = getRaftLog().getNextIndex();
     final long followerNext = follower.getNextIndex();
-    final long halfMs = heartbeatWaitTimeMs/2;
-    final Map<Long, ReferenceCountedObject<EntryWithData>> offered = new HashMap<>();
-    for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; next++) {
-      final ReferenceCountedObject<EntryWithData> entryWithData;
-      try {
-        entryWithData = getRaftLog().retainEntryWithData(next);
-        if (!buffer.offer(entryWithData.get())) {
-          entryWithData.release();
-          break;
-        }
-        offered.put(next, entryWithData);
-      } catch (Exception e){
-        for (ReferenceCountedObject<EntryWithData> ref : offered.values()) {
-          ref.release();
-        }
-        offered.clear();
-        throw e;
-      }
-    }
-    if (buffer.isEmpty()) {
+    final EntryBuffer entryBuffer = readLogEntries(followerNext, heartbeatWaitTimeMs);
+    if (entryBuffer == null) {
       return null;
     }
 
-    final List<LogEntryProto> protos;
-    try {
-      protos = buffer.pollList(getHeartbeatWaitTimeMs(), EntryWithData::getEntry,
-          (entry, time, exception) -> LOG.warn("Failed to get {} in {}",
-              entry, time.toString(TimeUnit.MILLISECONDS, 3), exception));
-    } catch (RaftLogIOException e) {
-      for (ReferenceCountedObject<EntryWithData> ref : offered.values()) {
-        ref.release();
-      }
-      offered.clear();
-      throw e;
-    } finally {
-      for (EntryWithData entry : buffer) {
-        // Release remaining entries.
-        Optional.ofNullable(offered.remove(entry.getIndex())).ifPresent(ReferenceCountedObject::release);
-      }
-      buffer.clear();
-    }
+    final EntryList entryList = entryBuffer.pollList(heartbeatWaitTimeMs);
+    final List<LogEntryProto> protos = entryList.getProtos();
     assertProtos(protos, followerNext, previous, snapshotIndex);
     AppendEntriesRequestProto appendEntriesProto =
         leaderState.newAppendEntriesRequestProto(follower, protos, previous, callId);
-    return ReferenceCountedObject.delegateFrom(offered.values(), appendEntriesProto);
+    final ReferenceCountedObject<AppendEntriesRequestProto> ref = ReferenceCountedObject.wrap(
+        appendEntriesProto, entryList::retain, entryList::release);
+    ref.retain();
+    entryList.release();
+    return ref;
   }
 
   private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex previous, long snapshotIndex) {
@@ -324,6 +365,31 @@ public abstract class LogAppenderBase implements LogAppender {
     }
   }
 
+  private EntryBuffer readLogEntries(long followerNext, long heartbeatWaitTimeMs) throws RaftLogIOException {
+    final RaftLog raftLog = getRaftLog();
+    final long leaderNext = raftLog.getNextIndex();
+    final long halfMs = heartbeatWaitTimeMs/2;
+    EntryBuffer entryBuffer = null;
+    for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; next++) {
+      final ReferenceCountedObject<EntryWithData> retained;
+      try {
+        retained = raftLog.retainEntryWithData(next);
+        if (entryBuffer == null) {
+          entryBuffer = new EntryBuffer(name, server.getRaftServer().getProperties());
+        }
+        if (!entryBuffer.putNew(next, retained)) {
+          break;
+        }
+      } catch (Exception e) {
+        if (entryBuffer != null) {
+          entryBuffer.releaseAllAndClear();
+        }
+        throw e;
+      }
+    }
+    return entryBuffer;
+  }
+
   @Override
   public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
     Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() >= 0);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index ef066baac..2549e13e0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -26,7 +26,6 @@ import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -266,15 +265,14 @@ public final class LogSegment {
    *
    * In the future we can make the cache loader configurable if necessary.
    */
-  class LogEntryLoader extends CacheLoader<LogRecord, ReferenceCountedObject<LogEntryProto>> {
+  class LogEntryLoader {
     private final SegmentedRaftLogMetrics raftLogMetrics;
 
     LogEntryLoader(SegmentedRaftLogMetrics raftLogMetrics) {
       this.raftLogMetrics = raftLogMetrics;
     }
 
-    @Override
-    public ReferenceCountedObject<LogEntryProto> load(LogRecord key) throws IOException {
+    ReferenceCountedObject<LogEntryProto> load(TermIndex key) throws IOException {
       final File file = getFile();
       // note the loading should not exceed the endIndex: it is possible that
       // the on-disk log file should be truncated but has not been done yet.
@@ -285,17 +283,16 @@ public final class LogSegment {
         try {
           final TermIndex ti = TermIndex.valueOf(entry);
           putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
-          if (ti.equals(key.getTermIndex())) {
+          if (ti.equals(key)) {
+            entryRef.retain();
             toReturn.set(entryRef);
-          } else {
-            entryRef.release();
           }
-        } catch (Exception e) {
+        } finally {
           entryRef.release();
         }
       });
       loadingTimes.incrementAndGet();
-      return Objects.requireNonNull(toReturn.get());
+      return Objects.requireNonNull(toReturn.get(), () -> "toReturn == null for " + key);
     }
   }
 
@@ -492,8 +489,8 @@ public final class LogSegment {
   /**
    * Acquire LogSegment's monitor so that there is no concurrent loading.
    */
-  synchronized ReferenceCountedObject<LogEntryProto> loadCache(LogRecord record) throws RaftLogIOException {
-    ReferenceCountedObject<LogEntryProto> entry = entryCache.get(record.getTermIndex());
+  synchronized ReferenceCountedObject<LogEntryProto> loadCache(TermIndex ti) throws RaftLogIOException {
+    final ReferenceCountedObject<LogEntryProto> entry = entryCache.get(ti);
     if (entry != null) {
       try {
         entry.retain();
@@ -504,7 +501,7 @@ public final class LogSegment {
       }
     }
     try {
-      return cacheLoader.load(record);
+      return cacheLoader.load(ti);
     } catch (Exception e) {
       throw new RaftLogIOException(e);
     }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 01143c753..82115ff14 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -304,7 +304,8 @@ public final class SegmentedRaftLog extends RaftLogBase {
     if (record == null) {
       return null;
     }
-    final ReferenceCountedObject<LogEntryProto> entry = segment.getEntryFromCache(record.getTermIndex());
+    final TermIndex ti = record.getTermIndex();
+    final ReferenceCountedObject<LogEntryProto> entry = segment.getEntryFromCache(ti);
     if (entry != null) {
       try {
         entry.retain();
@@ -319,7 +320,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
     // the entry is not in the segment's cache. Load the cache without holding the lock.
     getRaftLogMetrics().onRaftLogCacheMiss();
     cacheEviction.signal();
-    return segment.loadCache(record);
+    return segment.loadCache(ti);
   }
 
   @Override
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index dd4f075db..5e822de44 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -139,6 +139,7 @@ public class TestLogSegment extends BaseTest {
     long offset = SegmentedRaftLogFormat.getHeaderLength();
     for (long i = start; i <= end; i++) {
       LogSegment.LogRecord record = segment.getLogRecord(i);
+      Assertions.assertNotNull(record);
       final TermIndex ti = record.getTermIndex();
       Assertions.assertEquals(i, ti.getIndex());
       Assertions.assertEquals(term, ti.getTerm());
@@ -146,7 +147,7 @@ public class TestLogSegment extends BaseTest {
 
       ReferenceCountedObject<LogEntryProto> entry = segment.getEntryFromCache(ti);
       if (entry == null) {
-        entry = segment.loadCache(record);
+        entry = segment.loadCache(ti);
       }
       offset += getEntrySize(entry.get(), Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
     }

Reply via email to