This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c1e0ec284 RATIS-2013. OrderedAsync retry results an
IllegalStateException in GrpcClientProtocolService. (#1026)
c1e0ec284 is described below
commit c1e0ec284aa88078db63a8b3cc1a90231144eff3
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jan 22 14:30:30 2024 -0800
RATIS-2013. OrderedAsync retry results an IllegalStateException in
GrpcClientProtocolService. (#1026)
---
.../org/apache/ratis/client/impl/RaftClientImpl.java | 6 ++++--
.../main/java/org/apache/ratis/util/SlidingWindow.java | 18 +++++++++++++++---
.../ratis/retry/TestExceptionDependentRetry.java | 9 ++++++---
3 files changed, 25 insertions(+), 8 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index ec16763c2..f42391947 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -87,8 +87,10 @@ public final class RaftClientImpl implements RaftClient {
public abstract RaftClientRequest newRequestImpl();
final RaftClientRequest newRequest() {
- attemptCount.incrementAndGet();
- return newRequestImpl();
+ final int attempt = attemptCount.incrementAndGet();
+ final RaftClientRequest request = newRequestImpl();
+ LOG.debug("attempt #{}, newRequest {}", attempt, request);
+ return request;
}
CompletableFuture<RaftClientReply> getReplyFuture() {
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 316604db0..732e3d890 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -122,6 +122,13 @@ public interface SlidingWindow {
return requests.values().iterator();
}
+ /** @return true iff the request already exists. */
+ boolean putIfAbsent(REQUEST request) {
+ final long seqNum = request.getSeqNum();
+ final REQUEST previous = requests.putIfAbsent(seqNum, request);
+ return previous != null;
+ }
+
void putNewRequest(REQUEST request) {
final long seqNum = request.getSeqNum();
CollectionUtils.putNew(seqNum, request, requests, () -> getName() +
":requests");
@@ -443,11 +450,16 @@ public interface SlidingWindow {
final long seqNum = request.getSeqNum();
if (nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0)) {
nextToProcess = seqNum;
- LOG.debug("{}: got seq={} (first request), set nextToProcess in {}",
requests.getName(), seqNum, this);
+ requests.putNewRequest(request);
+ LOG.debug("Received seq={} (first request), {}", seqNum, this);
} else {
- LOG.debug("{}: got seq={} in {}", requests.getName(), seqNum, this);
+ final boolean isRetry = requests.putIfAbsent(request);
+ LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
+ if (isRetry) {
+ return;
+ }
}
- requests.putNewRequest(request);
+
processRequestsFromHead(processingMethod);
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index 7eba6a82b..264db8946 100644
---
a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++
b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -178,12 +178,14 @@ public class TestExceptionDependentRetry extends BaseTest
implements MiniRaftClu
}
void runTestExceptionRetryAttempts(MiniRaftClusterWithGrpc cluster) throws
Exception {
- final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+ final int retryCount = 5;
+ final RetryPolicy timeoutPolicy =
MultipleLinearRandomRetry.parseCommaSeparated("1ms, " + retryCount);
final ExceptionDependentRetry policy = ExceptionDependentRetry.newBuilder()
- .setExceptionToPolicy(TimeoutIOException.class,
MultipleLinearRandomRetry.parseCommaSeparated("1ms, 5"))
+ .setExceptionToPolicy(TimeoutIOException.class, timeoutPolicy)
.setDefaultPolicy(RetryPolicies.retryForeverNoSleep())
.build();
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
// create a client with the exception dependent policy
try (final RaftClient client = cluster.createClient(policy)) {
client.async().send(new RaftTestUtil.SimpleMessage("1")).get();
@@ -196,7 +198,8 @@ public class TestExceptionDependentRetry extends BaseTest
implements MiniRaftClu
Assert.fail("Test should have failed.");
} catch (ExecutionException e) {
RaftRetryFailureException rrfe = (RaftRetryFailureException)
e.getCause();
- Assert.assertEquals(16, rrfe.getAttemptCount());
+ final int expectedCount = 1 + retryCount; // new request attempt + retry
attempts
+ Assert.assertEquals(expectedCount, rrfe.getAttemptCount());
} finally {
SimpleStateMachine4Testing.get(leader).unblockWriteStateMachineData();
cluster.shutdown();