This is an automated email from the ASF dual-hosted git repository. williamsong pushed a commit to branch snapshot-3 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit cc038aea981d53c6ece1673b137ee16d5fe63f7f Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Mon Jan 22 14:30:30 2024 -0800 RATIS-2013. OrderedAsync retry results an IllegalStateException in GrpcClientProtocolService. (#1026) --- .../org/apache/ratis/client/impl/RaftClientImpl.java | 6 ++++-- .../main/java/org/apache/ratis/util/SlidingWindow.java | 18 +++++++++++++++--- .../ratis/retry/TestExceptionDependentRetry.java | 9 ++++++--- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index ec16763c2..f42391947 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -87,8 +87,10 @@ public final class RaftClientImpl implements RaftClient { public abstract RaftClientRequest newRequestImpl(); final RaftClientRequest newRequest() { - attemptCount.incrementAndGet(); - return newRequestImpl(); + final int attempt = attemptCount.incrementAndGet(); + final RaftClientRequest request = newRequestImpl(); + LOG.debug("attempt #{}, newRequest {}", attempt, request); + return request; } CompletableFuture<RaftClientReply> getReplyFuture() { diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java index 316604db0..732e3d890 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java @@ -122,6 +122,13 @@ public interface SlidingWindow { return requests.values().iterator(); } + /** @return true iff the request already exists. */ + boolean putIfAbsent(REQUEST request) { + final long seqNum = request.getSeqNum(); + final REQUEST previous = requests.putIfAbsent(seqNum, request); + return previous != null; + } + void putNewRequest(REQUEST request) { final long seqNum = request.getSeqNum(); CollectionUtils.putNew(seqNum, request, requests, () -> getName() + ":requests"); @@ -443,11 +450,16 @@ public interface SlidingWindow { final long seqNum = request.getSeqNum(); if (nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0)) { nextToProcess = seqNum; - LOG.debug("{}: got seq={} (first request), set nextToProcess in {}", requests.getName(), seqNum, this); + requests.putNewRequest(request); + LOG.debug("Received seq={} (first request), {}", seqNum, this); } else { - LOG.debug("{}: got seq={} in {}", requests.getName(), seqNum, this); + final boolean isRetry = requests.putIfAbsent(request); + LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this); + if (isRetry) { + return; + } } - requests.putNewRequest(request); + processRequestsFromHead(processingMethod); } diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java index 7eba6a82b..264db8946 100644 --- a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java +++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java @@ -178,12 +178,14 @@ public class TestExceptionDependentRetry extends BaseTest implements MiniRaftClu } void runTestExceptionRetryAttempts(MiniRaftClusterWithGrpc cluster) throws Exception { - final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + final int retryCount = 5; + final RetryPolicy timeoutPolicy = MultipleLinearRandomRetry.parseCommaSeparated("1ms, " + retryCount); final ExceptionDependentRetry policy = ExceptionDependentRetry.newBuilder() - .setExceptionToPolicy(TimeoutIOException.class, MultipleLinearRandomRetry.parseCommaSeparated("1ms, 5")) + .setExceptionToPolicy(TimeoutIOException.class, timeoutPolicy) .setDefaultPolicy(RetryPolicies.retryForeverNoSleep()) .build(); + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); // create a client with the exception dependent policy try (final RaftClient client = cluster.createClient(policy)) { client.async().send(new RaftTestUtil.SimpleMessage("1")).get(); @@ -196,7 +198,8 @@ public class TestExceptionDependentRetry extends BaseTest implements MiniRaftClu Assert.fail("Test should have failed."); } catch (ExecutionException e) { RaftRetryFailureException rrfe = (RaftRetryFailureException) e.getCause(); - Assert.assertEquals(16, rrfe.getAttemptCount()); + final int expectedCount = 1 + retryCount; // new request attempt + retry attempts + Assert.assertEquals(expectedCount, rrfe.getAttemptCount()); } finally { SimpleStateMachine4Testing.get(leader).unblockWriteStateMachineData(); cluster.shutdown();
