This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 035cd968f RATIS-2016. Correct NotificationInstallSnapshot's index (#1030)
035cd968f is described below
commit 035cd968f934f4a8f9d9ccec082f86b0e6437aab
Author: Symious <[email protected]>
AuthorDate: Mon Jan 29 00:07:22 2024 +0800
RATIS-2016. Correct NotificationInstallSnapshot's index (#1030)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 32 +++++++++++++++++++---
1 file changed, 28 insertions(+), 4 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 9a2e84f21..ec44d8c48 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -41,7 +41,9 @@ import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto.InstallSnapshotReplyBodyCase;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto.InstallSnapshotRequestBodyCase;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
@@ -67,6 +69,8 @@ public class GrpcLogAppender extends LogAppenderBase {
APPEND_LOG_RESPONSE_HANDLER_ON_ERROR
}
+ public static final int INSTALL_SNAPSHOT_NOTIFICATION_INDEX = 0;
+
private static final Comparator<Long> CALL_ID_COMPARATOR = (left, right) -> {
// calculate diff in order to take care the possibility of numerical overflow
final long diff = left - right;
@@ -577,15 +581,35 @@ public class GrpcLogAppender extends LogAppenderBase {
void addPending(InstallSnapshotRequestProto request) {
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
- pending.offer(request.getSnapshotChunk().getRequestIndex());
+ final int index;
+ if (isNotificationOnly) {
+ Preconditions.assertSame(InstallSnapshotRequestBodyCase.NOTIFICATION,
+ request.getInstallSnapshotRequestBodyCase(), "request case");
+ index = INSTALL_SNAPSHOT_NOTIFICATION_INDEX;
+ } else {
+ Preconditions.assertSame(InstallSnapshotRequestBodyCase.SNAPSHOTCHUNK,
+ request.getInstallSnapshotRequestBodyCase(), "request case");
+ index = request.getSnapshotChunk().getRequestIndex();
+ }
+ if (index == 0) {
+ Preconditions.assertTrue(pending.isEmpty(), "pending queue is non-empty before offer for index 0");
+ }
+ pending.offer(index);
}
}
void removePending(InstallSnapshotReplyProto reply) {
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
- final Integer index = pending.poll();
- Objects.requireNonNull(index, "index == null");
- Preconditions.assertTrue(index == reply.getRequestIndex());
+ final int index = Objects.requireNonNull(pending.poll(), "index == null");
+ if (isNotificationOnly) {
+ Preconditions.assertSame(InstallSnapshotReplyBodyCase.SNAPSHOTINDEX,
+ reply.getInstallSnapshotReplyBodyCase(), "reply case");
+ Preconditions.assertSame(INSTALL_SNAPSHOT_NOTIFICATION_INDEX, (int) index, "poll index");
+ } else {
+ Preconditions.assertSame(InstallSnapshotReplyBodyCase.REQUESTINDEX,
+ reply.getInstallSnapshotReplyBodyCase(), "reply case");
+ Preconditions.assertSame(reply.getRequestIndex(), (int) index, "poll index");
+ }
}
}