This is an automated email from the ASF dual-hosted git repository.
williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c454d7839 RATIS-2228. Refactor the offered map in
LogAppenderBase.nextAppendEntriesRequest (#1201)
c454d7839 is described below
commit c454d78393588d634aa50437ea8a0e7d6de80976
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Jan 3 07:11:05 2025 -0800
RATIS-2228. Refactor the offered map in
LogAppenderBase.nextAppendEntriesRequest (#1201)
---
.../ratis/server/leader/LogAppenderBase.java | 190 ++++++++++++++-------
.../ratis/server/raftlog/segmented/LogSegment.java | 21 +--
.../server/raftlog/segmented/SegmentedRaftLog.java | 5 +-
.../server/raftlog/segmented/TestLogSegment.java | 3 +-
4 files changed, 142 insertions(+), 77 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 9d1861fb4..22cd16860 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -37,14 +37,13 @@ import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongUnaryOperator;
@@ -52,12 +51,90 @@ import java.util.function.LongUnaryOperator;
* An abstract implementation of {@link LogAppender}.
*/
public abstract class LogAppenderBase implements LogAppender {
+ /** For buffering log entries to create an {@link EntryList}. */
+ private static class EntryBuffer {
+ /** A queue for limiting the byte size, number of elements and poll time.
*/
+ private final DataQueue<EntryWithData> queue;
+ /** A map for releasing {@link ReferenceCountedObject}s. */
+ private final Map<Long, ReferenceCountedObject<EntryWithData>> references
= new HashMap<>();
+
+ EntryBuffer(Object name, RaftProperties properties) {
+ final SizeInBytes bufferByteLimit =
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
+ final int bufferElementLimit =
RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
+ this.queue = new DataQueue<>(name, bufferByteLimit, bufferElementLimit,
EntryWithData::getSerializedSize);
+ }
+
+ boolean putNew(long index, ReferenceCountedObject<EntryWithData> retained)
{
+ if (!queue.offer(retained.get())) {
+ retained.release();
+ return false;
+ }
+ final ReferenceCountedObject<EntryWithData> previous =
references.put(index, retained);
+ Preconditions.assertNull(previous, () -> "previous with index " + index);
+ return true;
+ }
+
+ void releaseAllAndClear() {
+ for (ReferenceCountedObject<EntryWithData> ref : references.values()) {
+ ref.release();
+ }
+ references.clear();
+ queue.clear();
+ }
+
+ EntryList pollList(long heartbeatWaitTimeMs) throws RaftLogIOException {
+ final List<LogEntryProto> protos;
+ try {
+ protos = queue.pollList(heartbeatWaitTimeMs, EntryWithData::getEntry,
null);
+ } catch (Exception e) {
+ releaseAllAndClear();
+ throw e;
+ } finally {
+ for (EntryWithData entry : queue) {
+ // Remove and release remaining entries.
+ final ReferenceCountedObject<EntryWithData> removed =
references.remove(entry.getIndex());
+ Objects.requireNonNull(removed, "removed == null");
+ removed.release();
+ }
+ queue.clear();
+ }
+ return new EntryList(protos, references);
+ }
+ }
+
+ /** Storing log entries and their references. */
+ private static class EntryList {
+ private final List<LogEntryProto> protos;
+ private final Collection<ReferenceCountedObject<EntryWithData>> references;
+
+ EntryList(List<LogEntryProto> protos, Map<Long,
ReferenceCountedObject<EntryWithData>> references) {
+ Preconditions.assertSame(references.size(), protos.size(), "#entries");
+ this.protos = Collections.unmodifiableList(protos);
+ this.references =
Collections.unmodifiableCollection(references.values());
+ }
+
+ List<LogEntryProto> getProtos() {
+ return protos;
+ }
+
+ void retain() {
+ for (ReferenceCountedObject<EntryWithData> ref : references) {
+ ref.retain();
+ }
+ }
+
+ void release() {
+ for (ReferenceCountedObject<EntryWithData> ref : references) {
+ ref.release();
+ }
+ }
+ }
+
private final String name;
private final RaftServer.Division server;
private final LeaderState leaderState;
private final FollowerInfo follower;
- private final DataQueue<EntryWithData> buffer;
private final int snapshotChunkMaxSize;
private final LogAppenderDaemon daemon;
@@ -75,9 +152,6 @@ public abstract class LogAppenderBase implements LogAppender
{
final RaftProperties properties = server.getRaftServer().getProperties();
this.snapshotChunkMaxSize =
RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
- final SizeInBytes bufferByteLimit =
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
- final int bufferElementLimit =
RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
- this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit,
EntryWithData::getSerializedSize);
this.daemon = new LogAppenderDaemon(this);
this.eventAwaitForSignal = new AwaitForSignal(name);
@@ -210,13 +284,13 @@ public abstract class LogAppenderBase implements
LogAppender {
final long n = oldNextIndex <= 0L ? oldNextIndex : Math.min(oldNextIndex
- 1, newNextIndex);
if (m > n) {
if (m > newNextIndex) {
- LOG.info("Set nextIndex to matchIndex + 1 (= " + m + ")");
+ LOG.info("{}: Set nextIndex to matchIndex + 1 (= {})", name, m);
}
return m;
} else if (oldNextIndex <= 0L) {
return oldNextIndex; // no change.
} else {
- LOG.info("Decrease nextIndex to " + n);
+ LOG.info("{}: Decrease nextIndex to {}", name, n);
return n;
}
};
@@ -227,18 +301,18 @@ public abstract class LogAppenderBase implements
LogAppender {
throw new UnsupportedOperationException("Use nextAppendEntriesRequest(" +
callId + ", " + heartbeat +") instead.");
}
-/**
- * Create a {@link AppendEntriesRequestProto} object using the {@link
FollowerInfo} of this {@link LogAppender}.
- * The {@link AppendEntriesRequestProto} object may contain zero or more log
entries.
- * When there is zero log entries, the {@link AppendEntriesRequestProto}
object is a heartbeat.
- *
- * @param callId The call id of the returned request.
- * @param heartbeat the returned request must be a heartbeat.
- *
- * @return a retained reference of {@link AppendEntriesRequestProto} object.
- * Since the returned reference is retained, the caller must call
{@link ReferenceCountedObject#release()}}
- * after use.
- */
+ /**
+ * Create an {@link AppendEntriesRequestProto} object using the {@link
FollowerInfo} of this {@link LogAppender}.
+ * The {@link AppendEntriesRequestProto} object may contain zero or more log
entries.
+ * When there are zero log entries, the {@link AppendEntriesRequestProto}
object is a heartbeat.
+ *
+ * @param callId The call id of the returned request.
+ * @param heartbeat whether the returned request must be a heartbeat.
+ *
+ * @return a retained reference of {@link AppendEntriesRequestProto} object.
+ * Since the returned reference is retained,
+ * the caller must call {@link ReferenceCountedObject#release()}
after use.
+ */
protected ReferenceCountedObject<AppendEntriesRequestProto>
nextAppendEntriesRequest(long callId, boolean heartbeat)
throws RaftLogIOException {
final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs();
@@ -253,56 +327,23 @@ public abstract class LogAppenderBase implements
LogAppender {
return ref;
}
- Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " +
buffer.getNumElements() + " elements.");
-
final long snapshotIndex = follower.getSnapshotIndex();
- final long leaderNext = getRaftLog().getNextIndex();
final long followerNext = follower.getNextIndex();
- final long halfMs = heartbeatWaitTimeMs/2;
- final Map<Long, ReferenceCountedObject<EntryWithData>> offered = new
HashMap<>();
- for (long next = followerNext; leaderNext > next &&
getHeartbeatWaitTimeMs() - halfMs > 0; next++) {
- final ReferenceCountedObject<EntryWithData> entryWithData;
- try {
- entryWithData = getRaftLog().retainEntryWithData(next);
- if (!buffer.offer(entryWithData.get())) {
- entryWithData.release();
- break;
- }
- offered.put(next, entryWithData);
- } catch (Exception e){
- for (ReferenceCountedObject<EntryWithData> ref : offered.values()) {
- ref.release();
- }
- offered.clear();
- throw e;
- }
- }
- if (buffer.isEmpty()) {
+ final EntryBuffer entryBuffer = readLogEntries(followerNext,
heartbeatWaitTimeMs);
+ if (entryBuffer == null) {
return null;
}
- final List<LogEntryProto> protos;
- try {
- protos = buffer.pollList(getHeartbeatWaitTimeMs(),
EntryWithData::getEntry,
- (entry, time, exception) -> LOG.warn("Failed to get {} in {}",
- entry, time.toString(TimeUnit.MILLISECONDS, 3), exception));
- } catch (RaftLogIOException e) {
- for (ReferenceCountedObject<EntryWithData> ref : offered.values()) {
- ref.release();
- }
- offered.clear();
- throw e;
- } finally {
- for (EntryWithData entry : buffer) {
- // Release remaining entries.
-
Optional.ofNullable(offered.remove(entry.getIndex())).ifPresent(ReferenceCountedObject::release);
- }
- buffer.clear();
- }
+ final EntryList entryList = entryBuffer.pollList(heartbeatWaitTimeMs);
+ final List<LogEntryProto> protos = entryList.getProtos();
assertProtos(protos, followerNext, previous, snapshotIndex);
AppendEntriesRequestProto appendEntriesProto =
leaderState.newAppendEntriesRequestProto(follower, protos, previous,
callId);
- return ReferenceCountedObject.delegateFrom(offered.values(),
appendEntriesProto);
+ final ReferenceCountedObject<AppendEntriesRequestProto> ref =
ReferenceCountedObject.wrap(
+ appendEntriesProto, entryList::retain, entryList::release);
+ ref.retain();
+ entryList.release();
+ return ref;
}
private void assertProtos(List<LogEntryProto> protos, long nextIndex,
TermIndex previous, long snapshotIndex) {
@@ -324,6 +365,31 @@ public abstract class LogAppenderBase implements
LogAppender {
}
}
+ private EntryBuffer readLogEntries(long followerNext, long
heartbeatWaitTimeMs) throws RaftLogIOException {
+ final RaftLog raftLog = getRaftLog();
+ final long leaderNext = raftLog.getNextIndex();
+ final long halfMs = heartbeatWaitTimeMs/2;
+ EntryBuffer entryBuffer = null;
+ for (long next = followerNext; leaderNext > next &&
getHeartbeatWaitTimeMs() - halfMs > 0; next++) {
+ final ReferenceCountedObject<EntryWithData> retained;
+ try {
+ retained = raftLog.retainEntryWithData(next);
+ if (entryBuffer == null) {
+ entryBuffer = new EntryBuffer(name,
server.getRaftServer().getProperties());
+ }
+ if (!entryBuffer.putNew(next, retained)) {
+ break;
+ }
+ } catch (Exception e) {
+ if (entryBuffer != null) {
+ entryBuffer.releaseAllAndClear();
+ }
+ throw e;
+ }
+ }
+ return entryBuffer;
+ }
+
@Override
public InstallSnapshotRequestProto
newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() >= 0);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index ef066baac..2549e13e0 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -26,7 +26,6 @@ import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorage;
import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
@@ -266,15 +265,14 @@ public final class LogSegment {
*
* In the future we can make the cache loader configurable if necessary.
*/
- class LogEntryLoader extends CacheLoader<LogRecord,
ReferenceCountedObject<LogEntryProto>> {
+ class LogEntryLoader {
private final SegmentedRaftLogMetrics raftLogMetrics;
LogEntryLoader(SegmentedRaftLogMetrics raftLogMetrics) {
this.raftLogMetrics = raftLogMetrics;
}
- @Override
- public ReferenceCountedObject<LogEntryProto> load(LogRecord key) throws
IOException {
+ ReferenceCountedObject<LogEntryProto> load(TermIndex key) throws
IOException {
final File file = getFile();
// note the loading should not exceed the endIndex: it is possible that
// the on-disk log file should be truncated but has not been done yet.
@@ -285,17 +283,16 @@ public final class LogSegment {
try {
final TermIndex ti = TermIndex.valueOf(entry);
putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
- if (ti.equals(key.getTermIndex())) {
+ if (ti.equals(key)) {
+ entryRef.retain();
toReturn.set(entryRef);
- } else {
- entryRef.release();
}
- } catch (Exception e) {
+ } finally {
entryRef.release();
}
});
loadingTimes.incrementAndGet();
- return Objects.requireNonNull(toReturn.get());
+ return Objects.requireNonNull(toReturn.get(), () -> "toReturn == null
for " + key);
}
}
@@ -492,8 +489,8 @@ public final class LogSegment {
/**
* Acquire LogSegment's monitor so that there is no concurrent loading.
*/
- synchronized ReferenceCountedObject<LogEntryProto> loadCache(LogRecord
record) throws RaftLogIOException {
- ReferenceCountedObject<LogEntryProto> entry =
entryCache.get(record.getTermIndex());
+ synchronized ReferenceCountedObject<LogEntryProto> loadCache(TermIndex ti)
throws RaftLogIOException {
+ final ReferenceCountedObject<LogEntryProto> entry = entryCache.get(ti);
if (entry != null) {
try {
entry.retain();
@@ -504,7 +501,7 @@ public final class LogSegment {
}
}
try {
- return cacheLoader.load(record);
+ return cacheLoader.load(ti);
} catch (Exception e) {
throw new RaftLogIOException(e);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 01143c753..82115ff14 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -304,7 +304,8 @@ public final class SegmentedRaftLog extends RaftLogBase {
if (record == null) {
return null;
}
- final ReferenceCountedObject<LogEntryProto> entry =
segment.getEntryFromCache(record.getTermIndex());
+ final TermIndex ti = record.getTermIndex();
+ final ReferenceCountedObject<LogEntryProto> entry =
segment.getEntryFromCache(ti);
if (entry != null) {
try {
entry.retain();
@@ -319,7 +320,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
// the entry is not in the segment's cache. Load the cache without holding
the lock.
getRaftLogMetrics().onRaftLogCacheMiss();
cacheEviction.signal();
- return segment.loadCache(record);
+ return segment.loadCache(ti);
}
@Override
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index dd4f075db..5e822de44 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -139,6 +139,7 @@ public class TestLogSegment extends BaseTest {
long offset = SegmentedRaftLogFormat.getHeaderLength();
for (long i = start; i <= end; i++) {
LogSegment.LogRecord record = segment.getLogRecord(i);
+ Assertions.assertNotNull(record);
final TermIndex ti = record.getTermIndex();
Assertions.assertEquals(i, ti.getIndex());
Assertions.assertEquals(term, ti.getTerm());
@@ -146,7 +147,7 @@ public class TestLogSegment extends BaseTest {
ReferenceCountedObject<LogEntryProto> entry =
segment.getEntryFromCache(ti);
if (entry == null) {
- entry = segment.loadCache(record);
+ entry = segment.loadCache(ti);
}
offset += getEntrySize(entry.get(),
Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
}