This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c1e0ec284 RATIS-2013. OrderedAsync retry results in an IllegalStateException in GrpcClientProtocolService. (#1026)
c1e0ec284 is described below

commit c1e0ec284aa88078db63a8b3cc1a90231144eff3
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jan 22 14:30:30 2024 -0800

    RATIS-2013. OrderedAsync retry results in an IllegalStateException in GrpcClientProtocolService. (#1026)
---
 .../org/apache/ratis/client/impl/RaftClientImpl.java   |  6 ++++--
 .../main/java/org/apache/ratis/util/SlidingWindow.java | 18 +++++++++++++++---
 .../ratis/retry/TestExceptionDependentRetry.java       |  9 ++++++---
 3 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index ec16763c2..f42391947 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -87,8 +87,10 @@ public final class RaftClientImpl implements RaftClient {
     public abstract RaftClientRequest newRequestImpl();
 
     final RaftClientRequest newRequest() {
-      attemptCount.incrementAndGet();
-      return newRequestImpl();
+      final int attempt = attemptCount.incrementAndGet();
+      final RaftClientRequest request = newRequestImpl();
+      LOG.debug("attempt #{}, newRequest {}", attempt, request);
+      return request;
     }
 
     CompletableFuture<RaftClientReply> getReplyFuture() {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 316604db0..732e3d890 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -122,6 +122,13 @@ public interface SlidingWindow {
       return requests.values().iterator();
     }
 
+    /** @return true iff the request already exists. */
+    boolean putIfAbsent(REQUEST request) {
+      final long seqNum = request.getSeqNum();
+      final REQUEST previous = requests.putIfAbsent(seqNum, request);
+      return previous != null;
+    }
+
     void putNewRequest(REQUEST request) {
       final long seqNum = request.getSeqNum();
       CollectionUtils.putNew(seqNum, request, requests, () -> getName() + ":requests");
@@ -443,11 +450,16 @@ public interface SlidingWindow {
       final long seqNum = request.getSeqNum();
       if (nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0)) {
         nextToProcess = seqNum;
-        LOG.debug("{}: got seq={} (first request), set nextToProcess in {}", requests.getName(), seqNum, this);
+        requests.putNewRequest(request);
+        LOG.debug("Received seq={} (first request), {}", seqNum, this);
       } else {
-        LOG.debug("{}: got seq={} in {}", requests.getName(), seqNum, this);
+        final boolean isRetry = requests.putIfAbsent(request);
+        LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
+        if (isRetry) {
+          return;
+        }
       }
-      requests.putNewRequest(request);
+
       processRequestsFromHead(processingMethod);
     }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index 7eba6a82b..264db8946 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -178,12 +178,14 @@ public class TestExceptionDependentRetry extends BaseTest implements MiniRaftClu
   }
 
   void runTestExceptionRetryAttempts(MiniRaftClusterWithGrpc cluster) throws Exception {
-    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+    final int retryCount = 5;
+    final RetryPolicy timeoutPolicy = MultipleLinearRandomRetry.parseCommaSeparated("1ms, " + retryCount);
     final ExceptionDependentRetry policy = ExceptionDependentRetry.newBuilder()
-        .setExceptionToPolicy(TimeoutIOException.class, MultipleLinearRandomRetry.parseCommaSeparated("1ms, 5"))
+        .setExceptionToPolicy(TimeoutIOException.class, timeoutPolicy)
         .setDefaultPolicy(RetryPolicies.retryForeverNoSleep())
         .build();
 
+    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
     // create a client with the exception dependent policy
     try (final RaftClient client = cluster.createClient(policy)) {
       client.async().send(new RaftTestUtil.SimpleMessage("1")).get();
@@ -196,7 +198,8 @@ public class TestExceptionDependentRetry extends BaseTest implements MiniRaftClu
       Assert.fail("Test should have failed.");
     } catch (ExecutionException e) {
       RaftRetryFailureException rrfe = (RaftRetryFailureException) e.getCause();
-      Assert.assertEquals(16, rrfe.getAttemptCount());
+      final int expectedCount = 1 + retryCount; // new request attempt + retry attempts
+      Assert.assertEquals(expectedCount, rrfe.getAttemptCount());
     } finally {
       SimpleStateMachine4Testing.get(leader).unblockWriteStateMachineData();
       cluster.shutdown();

Reply via email to