This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b1ce78991d IGNITE-23071 Yet another raft log pollution reduction. 
(#4283)
b1ce78991d is described below

commit b1ce78991d524ba05044f12a553681867e947aef
Author: Alexander Lapin <[email protected]>
AuthorDate: Tue Aug 27 13:03:13 2024 +0300

    IGNITE-23071 Yet another raft log pollution reduction. (#4283)
---
 .../ignite/internal/raft/RaftGroupServiceImpl.java | 65 +++++++++++++++-------
 1 file changed, 44 insertions(+), 21 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 19c21a8379..fc7526bc83 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -21,6 +21,7 @@ import static java.lang.System.currentTimeMillis;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.ThreadLocalRandom.current;
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.tostring.IgniteToStringBuilder.includeSensitive;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
@@ -553,7 +554,7 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
     ) {
         var future = new CompletableFuture<R>();
 
-        sendWithRetry(peer, requestFactory, currentTimeMillis() + 
configuration.retryTimeout().value(), future);
+        sendWithRetry(peer, requestFactory, currentTimeMillis() + 
configuration.retryTimeout().value(), future, 1);
 
         return future;
     }
@@ -566,9 +567,16 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
      * @param stopTime Stop time.
      * @param fut The future.
      * @param <R> Response type.
+     * @param retryCount Number of retries made. sendWithRetry method has a 
recursion nature, in case of recoverable exceptions or peer
+     *     unavailability it'll be scheduled for a next attempt. Generally a 
request will be retried until success or timeout.
      */
     private <R extends NetworkMessage> void sendWithRetry(
-            Peer peer, Function<Peer, ? extends NetworkMessage> 
requestFactory, long stopTime, CompletableFuture<R> fut
+            Peer peer,
+            Function<Peer, ? extends NetworkMessage> requestFactory,
+            long stopTime,
+            CompletableFuture<R> fut,
+            int retryCount
+
     ) {
         if (!busyLock.enterBusy()) {
             fut.cancel(true);
@@ -578,7 +586,8 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
 
         try {
             if (currentTimeMillis() >= stopTime) {
-                fut.completeExceptionally(new TimeoutException());
+                fut.completeExceptionally(
+                        new TimeoutException(format("Send with retry timed out 
[retryCount = {}].", retryCount)));
 
                 return;
             }
@@ -597,9 +606,9 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
                         }
 
                         if (err != null) {
-                            handleThrowable(err, peer, request, 
requestFactory, stopTime, fut);
+                            handleThrowable(err, peer, request, 
requestFactory, stopTime, fut, retryCount);
                         } else if (resp instanceof ErrorResponse) {
-                            handleErrorResponse((ErrorResponse) resp, peer, 
request, requestFactory, stopTime, fut);
+                            handleErrorResponse((ErrorResponse) resp, peer, 
request, requestFactory, stopTime, fut, retryCount);
                         } else if (resp instanceof SMErrorResponse) {
                             handleSmErrorResponse((SMErrorResponse) resp, fut);
                         } else {
@@ -619,23 +628,36 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
             NetworkMessage sentRequest,
             Function<Peer, ? extends NetworkMessage> requestFactory,
             long stopTime,
-            CompletableFuture<? extends NetworkMessage> fut
+            CompletableFuture<? extends NetworkMessage> fut,
+            int retryCount
     ) {
         err = unwrapCause(err);
 
         if (recoverable(err)) {
             Peer randomPeer = randomNode(peer);
 
-            LOG.warn(
-                    "Recoverable error during the request occurred (will be 
retried on the randomly selected node) "
-                            + "[request={}, peer={}, newPeer={}].",
-                    err,
-                    LOG.isDebugEnabled() && includeSensitive() ? sentRequest : 
sentRequest.toStringForLightLogging(),
-                    peer,
-                    randomPeer
-            );
+            if (LOG.isDebugEnabled()) {
+                if (err instanceof TimeoutException) {
+                    LOG.debug(
+                            "Recoverable TimeoutException during the request 
occurred (will be retried on the randomly selected"
+                                    + " node) [request={}, peer={}, 
newPeer={}].",
+                            includeSensitive() ? sentRequest : 
sentRequest.toStringForLightLogging(),
+                            peer,
+                            randomPeer
+                    );
+                } else {
+                    LOG.debug(
+                            "Recoverable error during the request occurred 
(will be retried on the randomly selected node) "
+                                    + "[request={}, peer={}, newPeer={}].",
+                            err,
+                            includeSensitive() ? sentRequest : 
sentRequest.toStringForLightLogging(),
+                            peer,
+                            randomPeer
+                    );
+                }
+            }
 
-            scheduleRetry(() -> sendWithRetry(randomPeer, requestFactory, 
stopTime, fut));
+            scheduleRetry(() -> sendWithRetry(randomPeer, requestFactory, 
stopTime, fut, retryCount + 1));
         } else {
             fut.completeExceptionally(err);
         }
@@ -647,7 +669,8 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
             NetworkMessage sentRequest,
             Function<Peer, ? extends NetworkMessage> requestFactory,
             long stopTime,
-            CompletableFuture<? extends NetworkMessage> fut
+            CompletableFuture<? extends NetworkMessage> fut,
+            int retryCount
     ) {
         RaftError error = RaftError.forNumber(resp.errorCode());
 
@@ -661,7 +684,7 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
 
             case EBUSY:
             case EAGAIN:
-                scheduleRetry(() -> sendWithRetry(peer, requestFactory, 
stopTime, fut));
+                scheduleRetry(() -> sendWithRetry(peer, requestFactory, 
stopTime, fut, retryCount + 1));
 
                 break;
 
@@ -670,9 +693,9 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
                     // If changing peers or requesting a leader and something 
is not found
                     // probably target peer is doing rebalancing, try another 
peer.
                     if (sentRequest instanceof GetLeaderRequest || sentRequest 
instanceof ChangePeersAndLearnersAsyncRequest) {
-                        sendWithRetry(randomNode(peer), requestFactory, 
stopTime, fut);
+                        sendWithRetry(randomNode(peer), requestFactory, 
stopTime, fut, retryCount + 1);
                     } else {
-                        sendWithRetry(peer, requestFactory, stopTime, fut);
+                        sendWithRetry(peer, requestFactory, stopTime, fut, 
retryCount + 1);
                     }
                 });
 
@@ -683,11 +706,11 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
             case UNKNOWN:
             case EINTERNAL:
                 if (resp.leaderId() == null) {
-                    scheduleRetry(() -> sendWithRetry(randomNode(peer), 
requestFactory, stopTime, fut));
+                    scheduleRetry(() -> sendWithRetry(randomNode(peer), 
requestFactory, stopTime, fut, retryCount + 1));
                 } else {
                     leader = parsePeer(resp.leaderId()); // Update a leader.
 
-                    scheduleRetry(() -> sendWithRetry(leader, requestFactory, 
stopTime, fut));
+                    scheduleRetry(() -> sendWithRetry(leader, requestFactory, 
stopTime, fut, retryCount + 1));
                 }
 
                 break;

Reply via email to