This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b1ce78991d IGNITE-23071 Yet another raft log pollution reduction. (#4283)
b1ce78991d is described below
commit b1ce78991d524ba05044f12a553681867e947aef
Author: Alexander Lapin <[email protected]>
AuthorDate: Tue Aug 27 13:03:13 2024 +0300
IGNITE-23071 Yet another raft log pollution reduction. (#4283)
---
.../ignite/internal/raft/RaftGroupServiceImpl.java | 65 +++++++++++++++-------
1 file changed, 44 insertions(+), 21 deletions(-)
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 19c21a8379..fc7526bc83 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -21,6 +21,7 @@ import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.ThreadLocalRandom.current;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.tostring.IgniteToStringBuilder.includeSensitive;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
@@ -553,7 +554,7 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
) {
var future = new CompletableFuture<R>();
- sendWithRetry(peer, requestFactory, currentTimeMillis() +
configuration.retryTimeout().value(), future);
+ sendWithRetry(peer, requestFactory, currentTimeMillis() +
configuration.retryTimeout().value(), future, 1);
return future;
}
@@ -566,9 +567,16 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
* @param stopTime Stop time.
* @param fut The future.
* @param <R> Response type.
+ * @param retryCount Number of retries made. sendWithRetry method has a
recursion nature, in case of recoverable exceptions or peer
+ * unavailability it'll be scheduled for a next attempt. Generally a
request will be retried until success or timeout.
*/
private <R extends NetworkMessage> void sendWithRetry(
- Peer peer, Function<Peer, ? extends NetworkMessage>
requestFactory, long stopTime, CompletableFuture<R> fut
+ Peer peer,
+ Function<Peer, ? extends NetworkMessage> requestFactory,
+ long stopTime,
+ CompletableFuture<R> fut,
+ int retryCount
+
) {
if (!busyLock.enterBusy()) {
fut.cancel(true);
@@ -578,7 +586,8 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
try {
if (currentTimeMillis() >= stopTime) {
- fut.completeExceptionally(new TimeoutException());
+ fut.completeExceptionally(
+ new TimeoutException(format("Send with retry timed out
[retryCount = {}].", retryCount)));
return;
}
@@ -597,9 +606,9 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
}
if (err != null) {
- handleThrowable(err, peer, request,
requestFactory, stopTime, fut);
+ handleThrowable(err, peer, request,
requestFactory, stopTime, fut, retryCount);
} else if (resp instanceof ErrorResponse) {
- handleErrorResponse((ErrorResponse) resp, peer,
request, requestFactory, stopTime, fut);
+ handleErrorResponse((ErrorResponse) resp, peer,
request, requestFactory, stopTime, fut, retryCount);
} else if (resp instanceof SMErrorResponse) {
handleSmErrorResponse((SMErrorResponse) resp, fut);
} else {
@@ -619,23 +628,36 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
NetworkMessage sentRequest,
Function<Peer, ? extends NetworkMessage> requestFactory,
long stopTime,
- CompletableFuture<? extends NetworkMessage> fut
+ CompletableFuture<? extends NetworkMessage> fut,
+ int retryCount
) {
err = unwrapCause(err);
if (recoverable(err)) {
Peer randomPeer = randomNode(peer);
- LOG.warn(
- "Recoverable error during the request occurred (will be
retried on the randomly selected node) "
- + "[request={}, peer={}, newPeer={}].",
- err,
- LOG.isDebugEnabled() && includeSensitive() ? sentRequest :
sentRequest.toStringForLightLogging(),
- peer,
- randomPeer
- );
+ if (LOG.isDebugEnabled()) {
+ if (err instanceof TimeoutException) {
+ LOG.debug(
+ "Recoverable TimeoutException during the request
occurred (will be retried on the randomly selected"
+ + " node) [request={}, peer={},
newPeer={}].",
+ includeSensitive() ? sentRequest :
sentRequest.toStringForLightLogging(),
+ peer,
+ randomPeer
+ );
+ } else {
+ LOG.debug(
+ "Recoverable error during the request occurred
(will be retried on the randomly selected node) "
+ + "[request={}, peer={}, newPeer={}].",
+ err,
+ includeSensitive() ? sentRequest :
sentRequest.toStringForLightLogging(),
+ peer,
+ randomPeer
+ );
+ }
+ }
- scheduleRetry(() -> sendWithRetry(randomPeer, requestFactory,
stopTime, fut));
+ scheduleRetry(() -> sendWithRetry(randomPeer, requestFactory,
stopTime, fut, retryCount + 1));
} else {
fut.completeExceptionally(err);
}
@@ -647,7 +669,8 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
NetworkMessage sentRequest,
Function<Peer, ? extends NetworkMessage> requestFactory,
long stopTime,
- CompletableFuture<? extends NetworkMessage> fut
+ CompletableFuture<? extends NetworkMessage> fut,
+ int retryCount
) {
RaftError error = RaftError.forNumber(resp.errorCode());
@@ -661,7 +684,7 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
case EBUSY:
case EAGAIN:
- scheduleRetry(() -> sendWithRetry(peer, requestFactory,
stopTime, fut));
+ scheduleRetry(() -> sendWithRetry(peer, requestFactory,
stopTime, fut, retryCount + 1));
break;
@@ -670,9 +693,9 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
// If changing peers or requesting a leader and something
is not found
// probably target peer is doing rebalancing, try another
peer.
if (sentRequest instanceof GetLeaderRequest || sentRequest
instanceof ChangePeersAndLearnersAsyncRequest) {
- sendWithRetry(randomNode(peer), requestFactory,
stopTime, fut);
+ sendWithRetry(randomNode(peer), requestFactory,
stopTime, fut, retryCount + 1);
} else {
- sendWithRetry(peer, requestFactory, stopTime, fut);
+ sendWithRetry(peer, requestFactory, stopTime, fut,
retryCount + 1);
}
});
@@ -683,11 +706,11 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
case UNKNOWN:
case EINTERNAL:
if (resp.leaderId() == null) {
- scheduleRetry(() -> sendWithRetry(randomNode(peer),
requestFactory, stopTime, fut));
+ scheduleRetry(() -> sendWithRetry(randomNode(peer),
requestFactory, stopTime, fut, retryCount + 1));
} else {
leader = parsePeer(resp.leaderId()); // Update a leader.
- scheduleRetry(() -> sendWithRetry(leader, requestFactory,
stopTime, fut));
+ scheduleRetry(() -> sendWithRetry(leader, requestFactory,
stopTime, fut, retryCount + 1));
}
break;