szetszwo commented on code in PR #1049:
URL: https://github.com/apache/ratis/pull/1049#discussion_r1527120575
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java:
##########
@@ -379,30 +379,34 @@ public Comparator<Long> getCallIdComparator() {
}
private void appendLog(boolean heartbeat) throws IOException {
- final AppendEntriesRequestProto pending;
+ final ReferenceCountedObject<AppendEntriesRequestProto> pending;
final AppendEntriesRequest request;
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
// Prepare and send the append request.
// Note changes on follower's nextIndex and ops on pendingRequests
should always be done under the write-lock
- pending = newAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
+ pending = nextAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
if (pending == null) {
return;
}
- request = new AppendEntriesRequest(pending, getFollowerId(),
grpcServerMetrics);
+ request = new AppendEntriesRequest(pending.get(), getFollowerId(),
grpcServerMetrics);
pendingRequests.put(request);
- increaseNextIndex(pending);
+ increaseNextIndex(pending.get());
if (appendLogRequestObserver == null) {
appendLogRequestObserver = new StreamObservers(
getClient(), new AppendLogResponseHandler(), useSeparateHBChannel,
getWaitTimeMin());
}
}
Review Comment:
Catch exception and release pending:
```java
@@ -379,7 +379,7 @@ public class GrpcLogAppender extends LogAppenderBase {
}
private void appendLog(boolean heartbeat) throws IOException {
- final ReferenceCountedObject<AppendEntriesRequestProto> pending;
+ ReferenceCountedObject<AppendEntriesRequestProto> pending = null;
final AppendEntriesRequest request;
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
// Prepare and send the append request.
@@ -395,6 +395,11 @@ public class GrpcLogAppender extends LogAppenderBase {
appendLogRequestObserver = new StreamObservers(
getClient(), new AppendLogResponseHandler(),
useSeparateHBChannel, getWaitTimeMin());
}
+ } catch(Exception e) {
+ if (pending != null) {
+ pending.release();
+ }
+ throw e;
}
```
##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java:
##########
@@ -457,23 +459,28 @@ public LogEntryProto getEntry(TimeDuration timeout)
throws RaftLogIOException, T
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
- final String err = getName() + ": Failed readStateMachineData for " +
toLogEntryString(logEntry);
+ final String err = getName() + ": Failed readStateMachineData for " +
toLogEntryString(logEntry.get());
LOG.error(err, e);
throw new RaftLogIOException(err,
JavaUtils.unwrapCompletionException(e));
}
// by this time we have already read the state machine data,
// so the log entry data should be set now
if (LogProtoUtils.isStateMachineDataEmpty(entryProto)) {
- final String err = getName() + ": State machine data not set for " +
toLogEntryString(logEntry);
+ final String err = getName() + ": State machine data not set for " +
toLogEntryString(logEntry.get());
Review Comment:
Use this instead.
##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java:
##########
@@ -457,23 +459,28 @@ public LogEntryProto getEntry(TimeDuration timeout)
throws RaftLogIOException, T
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
- final String err = getName() + ": Failed readStateMachineData for " +
toLogEntryString(logEntry);
+ final String err = getName() + ": Failed readStateMachineData for " +
toLogEntryString(logEntry.get());
Review Comment:
Use `this` instead, i.e.
```java
final String err = getName() + ": Failed readStateMachineData for " + this;
```
BTW, we should also change `checkStateMachineData`:
```java
private ByteString checkStateMachineData(ByteString data) {
if (data == null) {
- throw new IllegalStateException("State machine data is null for log
entry " + logEntry);
+ throw new IllegalStateException("State machine data is null for log
entry " + this);
}
return data;
}
```
##########
ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java:
##########
@@ -218,16 +221,42 @@ protected LongUnaryOperator getNextIndexForError(long
newNextIndex) {
};
}
-
@Override
- public AppendEntriesRequestProto newAppendEntriesRequest(long callId,
boolean heartbeat)
+ public AppendEntriesRequestProto newAppendEntriesRequest(long callId,
boolean heartbeat) throws RaftLogIOException {
+ ReferenceCountedObject<AppendEntriesRequestProto> ref =
nextAppendEntriesRequest(callId, heartbeat);
+ try {
+ return AppendEntriesRequestProto.parseFrom(ref.get().toByteString());
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalStateException("Error copying AppendEntriesRequest.",
e);
+ } finally {
+ ref.release();
+ }
+ }
Review Comment:
Since this is an implementation, we could simply throw an exception.
```java
@Override
public AppendEntriesRequestProto newAppendEntriesRequest(long callId,
boolean heartbeat) {
throw new UnsupportedOperationException("Use nextAppendEntriesRequest("
+ callId + ", " + heartbeat +") instead.");
}
```
##########
ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java:
##########
@@ -79,6 +80,10 @@ default ReferenceCountedObject<LogEntryProto> retainLog(long
index) throws RaftL
/**
* @return null if the log entry is not found in this log;
* otherwise, return the {@link EntryWithData} corresponding to the
given index.
+ * The {@link EntryWithData} enclosed retained underlying resource.
The client code need to ensure either
+ * calling {@link ReferenceCountedObject#release()} on the result of
+ * {@link EntryWithData#getEntryRef(TimeDuration)} or {@link
EntryWithData#release()} if the entry
+ * is discarded.
Review Comment:
This is still an incompatible change since the existing code, without
update, won't call `release()`. Let add a `retainEntryWithData(..)` method
instead.
##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java:
##########
@@ -323,17 +323,13 @@ public EntryWithData getEntryWithData(long index) throws
RaftLogIOException {
if (entryRef == null) {
throw new RaftLogIOException("Log entry not found: index = " + index);
}
- try {
- // TODO. The reference counted object should be passed to LogAppender
RATIS-2026.
- return getEntryWithData(entryRef.get());
- } finally {
- entryRef.release();
- }
+ return getEntryWithData(entryRef);
}
- private EntryWithData getEntryWithData(LogEntryProto entry) throws
RaftLogIOException {
+ private EntryWithData getEntryWithData(ReferenceCountedObject<LogEntryProto>
entryRef) throws RaftLogIOException {
Review Comment:
We should updo the change in RATIS-2028 and merge the methods:
```java
@@ -323,11 +323,8 @@ public final class SegmentedRaftLog extends RaftLogBase
{
if (entryRef == null) {
throw new RaftLogIOException("Log entry not found: index = " + index);
}
- return getEntryWithData(entryRef);
- }
- private EntryWithData
getEntryWithData(ReferenceCountedObject<LogEntryProto> entryRef) throws
RaftLogIOException {
- LogEntryProto entry = entryRef.get();
+ final LogEntryProto entry = entryRef.get();
if (!LogProtoUtils.isStateMachineDataEmpty(entry)) {
return newEntryWithData(entryRef, null);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]