szetszwo commented on code in PR #1032:
URL: https://github.com/apache/ratis/pull/1032#discussion_r1469990865
##########
ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java:
##########
@@ -462,6 +463,9 @@ public synchronized void receivedRequest(REQUEST request,
Consumer<REQUEST> proc
} else {
final boolean isRetry = requests.putIfAbsent(request);
LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
+ if (isRetry || request.getSeqNum() < nextToProcess) {
Review Comment:
If `request.getSeqNum() < nextToProcess`, the request should not be added in
the first place. Check for that case before calling `putIfAbsent`:
```java
public synchronized void receivedRequest(REQUEST request,
    Consumer<REQUEST> processingMethod) {
  final long seqNum = request.getSeqNum();
  final boolean accepted;
  if (nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0)) {
    nextToProcess = seqNum;
    requests.putNewRequest(request);
    LOG.debug("Received seq={} (first request), {}", seqNum, this);
    accepted = true;
  } else if (request.getSeqNum() < nextToProcess) {
    LOG.debug("Received seq={} < nextToProcess {}, {}", seqNum,
        nextToProcess, this);
    accepted = false;
  } else {
    final boolean isRetry = requests.putIfAbsent(request);
    LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
    accepted = !isRetry;
  }
  if (accepted) {
    processRequestsFromHead(processingMethod);
  } else {
    request.release();
  }
}
```
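For illustration only, here is a minimal, self-contained sketch of the same three-way decision (first request, stale request, retry or new request). `DemoRequest` and `DemoWindow` are hypothetical stand-ins and not Ratis classes; the sketch assumes that a rejected request only needs `release()` and that processing just records the sequence number.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical stand-in for a reference-counted request; not a Ratis class. */
class DemoRequest {
  final long seqNum;
  boolean released = false;

  DemoRequest(long seqNum) {
    this.seqNum = seqNum;
  }

  void release() {
    released = true;
  }
}

/** Hypothetical, simplified window: stale requests and retries are released, never added. */
class DemoWindow {
  private final SortedMap<Long, DemoRequest> requests = new TreeMap<>();
  private long nextToProcess = -1;
  final List<Long> processed = new ArrayList<>();

  synchronized void receivedRequest(DemoRequest request) {
    final long seqNum = request.seqNum;
    final boolean accepted;
    if (nextToProcess == -1 && seqNum == 0) {
      // First request: it defines where processing starts.
      nextToProcess = seqNum;
      requests.put(seqNum, request);
      accepted = true;
    } else if (seqNum < nextToProcess) {
      // Stale request: already processed, so it must not be added.
      accepted = false;
    } else {
      // A retry is detected by an existing entry with the same seqNum.
      accepted = requests.putIfAbsent(seqNum, request) == null;
    }
    if (accepted) {
      processFromHead();
    } else {
      request.release();
    }
  }

  /** Process consecutive requests starting at nextToProcess. */
  private void processFromHead() {
    while (!requests.isEmpty() && requests.firstKey() == nextToProcess) {
      requests.remove(requests.firstKey());
      processed.add(nextToProcess);
      nextToProcess++;
    }
  }
}
```

The point of the ordering is that the `seqNum < nextToProcess` check runs before `putIfAbsent`, so a stale request can never occupy a slot in the map.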
##########
ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java:
##########
@@ -117,7 +117,13 @@ public V retain() {
@Override
public boolean release() {
- return
fromRefs.stream().map(ReferenceCountedObject::release).allMatch(r -> r);
+ boolean allReleased = true;
+ for (ReferenceCountedObject ref : fromRefs) {
Review Comment:
Add the type parameter to avoid the raw type:
```java
for (ReferenceCountedObject<T> ref : fromRefs) {
```
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java:
##########
@@ -52,11 +52,13 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
static class PendingServerRequest<REQUEST> {
private final REQUEST request;
+ private final ReferenceCountedObject<REQUEST> requestRef;
+ private AtomicBoolean released = new AtomicBoolean(false);
Review Comment:
Let's combine these three fields (`request`, `requestRef` and `released`) into a single `AtomicReference`:
```java
  static class PendingServerRequest<REQUEST> {
    private final AtomicReference<ReferenceCountedObject<REQUEST>> requestRef;
    private final CompletableFuture<Void> future = new CompletableFuture<>();

    PendingServerRequest(ReferenceCountedObject<REQUEST> requestRef) {
      requestRef.retain();
      this.requestRef = new AtomicReference<>(requestRef);
    }

    REQUEST getRequest() {
      return Optional.ofNullable(requestRef.get())
          .map(ReferenceCountedObject::get)
          .orElse(null);
    }

    CompletableFuture<Void> getFuture() {
      return future;
    }

    void release() {
      Optional.ofNullable(requestRef.getAndSet(null))
          .ifPresent(ReferenceCountedObject::release);
    }
  }
```
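The `getAndSet(null)` idiom is what makes the separate `released` flag unnecessary: only the first caller sees a non-null value, so the underlying object is released at most once. A small standalone sketch of that property, where `CountedResource` and `ReleaseOnceHolder` are hypothetical names used only for this example:

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical counter-backed resource; stands in for a reference-counted object. */
class CountedResource {
  private final AtomicInteger refCount = new AtomicInteger();

  void retain() {
    refCount.incrementAndGet();
  }

  void release() {
    refCount.decrementAndGet();
  }

  int refCount() {
    return refCount.get();
  }
}

/** Hypothetical holder: retains on construction, releases at most once. */
class ReleaseOnceHolder {
  private final AtomicReference<CountedResource> ref;

  ReleaseOnceHolder(CountedResource resource) {
    resource.retain();
    this.ref = new AtomicReference<>(resource);
  }

  /** Returns null once the holder has been released. */
  CountedResource get() {
    return ref.get();
  }

  void release() {
    // getAndSet(null) hands the resource to exactly one caller.
    Optional.ofNullable(ref.getAndSet(null))
        .ifPresent(CountedResource::release);
  }
}
```

Calling `release()` twice on the holder decrements the count only once, and `get()` returns `null` afterwards, which mirrors how `getRequest()` behaves in the suggestion above.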
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java:
##########
@@ -320,11 +320,13 @@ void
processClientRequest(ReferenceCountedObject<RaftClientRequest> requestRef)
final CompletableFuture<Void> f = processClientRequest(requestRef, reply
-> {
if (!reply.isSuccess()) {
- LOG.info("Failed {}, reply={}", request, reply);
+ LOG.info("Failed {}, reply={}", request.toStringShort(), reply);
Review Comment:
We cannot use `request` inside the callback. Capture what the log message needs beforehand:
```java
      final SlidingWindowEntry slidingWindowEntry =
          request.getSlidingWindowEntry();
      final CompletableFuture<Void> f = processClientRequest(requestRef,
          reply -> {
            if (!reply.isSuccess()) {
              LOG.info("Failed request cid={}, {}, reply={}", callId,
                  slidingWindowEntry, reply);
            }
            final RaftClientReplyProto proto =
                ClientProtoUtils.toRaftClientReplyProto(reply);
            responseNext(proto);
          });
```
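A minimal sketch of the capture-before-callback pattern, under the assumption that the request may already have been released by the time the reply callback runs. `ReleasableRequest` and `CaptureBeforeCallback` are hypothetical names for this example only:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical request whose contents become invalid after release(). */
class ReleasableRequest {
  private String payload;

  ReleasableRequest(String payload) {
    this.payload = payload;
  }

  String describe() {
    if (payload == null) {
      throw new IllegalStateException("request already released");
    }
    return "request(" + payload + ")";
  }

  void release() {
    payload = null;
  }
}

class CaptureBeforeCallback {
  static String run() {
    final ReleasableRequest request = new ReleasableRequest("cid=7");
    // Capture the loggable description while the request is still valid.
    final String description = request.describe();

    final CompletableFuture<Boolean> reply = new CompletableFuture<>();
    // The callback uses only the captured value, never the request itself.
    final CompletableFuture<String> logged = reply.thenApply(
        success -> success ? "ok" : "Failed " + description);

    request.release();      // the request is released before the reply arrives
    reply.complete(false);  // the callback runs now and must not touch the request
    return logged.join();
  }
}
```

If the callback called `request.describe()` instead of using `description`, this sketch would throw an `IllegalStateException` once the reply completes.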