szetszwo commented on code in PR #1049:
URL: https://github.com/apache/ratis/pull/1049#discussion_r1527120575


##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java:
##########
@@ -379,30 +379,34 @@ public Comparator<Long> getCallIdComparator() {
   }
 
   private void appendLog(boolean heartbeat) throws IOException {
-    final AppendEntriesRequestProto pending;
+    final ReferenceCountedObject<AppendEntriesRequestProto> pending;
     final AppendEntriesRequest request;
     try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
       // Prepare and send the append request.
       // Note changes on follower's nextIndex and ops on pendingRequests should always be done under the write-lock
-      pending = newAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
+      pending = nextAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
       if (pending == null) {
         return;
       }
-      request = new AppendEntriesRequest(pending, getFollowerId(), grpcServerMetrics);
+      request = new AppendEntriesRequest(pending.get(), getFollowerId(), grpcServerMetrics);
       pendingRequests.put(request);
-      increaseNextIndex(pending);
+      increaseNextIndex(pending.get());
       if (appendLogRequestObserver == null) {
         appendLogRequestObserver = new StreamObservers(
             getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin());
       }
     }

Review Comment:
   Catch exception and release pending:
   ```java
   @@ -379,7 +379,7 @@ public class GrpcLogAppender extends LogAppenderBase {
      }
    
      private void appendLog(boolean heartbeat) throws IOException {
   -    final ReferenceCountedObject<AppendEntriesRequestProto> pending;
   +    ReferenceCountedObject<AppendEntriesRequestProto> pending = null;
        final AppendEntriesRequest request;
        try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
          // Prepare and send the append request.
   @@ -395,6 +395,11 @@ public class GrpcLogAppender extends LogAppenderBase {
            appendLogRequestObserver = new StreamObservers(
                getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin());
          }
   +    } catch(Exception e) {
   +      if (pending != null) {
   +        pending.release();
   +      }
   +      throw e;
        }
   ```
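
   A minimal runnable sketch of the same release-on-failure pattern, in case it helps. All names here (`Pending`, `enqueue`, `OUTSTANDING`) are hypothetical stand-ins, not Ratis classes; the point is only that the retained object is released before the exception is rethrown, so the count of outstanding references returns to where it started.

   ```java
   import java.io.IOException;
   import java.util.concurrent.atomic.AtomicInteger;

   final class ReleaseOnFailureSketch {
     /** Hypothetical stand-in for a retained request; tracks outstanding references. */
     static final class Pending {
       static final AtomicInteger OUTSTANDING = new AtomicInteger();

       Pending() {
         OUTSTANDING.incrementAndGet();
       }

       void release() {
         OUTSTANDING.decrementAndGet();
       }
     }

     /** Hypothetical step that may fail after the request has been retained. */
     static void enqueue(Pending pending, boolean fail) throws IOException {
       if (fail) {
         throw new IOException("simulated failure while enqueuing");
       }
     }

     /** Returns the retained request on success; releases it before rethrowing on failure. */
     static Pending appendLog(boolean fail) throws IOException {
       Pending pending = null;
       try {
         pending = new Pending();
         enqueue(pending, fail);
         return pending;
       } catch (Exception e) {
         if (pending != null) {
           pending.release();
         }
         throw e; // precise rethrow: only IOException or an unchecked exception
       }
     }

     public static void main(String[] args) throws IOException {
       try {
         appendLog(true);
       } catch (IOException expected) {
         System.out.println("outstanding after failure: " + Pending.OUTSTANDING.get());
       }
       final Pending ok = appendLog(false);
       System.out.println("outstanding after success: " + Pending.OUTSTANDING.get());
       ok.release(); // on the success path the caller owns the release
     }
   }
   ```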



##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java:
##########
@@ -457,23 +459,28 @@ public LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, T
         if (e instanceof InterruptedException) {
           Thread.currentThread().interrupt();
         }
-        final String err = getName() + ": Failed readStateMachineData for " + toLogEntryString(logEntry);
+        final String err = getName() + ": Failed readStateMachineData for " + toLogEntryString(logEntry.get());
         LOG.error(err, e);
         throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e));
       }
       // by this time we have already read the state machine data,
       // so the log entry data should be set now
       if (LogProtoUtils.isStateMachineDataEmpty(entryProto)) {
-        final String err = getName() + ": State machine data not set for " + toLogEntryString(logEntry);
+        final String err = getName() + ": State machine data not set for " + toLogEntryString(logEntry.get());

Review Comment:
   Use `this` instead of `toLogEntryString(logEntry.get())`.



##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java:
##########
@@ -457,23 +459,28 @@ public LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, T
         if (e instanceof InterruptedException) {
           Thread.currentThread().interrupt();
         }
-        final String err = getName() + ": Failed readStateMachineData for " + toLogEntryString(logEntry);
+        final String err = getName() + ": Failed readStateMachineData for " + toLogEntryString(logEntry.get());

Review Comment:
   Use `this` instead, i.e.
   ```java
   final String err = getName() + ": Failed readStateMachineData for " + this;
   ```
   
   BTW, we should also change `checkStateMachineData`:
   ```java
        private ByteString checkStateMachineData(ByteString data) {
          if (data == null) {
   -        throw new IllegalStateException("State machine data is null for log entry " + logEntry);
   +        throw new IllegalStateException("State machine data is null for log entry " + this);
          }
          return data;
        }
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java:
##########
@@ -218,16 +221,42 @@ protected LongUnaryOperator getNextIndexForError(long newNextIndex) {
     };
   }
 
-
   @Override
-  public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat)
+  public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException {
+    ReferenceCountedObject<AppendEntriesRequestProto> ref = nextAppendEntriesRequest(callId, heartbeat);
+    try {
+      return AppendEntriesRequestProto.parseFrom(ref.get().toByteString());
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalStateException("Error copying AppendEntriesRequest.", e);
+    } finally {
+      ref.release();
+    }
+  }

Review Comment:
   Since this is an implementation, we could simply throw an exception.
   ```java
     @Override
     public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) {
       throw new UnsupportedOperationException("Use nextAppendEntriesRequest(" + callId + ", " + heartbeat + ") instead.");
     }
   ```



##########
ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java:
##########
@@ -79,6 +80,10 @@ default ReferenceCountedObject<LogEntryProto> retainLog(long index) throws RaftL
   /**
    * @return null if the log entry is not found in this log;
    *         otherwise, return the {@link EntryWithData} corresponding to the given index.
+   *         The {@link EntryWithData} enclosed retained underlying resource. The client code need to ensure either
+   *         calling {@link ReferenceCountedObject#release()} on the result of
+   *         {@link EntryWithData#getEntryRef(TimeDuration)} or {@link EntryWithData#release()} if the entry
+   *         is discarded.

Review Comment:
   This is still an incompatible change since existing code, unless updated, won't call `release()`. Let's add a `retainEntryWithData(..)` method instead.
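
   To illustrate the idea, here is a minimal self-contained sketch, not the actual Ratis API. `Ref`, `Log` and `SimpleLog` are hypothetical stand-ins. The existing method keeps its contract (callers never release anything) by copying and releasing internally, while the new `retainEntryWithData(..)` returns a retained object that the caller must release.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   final class RetainSketch {
     /** Hypothetical stand-in for a reference-counted wrapper; not the Ratis class. */
     static final class Ref<T> {
       private final T value;
       private final AtomicInteger count = new AtomicInteger();

       Ref(T value) {
         this.value = value;
       }

       Ref<T> retain() {
         count.incrementAndGet();
         return this;
       }

       void release() {
         if (count.decrementAndGet() < 0) {
           throw new IllegalStateException("released more often than retained");
         }
       }

       T get() {
         return value;
       }

       int refCount() {
         return count.get();
       }
     }

     /** Hypothetical log API: the old method is unchanged, the new one is opt-in. */
     interface Log {
       /** Existing contract: callers never call release(). */
       String getEntryWithData(long index);

       /** New contract: the result is retained and the caller must release it. */
       Ref<String> retainEntryWithData(long index);
     }

     static final class SimpleLog implements Log {
       final Ref<String> stored = new Ref<>("entry-data");

       @Override
       public Ref<String> retainEntryWithData(long index) {
         return stored.retain();
       }

       @Override
       public String getEntryWithData(long index) {
         // Old callers receive a copy; the reference is released on their behalf.
         final Ref<String> ref = retainEntryWithData(index);
         try {
           return new String(ref.get());
         } finally {
           ref.release();
         }
       }
     }

     public static void main(String[] args) {
       final SimpleLog log = new SimpleLog();
       log.getEntryWithData(0);
       System.out.println("refCount after old API: " + log.stored.refCount());
       final Ref<String> ref = log.retainEntryWithData(0);
       System.out.println("refCount while retained: " + log.stored.refCount());
       ref.release();
       System.out.println("refCount after release: " + log.stored.refCount());
     }
   }
   ```

   With this shape, code written against the old method cannot leak a reference, and only callers that opt in to the new method take on the obligation to release.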



##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java:
##########
@@ -323,17 +323,13 @@ public EntryWithData getEntryWithData(long index) throws RaftLogIOException {
     if (entryRef == null) {
       throw new RaftLogIOException("Log entry not found: index = " + index);
     }
-    try {
-      // TODO. The reference counted object should be passed to LogAppender RATIS-2026.
-      return getEntryWithData(entryRef.get());
-    } finally {
-      entryRef.release();
-    }
+    return getEntryWithData(entryRef);
   }
 
-  private EntryWithData getEntryWithData(LogEntryProto entry) throws RaftLogIOException {
+  private EntryWithData getEntryWithData(ReferenceCountedObject<LogEntryProto> entryRef) throws RaftLogIOException {

Review Comment:
   We should undo the change in RATIS-2028 and merge the methods:
   ```java
   @@ -323,11 +323,8 @@ public final class SegmentedRaftLog extends RaftLogBase {
        if (entryRef == null) {
          throw new RaftLogIOException("Log entry not found: index = " + index);
        }
   -    return getEntryWithData(entryRef);
   -  }
    
   -  private EntryWithData getEntryWithData(ReferenceCountedObject<LogEntryProto> entryRef) throws RaftLogIOException {
   -    LogEntryProto entry = entryRef.get();
   +    final LogEntryProto entry = entryRef.get();
        if (!LogProtoUtils.isStateMachineDataEmpty(entry)) {
          return newEntryWithData(entryRef, null);
        }
   ```


