This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 397c2efdd RATIS-2026. LogAppender to consume log entries with
reference count (#1049)
397c2efdd is described below
commit 397c2efdd79ff41ebf57a38bcc3681f2126091ec
Author: Duong Nguyen <[email protected]>
AuthorDate: Tue Mar 26 20:18:16 2024 -0700
RATIS-2026. LogAppender to consume log entries with reference count (#1049)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 29 +++++++++-----
.../apache/ratis/server/leader/LogAppender.java | 2 +
.../org/apache/ratis/server/raftlog/RaftLog.java | 23 +++++++++++
.../ratis/server/leader/LogAppenderBase.java | 46 ++++++++++++++++++----
.../ratis/server/leader/LogAppenderDefault.java | 33 ++++++++++------
.../apache/ratis/server/raftlog/RaftLogBase.java | 13 ++++--
.../ratis/server/raftlog/memory/MemoryRaftLog.java | 16 ++++----
.../server/raftlog/segmented/SegmentedRaftLog.java | 18 ++++-----
8 files changed, 130 insertions(+), 50 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 5f9c94eb2..e23f2826e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -378,30 +378,39 @@ public class GrpcLogAppender extends LogAppenderBase {
}
private void appendLog(boolean heartbeat) throws IOException {
- final AppendEntriesRequestProto pending;
+ ReferenceCountedObject<AppendEntriesRequestProto> pending = null;
final AppendEntriesRequest request;
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
// Prepare and send the append request.
// Note changes on follower's nextIndex and ops on pendingRequests
should always be done under the write-lock
- pending = newAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
+ pending = nextAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
if (pending == null) {
return;
}
- request = new AppendEntriesRequest(pending, getFollowerId(),
grpcServerMetrics);
+ request = new AppendEntriesRequest(pending.get(), getFollowerId(),
grpcServerMetrics);
pendingRequests.put(request);
- increaseNextIndex(pending);
+ increaseNextIndex(pending.get());
if (appendLogRequestObserver == null) {
appendLogRequestObserver = new StreamObservers(
getClient(), new AppendLogResponseHandler(), useSeparateHBChannel,
getWaitTimeMin());
}
+ } catch(Exception e) {
+ if (pending != null) {
+ pending.release();
+ }
+ throw e;
}
- final TimeDuration remaining = getRemainingWaitTime();
- if (remaining.isPositive()) {
- sleep(remaining, heartbeat);
- }
- if (isRunning()) {
- sendRequest(request, pending);
+ try {
+ final TimeDuration remaining = getRemainingWaitTime();
+ if (remaining.isPositive()) {
+ sleep(remaining, heartbeat);
+ }
+ if (isRunning()) {
+ sendRequest(request, pending.get());
+ }
+ } finally {
+ pending.release();
}
}
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 36331e3ab..78f61300b 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -125,7 +125,9 @@ public interface LogAppender {
* @param heartbeat the returned request must be a heartbeat.
*
* @return a new {@link AppendEntriesRequestProto} object.
+ * @deprecated this is no longer a public API.
*/
+ @Deprecated
AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean
heartbeat) throws RaftLogIOException;
/** @return a new {@link InstallSnapshotRequestProto} object. */
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index e4fbd664e..ca785a4a6 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -79,9 +79,23 @@ public interface RaftLog extends RaftLogSequentialOps,
Closeable {
/**
* @return null if the log entry is not found in this log;
* otherwise, return the {@link EntryWithData} corresponding to the
given index.
+ * @deprecated use {@link #retainEntryWithData(long)}.
*/
+ @Deprecated
EntryWithData getEntryWithData(long index) throws RaftLogIOException;
+ /**
+ * @return null if the log entry is not found in this log;
+ * otherwise, return a retained reference of the {@link
EntryWithData} corresponding to the given index.
+ * Since the returned reference is retained, the caller must call
{@link ReferenceCountedObject#release()}
+ * after use.
+ */
+ default ReferenceCountedObject<EntryWithData> retainEntryWithData(long
index) throws RaftLogIOException {
+ final ReferenceCountedObject<EntryWithData> wrap =
ReferenceCountedObject.wrap(getEntryWithData(index));
+ wrap.retain();
+ return wrap;
+}
+
/**
* @param startIndex the starting log index (inclusive)
* @param endIndex the ending log index (exclusive)
@@ -172,6 +186,15 @@ public interface RaftLog extends RaftLogSequentialOps,
Closeable {
* containing both the log entry and the state machine data.
*/
interface EntryWithData {
+ /** @return the index of this entry. */
+ default long getIndex() {
+ try {
+ return getEntry(TimeDuration.ONE_MINUTE).getIndex();
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to getIndex", e);
+ }
+ }
+
/** @return the serialized size including both log entry and state machine
data. */
int getSerializedSize();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 958cc6fa8..de221432b 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -33,11 +33,14 @@ import org.apache.ratis.util.DataQueue;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -218,16 +221,35 @@ public abstract class LogAppenderBase implements
LogAppender {
};
}
-
@Override
- public AppendEntriesRequestProto newAppendEntriesRequest(long callId,
boolean heartbeat)
+ public AppendEntriesRequestProto newAppendEntriesRequest(long callId,
boolean heartbeat) {
+ throw new UnsupportedOperationException("Use nextAppendEntriesRequest(" +
callId + ", " + heartbeat +") instead.");
+ }
+
+/**
+ * Create a {@link AppendEntriesRequestProto} object using the {@link
FollowerInfo} of this {@link LogAppender}.
+ * The {@link AppendEntriesRequestProto} object may contain zero or more log
entries.
+ * When there are zero log entries, the {@link AppendEntriesRequestProto}
object is a heartbeat.
+ *
+ * @param callId The call id of the returned request.
+ * @param heartbeat the returned request must be a heartbeat.
+ *
+ * @return a retained reference of {@link AppendEntriesRequestProto} object.
+ * Since the returned reference is retained, the caller must call
{@link ReferenceCountedObject#release()}
+ * after use.
+ */
+ protected ReferenceCountedObject<AppendEntriesRequestProto>
nextAppendEntriesRequest(long callId, boolean heartbeat)
throws RaftLogIOException {
final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs();
final TermIndex previous = getPrevious(follower.getNextIndex());
if (heartbeatWaitTimeMs <= 0L || heartbeat) {
// heartbeat
- return leaderState.newAppendEntriesRequestProto(follower,
Collections.emptyList(),
- hasPendingDataRequests()? null : previous, callId);
+ AppendEntriesRequestProto heartbeatRequest =
+ leaderState.newAppendEntriesRequestProto(follower,
Collections.emptyList(),
+ hasPendingDataRequests() ? null : previous, callId);
+ ReferenceCountedObject<AppendEntriesRequestProto> ref =
ReferenceCountedObject.wrap(heartbeatRequest);
+ ref.retain();
+ return ref;
}
Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " +
buffer.getNumElements() + " elements.");
@@ -236,10 +258,14 @@ public abstract class LogAppenderBase implements
LogAppender {
final long leaderNext = getRaftLog().getNextIndex();
final long followerNext = follower.getNextIndex();
final long halfMs = heartbeatWaitTimeMs/2;
- for (long next = followerNext; leaderNext > next &&
getHeartbeatWaitTimeMs() - halfMs > 0; ) {
- if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
+ final Map<Long, ReferenceCountedObject<EntryWithData>> offered = new
HashMap<>();
+ for (long next = followerNext; leaderNext > next &&
getHeartbeatWaitTimeMs() - halfMs > 0; next++) {
+ final ReferenceCountedObject<EntryWithData> entryWithData =
getRaftLog().retainEntryWithData(next);
+ if (!buffer.offer(entryWithData.get())) {
+ entryWithData.release();
break;
}
+ offered.put(next, entryWithData);
}
if (buffer.isEmpty()) {
return null;
@@ -248,9 +274,15 @@ public abstract class LogAppenderBase implements
LogAppender {
final List<LogEntryProto> protos =
buffer.pollList(getHeartbeatWaitTimeMs(), EntryWithData::getEntry,
(entry, time, exception) -> LOG.warn("Failed to get " + entry
+ " in " + time.toString(TimeUnit.MILLISECONDS, 3), exception));
+ for (EntryWithData entry : buffer) {
+ // Release remaining entries.
+ offered.remove(entry.getIndex()).release();
+ }
buffer.clear();
assertProtos(protos, followerNext, previous, snapshotIndex);
- return leaderState.newAppendEntriesRequestProto(follower, protos,
previous, callId);
+ AppendEntriesRequestProto appendEntriesProto =
+ leaderState.newAppendEntriesRequestProto(follower, protos, previous,
callId);
+ return ReferenceCountedObject.delegateFrom(offered.values(),
appendEntriesProto);
}
private void assertProtos(List<LogEntryProto> protos, long nextIndex,
TermIndex previous, long snapshotIndex) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 21ef70d4d..432a41992 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -26,6 +26,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.Timestamp;
import java.io.IOException;
@@ -58,11 +59,15 @@ class LogAppenderDefault extends LogAppenderBase {
throws InterruptedException, InterruptedIOException, RaftLogIOException {
int retry = 0;
- AppendEntriesRequestProto request =
newAppendEntriesRequest(CallId.getAndIncrement(), false);
+ ReferenceCountedObject<AppendEntriesRequestProto> request =
nextAppendEntriesRequest(
+ CallId.getAndIncrement(), false);
while (isRunning()) { // keep retrying for IOException
try {
- if (request == null || request.getEntriesCount() == 0) {
- request = newAppendEntriesRequest(CallId.getAndIncrement(), false);
+ if (request == null || request.get().getEntriesCount() == 0) {
+ if (request != null) {
+ request.release();
+ }
+ request = nextAppendEntriesRequest(CallId.getAndIncrement(), false);
}
if (request == null) {
@@ -73,14 +78,8 @@ class LogAppenderDefault extends LogAppenderBase {
return null;
}
- resetHeartbeatTrigger();
- final Timestamp sendTime = Timestamp.currentTime();
- getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
- final AppendEntriesReplyProto r =
getServerRpc().appendEntries(request);
- getFollower().updateLastRpcResponseTime();
- getFollower().updateLastRespondedAppendEntriesSendTime(sendTime);
-
- getLeaderState().onFollowerCommitIndex(getFollower(),
r.getFollowerCommit());
+ AppendEntriesReplyProto r = sendAppendEntries(request.get());
+ request.release();
return r;
} catch (InterruptedIOException | RaftLogIOException e) {
throw e;
@@ -98,6 +97,18 @@ class LogAppenderDefault extends LogAppenderBase {
return null;
}
+ private AppendEntriesReplyProto sendAppendEntries(AppendEntriesRequestProto
request) throws IOException {
+ resetHeartbeatTrigger();
+ final Timestamp sendTime = Timestamp.currentTime();
+ getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
+ final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
+ getFollower().updateLastRpcResponseTime();
+ getFollower().updateLastRespondedAppendEntriesSendTime(sendTime);
+
+ getLeaderState().onFollowerCommitIndex(getFollower(),
r.getFollowerCommit());
+ return r;
+ }
+
private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot)
throws InterruptedIOException {
String requestId = UUID.randomUUID().toString();
InstallSnapshotReplyProto reply = null;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 0a9a1c93c..284776d10 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -428,11 +428,16 @@ public abstract class RaftLogBase implements RaftLog {
private ByteString checkStateMachineData(ByteString data) {
if (data == null) {
- throw new IllegalStateException("State machine data is null for log
entry " + logEntry);
+ throw new IllegalStateException("State machine data is null for log
entry " + this);
}
return data;
}
+ @Override
+ public long getIndex() {
+ return logEntry.getIndex();
+ }
+
@Override
public int getSerializedSize() {
return LogProtoUtils.getSerializedSize(logEntry);
@@ -440,11 +445,11 @@ public abstract class RaftLogBase implements RaftLog {
@Override
public LogEntryProto getEntry(TimeDuration timeout) throws
RaftLogIOException, TimeoutException {
- LogEntryProto entryProto;
if (future == null) {
return logEntry;
}
+ final LogEntryProto entryProto;
try {
entryProto = future.thenApply(data ->
LogProtoUtils.addStateMachineData(data, logEntry))
.get(timeout.getDuration(), timeout.getUnit());
@@ -457,14 +462,14 @@ public abstract class RaftLogBase implements RaftLog {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
- final String err = getName() + ": Failed readStateMachineData for " +
toLogEntryString(logEntry);
+ final String err = getName() + ": Failed readStateMachineData for " +
this;
LOG.error(err, e);
throw new RaftLogIOException(err,
JavaUtils.unwrapCompletionException(e));
}
// by this time we have already read the state machine data,
// so the log entry data should be set now
if (LogProtoUtils.isStateMachineDataEmpty(entryProto)) {
- final String err = getName() + ": State machine data not set for " +
toLogEntryString(logEntry);
+ final String err = getName() + ": State machine data not set for " +
this;
LOG.error(err);
throw new RaftLogIOException(err);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index feedaeee4..55036fac5 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -135,14 +135,14 @@ public class MemoryRaftLog extends RaftLogBase {
}
@Override
- public EntryWithData getEntryWithData(long index) {
- // TODO. The reference counted object should be passed to LogAppender
RATIS-2026.
- ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
- try {
- return newEntryWithData(ref.get(), null);
- } finally {
- ref.release();
- }
+ public EntryWithData getEntryWithData(long index) throws RaftLogIOException {
+ throw new UnsupportedOperationException("Use retainEntryWithData(" + index
+ ") instead.");
+ }
+
+ @Override
+ public ReferenceCountedObject<EntryWithData> retainEntryWithData(long index)
{
+ final ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
+ return ref.delegate(newEntryWithData(ref.get(), null));
}
@Override
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index bb0793abe..b7dd32689 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -319,21 +319,19 @@ public final class SegmentedRaftLog extends RaftLogBase {
@Override
public EntryWithData getEntryWithData(long index) throws RaftLogIOException {
+ throw new UnsupportedOperationException("Use retainEntryWithData(" + index
+ ") instead.");
+ }
+
+ @Override
+ public ReferenceCountedObject<EntryWithData> retainEntryWithData(long index)
throws RaftLogIOException {
final ReferenceCountedObject<LogEntryProto> entryRef = retainLog(index);
if (entryRef == null) {
throw new RaftLogIOException("Log entry not found: index = " + index);
}
- try {
- // TODO. The reference counted object should be passed to LogAppender
RATIS-2026.
- return getEntryWithData(entryRef.get());
- } finally {
- entryRef.release();
- }
- }
- private EntryWithData getEntryWithData(LogEntryProto entry) throws
RaftLogIOException {
+ final LogEntryProto entry = entryRef.get();
if (!LogProtoUtils.isStateMachineDataEmpty(entry)) {
- return newEntryWithData(entry, null);
+ return entryRef.delegate(newEntryWithData(entry, null));
}
try {
@@ -344,7 +342,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
throw new CompletionException("Failed to read state machine data for
log entry " + entry, ex);
});
}
- return newEntryWithData(entry, future);
+ return entryRef.delegate(newEntryWithData(entry, future));
} catch (Exception e) {
final String err = getName() + ": Failed readStateMachineData for " +
LogProtoUtils.toLogEntryString(entry);