This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a6db008760 IGNITE-23404 Properly handle EHOSTDOWN error in RaftGroupServiceImpl (#4543)
a6db008760 is described below

commit a6db008760a73536015567bca6e93647616d5ba4
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Oct 16 14:30:01 2024 +0300

    IGNITE-23404 Properly handle EHOSTDOWN error in RaftGroupServiceImpl (#4543)
---
 .../ignite/internal/raft/RaftGroupServiceImpl.java | 289 +++++++++++----------
 .../apache/ignite/internal/raft/RetryContext.java  | 142 ++++++++++
 .../ignite/internal/raft/RaftGroupServiceTest.java |  76 ++++++
 3 files changed, 372 insertions(+), 135 deletions(-)

diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 04fc475234..be6578b27c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.raft;
 
 import static java.lang.System.currentTimeMillis;
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.concurrent.ThreadLocalRandom.current;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.tostring.IgniteToStringBuilder.includeSensitive;
@@ -47,6 +46,7 @@ import static 
org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -67,7 +67,6 @@ import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.entity.PeerId;
@@ -89,7 +88,7 @@ import org.jetbrains.annotations.Nullable;
  * The implementation of {@link RaftGroupService}.
  */
 // TODO: IGNITE-20738 Methods 
updateConfiguration/refreshMembers/*Peer/*Learner are not thread-safe
-// and can produce meaningless (peers, learners) pairs as a result.
+//  and can produce meaningless (peers, learners) pairs as a result.
 public class RaftGroupServiceImpl implements RaftGroupService {
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(RaftGroupServiceImpl.class);
@@ -534,7 +533,9 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
     ) {
         var future = new CompletableFuture<R>();
 
-        sendWithRetry(peer, requestFactory, currentTimeMillis() + 
configuration.retryTimeout().value(), future, 1);
+        var context = new RetryContext(peer, requestFactory, 
currentTimeMillis() + configuration.retryTimeout().value());
+
+        sendWithRetry(future, context);
 
         return future;
     }
@@ -542,22 +543,11 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
     /**
      * Retries a request until success or timeout.
      *
-     * @param peer Initial target peer, request can be sent to a random peer 
if the target peer is unavailable.
-     * @param requestFactory Factory for creating requests to the target peer.
-     * @param stopTime Stop time.
-     * @param fut The future.
+     * @param fut Result future.
+     * @param retryContext Context.
      * @param <R> Response type.
-     * @param retryCount Number of retries made. sendWithRetry method has a 
recursion nature, in case of recoverable exceptions or peer
-     *     unavailability it'll be scheduled for a next attempt. Generally a 
request will be retried until success or timeout.
      */
-    private <R extends NetworkMessage> void sendWithRetry(
-            Peer peer,
-            Function<Peer, ? extends NetworkMessage> requestFactory,
-            long stopTime,
-            CompletableFuture<R> fut,
-            int retryCount
-
-    ) {
+    private <R extends NetworkMessage> void sendWithRetry(CompletableFuture<R> 
fut, RetryContext retryContext) {
         if (!busyLock.enterBusy()) {
             fut.cancel(true);
 
@@ -565,37 +555,44 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
         }
 
         try {
-            if (currentTimeMillis() >= stopTime) {
-                fut.completeExceptionally(
-                        new TimeoutException(format("Send with retry timed out 
[retryCount = {}, groupId = {}].", retryCount, groupId)));
+            if (currentTimeMillis() >= retryContext.stopTime()) {
+                fut.completeExceptionally(new TimeoutException(format(
+                        "Send with retry timed out [retryCount = {}, groupId = 
{}, traceId = {}].",
+                        retryContext.retryCount(),
+                        groupId,
+                        retryContext.errorTraceId()
+                )));
 
                 return;
             }
 
-            NetworkMessage request = requestFactory.apply(peer);
-
-            resolvePeer(peer)
-                    .thenCompose(node -> 
cluster.messagingService().invoke(node, request, 
configuration.responseTimeout().value()))
+            resolvePeer(retryContext.targetPeer())
+                    .thenCompose(node -> cluster.messagingService()
+                            .invoke(node, retryContext.request(), 
configuration.responseTimeout().value()))
                     .whenComplete((resp, err) -> {
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("sendWithRetry req={} resp={} from={} 
to={} err={}",
-                                    request,
+                                    retryContext.request(),
                                     resp,
                                     
cluster.topologyService().localMember().address(),
-                                    peer.consistentId(),
+                                    retryContext.targetPeer().consistentId(),
                                     err == null ? null : err.getMessage());
                         }
 
-                        if (err != null) {
-                            handleThrowable(err, peer, request, 
requestFactory, stopTime, fut, retryCount);
-                        } else if (resp instanceof ErrorResponse) {
-                            handleErrorResponse((ErrorResponse) resp, peer, 
request, requestFactory, stopTime, fut, retryCount);
-                        } else if (resp instanceof SMErrorResponse) {
-                            handleSmErrorResponse((SMErrorResponse) resp, fut);
-                        } else {
-                            leader = peer; // The OK response was received 
from a leader.
-
-                            fut.complete((R) resp);
+                        try {
+                            if (err != null) {
+                                handleThrowable(fut, err, retryContext);
+                            } else if (resp instanceof ErrorResponse) {
+                                handleErrorResponse(fut, (ErrorResponse) resp, 
retryContext);
+                            } else if (resp instanceof SMErrorResponse) {
+                                handleSmErrorResponse(fut, (SMErrorResponse) 
resp, retryContext);
+                            } else {
+                                leader = retryContext.targetPeer(); // The OK 
response was received from a leader.
+
+                                fut.complete((R) resp);
+                            }
+                        } catch (Throwable e) {
+                            fut.completeExceptionally(e);
                         }
                     });
         } finally {
@@ -603,61 +600,46 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
         }
     }
 
-    private void handleThrowable(
-            Throwable err,
-            Peer peer,
-            NetworkMessage sentRequest,
-            Function<Peer, ? extends NetworkMessage> requestFactory,
-            long stopTime,
-            CompletableFuture<? extends NetworkMessage> fut,
-            int retryCount
-    ) {
+    private void handleThrowable(CompletableFuture<? extends NetworkMessage> 
fut, Throwable err, RetryContext retryContext) {
         err = unwrapCause(err);
 
-        if (recoverable(err)) {
-            Peer randomPeer = randomNode(peer);
-
-            if (LOG.isDebugEnabled()) {
-                if (err instanceof TimeoutException) {
-                    LOG.debug(
-                            "Recoverable TimeoutException during the request 
occurred (will be retried on the randomly selected"
-                                    + " node) [request={}, peer={}, 
newPeer={}].",
-                            includeSensitive() ? sentRequest : 
sentRequest.toStringForLightLogging(),
-                            peer,
-                            randomPeer
-                    );
-                } else {
-                    LOG.debug(
-                            "Recoverable error during the request occurred 
(will be retried on the randomly selected node) "
-                                    + "[request={}, peer={}, newPeer={}].",
-                            err,
-                            includeSensitive() ? sentRequest : 
sentRequest.toStringForLightLogging(),
-                            peer,
-                            randomPeer
-                    );
-                }
+        if (!recoverable(err)) {
+            fut.completeExceptionally(err);
+
+            return;
+        }
+
+        Peer randomPeer = randomNode(retryContext);
+
+        if (LOG.isDebugEnabled()) {
+            String msg;
+
+            if (err instanceof TimeoutException) {
+                msg = "Recoverable TimeoutException during the request 
occurred (will be retried on a randomly selected node) "
+                        + "[request={}, peer={}, newPeer={}, traceId={}].";
+            } else {
+                msg = "Recoverable error during the request occurred (will be 
retried on a randomly selected node) "
+                        + "[request={}, peer={}, newPeer={}, traceId={}].";
             }
 
-            scheduleRetry(() -> sendWithRetry(randomPeer, requestFactory, 
stopTime, fut, retryCount + 1));
-        } else {
-            fut.completeExceptionally(err);
+            LOG.debug(
+                    msg,
+                    includeSensitive() ? retryContext.request() : 
retryContext.request().toStringForLightLogging(),
+                    retryContext.targetPeer(),
+                    randomPeer,
+                    retryContext.errorTraceId()
+            );
         }
+
+        scheduleRetry(fut, retryContext.nextAttempt(randomPeer));
     }
 
-    private void handleErrorResponse(
-            ErrorResponse resp,
-            Peer peer,
-            NetworkMessage sentRequest,
-            Function<Peer, ? extends NetworkMessage> requestFactory,
-            long stopTime,
-            CompletableFuture<? extends NetworkMessage> fut,
-            int retryCount
-    ) {
+    private void handleErrorResponse(CompletableFuture<? extends 
NetworkMessage> fut, ErrorResponse resp, RetryContext retryContext) {
         RaftError error = RaftError.forNumber(resp.errorCode());
 
         switch (error) {
             case SUCCESS:
-                leader = peer; // The OK response was received from a leader.
+                leader = retryContext.targetPeer(); // The OK response was 
received from a leader.
 
                 fut.complete(null); // Void response.
 
@@ -665,36 +647,59 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
 
             case EBUSY:
             case EAGAIN:
-                scheduleRetry(() -> sendWithRetry(peer, requestFactory, 
stopTime, fut, retryCount + 1));
+                scheduleRetry(fut, 
retryContext.nextAttempt(retryContext.targetPeer()));
 
                 break;
 
-            case ENOENT:
-                scheduleRetry(() -> {
-                    // If changing peers or requesting a leader and something 
is not found
-                    // probably target peer is doing rebalancing, try another 
peer.
-                    if (sentRequest instanceof GetLeaderRequest || sentRequest 
instanceof ChangePeersAndLearnersAsyncRequest) {
-                        sendWithRetry(randomNode(peer), requestFactory, 
stopTime, fut, retryCount + 1);
-                    } else {
-                        sendWithRetry(peer, requestFactory, stopTime, fut, 
retryCount + 1);
-                    }
-                });
+            // TODO: IGNITE-15706
+            case UNKNOWN:
+            case EINTERNAL:
+            case ENOENT: {
+                NetworkMessage request = retryContext.request();
+
+                Peer newTargetPeer;
+
+                // If changing peers or requesting a leader and something is 
not found
+                // probably target peer is doing rebalancing, try another peer.
+                if (request instanceof GetLeaderRequest || request instanceof 
ChangePeersAndLearnersAsyncRequest) {
+                    newTargetPeer = randomNode(retryContext);
+                } else {
+                    newTargetPeer = retryContext.targetPeer();
+                }
+
+                scheduleRetry(fut, retryContext.nextAttempt(newTargetPeer));
 
                 break;
+            }
+
+            case EHOSTDOWN:
+            case ESHUTDOWN:
+            case ENODESHUTDOWN: {
+                Peer newTargetPeer = randomNode(retryContext);
+
+                scheduleRetry(fut, 
retryContext.nextAttemptForUnavailablePeer(newTargetPeer));
+
+                break;
+            }
+
+            case EPERM: {
+                Peer newTargetPeer;
 
-            case EPERM:
-                // TODO: IGNITE-15706
-            case UNKNOWN:
-            case EINTERNAL:
                 if (resp.leaderId() == null) {
-                    scheduleRetry(() -> sendWithRetry(randomNode(peer), 
requestFactory, stopTime, fut, retryCount + 1));
+                    newTargetPeer = randomNode(retryContext);
                 } else {
-                    leader = parsePeer(resp.leaderId()); // Update a leader.
+                    newTargetPeer = parsePeer(resp.leaderId());
 
-                    scheduleRetry(() -> sendWithRetry(leader, requestFactory, 
stopTime, fut, retryCount + 1));
+                    assert newTargetPeer != null;
+
+                    leader = newTargetPeer;
                 }
 
+                scheduleRetry(fut, retryContext.nextAttempt(newTargetPeer));
+
                 break;
+            }
+
             case EREORDER:
                 fut.completeExceptionally(new SafeTimeReorderException());
 
@@ -707,7 +712,9 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
         }
     }
 
-    private static void handleSmErrorResponse(SMErrorResponse resp, 
CompletableFuture<? extends NetworkMessage> fut) {
+    private static void handleSmErrorResponse(
+            CompletableFuture<? extends NetworkMessage> fut, SMErrorResponse 
resp, RetryContext retryContext
+    ) {
         SMThrowable th = resp.error();
 
         if (th instanceof SMCompactedThrowable) {
@@ -724,15 +731,23 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
                         + "Check if throwable " + 
compactedThrowable.throwableClassName()
                         + " is present in the classpath.");
 
-                fut.completeExceptionally(new 
IgniteException(compactedThrowable.throwableMessage()));
+                fut.completeExceptionally(new IgniteInternalException(
+                        retryContext.errorTraceId(), INTERNAL_ERR, 
compactedThrowable.throwableMessage()
+                ));
             }
         } else if (th instanceof SMFullThrowable) {
             fut.completeExceptionally(((SMFullThrowable) th).throwable());
+        } else {
+            assert false : th;
         }
     }
 
-    private void scheduleRetry(Runnable runnable) {
-        executor.schedule(runnable, configuration.retryDelay().value(), 
TimeUnit.MILLISECONDS);
+    private void scheduleRetry(CompletableFuture<? extends NetworkMessage> 
fut, RetryContext retryContext) {
+        executor.schedule(
+                () -> sendWithRetry(fut, retryContext),
+                configuration.retryDelay().value(),
+                TimeUnit.MILLISECONDS
+        );
     }
 
     /**
@@ -755,46 +770,50 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
     }
 
     /**
-     * Returns a random peer. Tries 5 times finding a peer different from the 
excluded peer. If excluded peer is null, just returns a random
-     * peer.
+     * Returns a random peer.
      *
-     * @param excludedPeer Excluded peer.
-     * @return Random peer.
+     * <p>If the {@code retryContext} is not {@code null}, the random peer 
will be chosen as to not to match
+     * {@link RetryContext#targetPeer()} and {@link 
RetryContext#unavailablePeers()}.
      */
-    private Peer randomNode(@Nullable Peer excludedPeer) {
-        List<Peer> peers0 = peers;
+    private Peer randomNode(@Nullable RetryContext retryContext) {
+        List<Peer> localPeers = peers;
 
-        // TODO https://issues.apache.org/jira/browse/IGNITE-19466
-        // assert peers0 != null && !peers0.isEmpty();
-        if (peers0 == null || peers0.isEmpty()) {
-            throw new IgniteInternalException(INTERNAL_ERR, "Peers are not 
ready [groupId=" + groupId + ']');
-        }
-
-        if (peers0.size() == 1) {
-            return peers0.get(0);
-        }
-
-        int lastPeerIndex = excludedPeer == null ? -1 : 
peers0.indexOf(excludedPeer);
+        var availablePeers = new ArrayList<Peer>(localPeers.size());
 
-        ThreadLocalRandom random = current();
-
-        int newIdx = 0;
-
-        for (int retries = 0; retries < 5; retries++) {
-            newIdx = random.nextInt(peers0.size());
+        if (retryContext == null) {
+            availablePeers.addAll(localPeers);
+        } else {
+            for (Peer peer : localPeers) {
+                if (!retryContext.targetPeer().equals(peer) && 
!retryContext.unavailablePeers().contains(peer)) {
+                    availablePeers.add(peer);
+                }
+            }
 
-            if (newIdx != lastPeerIndex) {
-                Peer peer = peers0.get(newIdx);
+            if (availablePeers.isEmpty()) {
+                LOG.warn(
+                        "All peers are unavailable, going to keep retrying 
until timeout [peers = {}, trace ID: {}].",
+                        localPeers, retryContext.errorTraceId()
+                );
 
-                assert peer != null : "idx=" + newIdx + ", peers=" + peers0;
+                retryContext.resetUnavailablePeers();
 
-                if 
(cluster.topologyService().getByConsistentId(peer.consistentId()) != null) {
-                    break;
-                }
+                // Read the volatile field again, just in case it changed.
+                availablePeers.addAll(peers);
             }
         }
 
-        return peers0.get(newIdx);
+        // TODO https://issues.apache.org/jira/browse/IGNITE-19466
+        // assert !availablePeers.isEmpty();
+        if (availablePeers.isEmpty()) {
+            throw new IgniteInternalException(INTERNAL_ERR, "No peers 
available [groupId=" + groupId + ']');
+        }
+
+        Collections.shuffle(availablePeers, ThreadLocalRandom.current());
+
+        return availablePeers.stream()
+                .filter(peer -> 
cluster.topologyService().getByConsistentId(peer.consistentId()) != null)
+                .findAny()
+                .orElse(availablePeers.get(0));
     }
 
     /**
@@ -815,9 +834,9 @@ public class RaftGroupServiceImpl implements 
RaftGroupService {
      * @param peers List of {@link PeerId} string representations.
      * @return List of {@link PeerId}
      */
-    private static @Nullable List<Peer> parsePeerList(@Nullable 
Collection<String> peers) {
+    private static List<Peer> parsePeerList(@Nullable Collection<String> 
peers) {
         if (peers == null) {
-            return null;
+            return List.of();
         }
 
         List<Peer> res = new ArrayList<>(peers.size());
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java
new file mode 100644
index 0000000000..2a7c6aade1
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents a context containing data for {@code 
RaftGroupServiceImpl#sendWithRetry} methods.
+ *
+ * <p>Not thread-safe. It is expected that every context is confined within a 
single {@code sendWithRetry} chain and, therefore,
+ * happens-before relationship (i.e. visibility of changes to the mutable 
state) is achieved through consecutive {@code Executor.submit}
+ * calls.
+ */
+class RetryContext {
+    private Peer targetPeer;
+
+    private final Function<Peer, ? extends NetworkMessage> requestFactory;
+
+    /**
+     * Request that will be sent to the target peer.
+     */
+    private NetworkMessage request;
+
+    /**
+     * Timestamp that denotes the point in time up to which retry attempts 
will be made.
+     */
+    private final long stopTime;
+
+    /**
+     * Number of retries made. sendWithRetry method has a recursion nature, in 
case of recoverable exceptions or peer
+     * unavailability it'll be scheduled for a next attempt. Generally a 
request will be retried until success or timeout.
+     */
+    private int retryCount = 0;
+
+    /**
+     * Set of peers that should be excluded when choosing a node to send a 
request to.
+     */
+    private final Set<Peer> unavailablePeers = new HashSet<>();
+
+    /**
+     * Trace ID that is used to track exceptions that happened during a 
particular chain of retries.
+     *
+     * <p>Will be generated on first access.
+     */
+    @Nullable
+    private UUID errorTraceId;
+
+    /**
+     * Creates a context.
+     *
+     * @param targetPeer Target peer to send the request to.
+     * @param requestFactory Factory for creating requests to the target peer.
+     * @param stopTime Timestamp that denotes the point in time up to which 
retry attempts will be made.
+     */
+    RetryContext(Peer targetPeer, Function<Peer, ? extends NetworkMessage> 
requestFactory, long stopTime) {
+        this.targetPeer = targetPeer;
+        this.requestFactory = requestFactory;
+        this.request = requestFactory.apply(targetPeer);
+        this.stopTime = stopTime;
+    }
+
+    Peer targetPeer() {
+        return targetPeer;
+    }
+
+    NetworkMessage request() {
+        return request;
+    }
+
+    long stopTime() {
+        return stopTime;
+    }
+
+    int retryCount() {
+        return retryCount;
+    }
+
+    Set<Peer> unavailablePeers() {
+        return unavailablePeers;
+    }
+
+    UUID errorTraceId() {
+        if (errorTraceId == null) {
+            errorTraceId = UUID.randomUUID();
+        }
+
+        return errorTraceId;
+    }
+
+    void resetUnavailablePeers() {
+        unavailablePeers.clear();
+    }
+
+    /**
+     * Updates this context by changing the target peer.
+     *
+     * @return {@code this}.
+     */
+    RetryContext nextAttempt(Peer newTargetPeer) {
+        // We can avoid recreating the request if the target peer has not 
changed.
+        if (!newTargetPeer.equals(targetPeer)) {
+            request = requestFactory.apply(newTargetPeer);
+        }
+
+        targetPeer = newTargetPeer;
+
+        retryCount++;
+
+        return this;
+    }
+
+    /**
+     * Updates this context by changing the target peer and adding the 
previous target peer to the "unavailable set".
+     *
+     * @return {@code this}.
+     */
+    RetryContext nextAttemptForUnavailablePeer(Peer newTargetPeer) {
+        unavailablePeers.add(targetPeer);
+
+        return nextAttempt(newTargetPeer);
+    }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
index 863c74b4e9..96ff82c7d2 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
@@ -35,6 +35,7 @@ import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -42,6 +43,9 @@ import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.net.ConnectException;
@@ -92,6 +96,7 @@ import 
org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
 import org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
 import org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
 import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.ReadActionRequest;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest;
 import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
@@ -101,6 +106,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -623,6 +630,75 @@ public class RaftGroupServiceTest extends 
BaseIgniteAbstractTest {
         assertThat(fut, willThrowFast(TimeoutException.class));
     }
 
+    @ParameterizedTest
+    @EnumSource(names = {"ESHUTDOWN", "EHOSTDOWN", "ENODESHUTDOWN"})
+    public void testRetryOnErrorWithUnavailablePeers(RaftError error) {
+        when(messagingService.invoke(any(ClusterNode.class), 
any(ReadActionRequest.class), anyLong()))
+                .thenReturn(completedFuture(FACTORY.errorResponse()
+                        .errorCode(error.getNumber())
+                        .build()));
+
+        RaftGroupService service = 
startRaftGroupServiceWithRefreshLeader(NODES);
+
+        CompletableFuture<Object> response = 
service.run(mock(ReadCommand.class));
+
+        assertThat(response, willThrow(TimeoutException.class, "Send with 
retry timed out"));
+
+        // Verify that we tried to send the message to every node.
+        NODES.forEach(node -> verify(messagingService, atLeastOnce()).invoke(
+                argThat((ClusterNode target) -> target != null && 
target.name().equals(node.consistentId())),
+                any(ReadActionRequest.class),
+                anyLong()
+        ));
+    }
+
+    @ParameterizedTest
+    @EnumSource(names = {"UNKNOWN", "EINTERNAL", "ENOENT"})
+    public void testRetryOnErrorWithTimeout(RaftError error) {
+        when(messagingService.invoke(any(ClusterNode.class), 
any(ReadActionRequest.class), anyLong()))
+                .thenReturn(completedFuture(FACTORY.errorResponse()
+                        .errorCode(error.getNumber())
+                        .build()));
+
+        RaftGroupService service = 
startRaftGroupServiceWithRefreshLeader(NODES);
+
+        CompletableFuture<Object> response = 
service.run(mock(ReadCommand.class));
+
+        assertThat(response, willThrow(TimeoutException.class, "Send with 
retry timed out"));
+    }
+
+    @ParameterizedTest
+    @EnumSource(names = {"EPERM"})
+    public void testRetryOnErrorWithUpdateLeader(RaftError error) {
+        when(messagingService.invoke(
+                argThat((ClusterNode node) -> node != null && 
node.name().equals(NODES.get(0).consistentId())),
+                any(ReadActionRequest.class),
+                anyLong())
+        )
+                .thenReturn(completedFuture(FACTORY.errorResponse()
+                        .errorCode(error.getNumber())
+                        .leaderId(NODES.get(NODES.size() - 1).consistentId())
+                        .build()));
+
+        when(messagingService.invoke(
+                argThat((ClusterNode node) -> node != null && 
node.name().equals(NODES.get(NODES.size() - 1).consistentId())),
+                any(ReadActionRequest.class),
+                anyLong())
+        )
+                
.thenReturn(completedFuture(FACTORY.actionResponse().result(null).build()));
+
+        RaftGroupService service = 
startRaftGroupServiceWithRefreshLeader(NODES);
+
+        assertThat(service.leader(), is(NODES.get(0)));
+
+        CompletableFuture<Object> response = 
service.run(mock(ReadCommand.class));
+
+        assertThat(response, willBe(nullValue()));
+
+        // Check that the leader was updated as well.
+        assertThat(service.leader(), is(NODES.get(NODES.size() - 1)));
+    }
+
     private RaftGroupService startRaftGroupService(List<Peer> peers) {
         PeersAndLearners memberConfiguration = 
PeersAndLearners.fromPeers(peers, Set.of());
 

Reply via email to