This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a6db008760 IGNITE-23404 Properly handle EHOSTDOWN error in
RaftGroupServiceImpl (#4543)
a6db008760 is described below
commit a6db008760a73536015567bca6e93647616d5ba4
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Oct 16 14:30:01 2024 +0300
IGNITE-23404 Properly handle EHOSTDOWN error in RaftGroupServiceImpl (#4543)
---
.../ignite/internal/raft/RaftGroupServiceImpl.java | 289 +++++++++++----------
.../apache/ignite/internal/raft/RetryContext.java | 142 ++++++++++
.../ignite/internal/raft/RaftGroupServiceTest.java | 76 ++++++
3 files changed, 372 insertions(+), 135 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 04fc475234..be6578b27c 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.raft;
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.concurrent.ThreadLocalRandom.current;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.tostring.IgniteToStringBuilder.includeSensitive;
@@ -47,6 +46,7 @@ import static
org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -67,7 +67,6 @@ import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.entity.PeerId;
@@ -89,7 +88,7 @@ import org.jetbrains.annotations.Nullable;
* The implementation of {@link RaftGroupService}.
*/
// TODO: IGNITE-20738 Methods
updateConfiguration/refreshMembers/*Peer/*Learner are not thread-safe
-// and can produce meaningless (peers, learners) pairs as a result.
+// and can produce meaningless (peers, learners) pairs as a result.
public class RaftGroupServiceImpl implements RaftGroupService {
/** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(RaftGroupServiceImpl.class);
@@ -534,7 +533,9 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
) {
var future = new CompletableFuture<R>();
- sendWithRetry(peer, requestFactory, currentTimeMillis() +
configuration.retryTimeout().value(), future, 1);
+ var context = new RetryContext(peer, requestFactory,
currentTimeMillis() + configuration.retryTimeout().value());
+
+ sendWithRetry(future, context);
return future;
}
@@ -542,22 +543,11 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
/**
* Retries a request until success or timeout.
*
- * @param peer Initial target peer, request can be sent to a random peer
if the target peer is unavailable.
- * @param requestFactory Factory for creating requests to the target peer.
- * @param stopTime Stop time.
- * @param fut The future.
+ * @param fut Result future.
+ * @param retryContext Context.
* @param <R> Response type.
- * @param retryCount Number of retries made. sendWithRetry method has a
recursion nature, in case of recoverable exceptions or peer
- * unavailability it'll be scheduled for a next attempt. Generally a
request will be retried until success or timeout.
*/
- private <R extends NetworkMessage> void sendWithRetry(
- Peer peer,
- Function<Peer, ? extends NetworkMessage> requestFactory,
- long stopTime,
- CompletableFuture<R> fut,
- int retryCount
-
- ) {
+ private <R extends NetworkMessage> void sendWithRetry(CompletableFuture<R>
fut, RetryContext retryContext) {
if (!busyLock.enterBusy()) {
fut.cancel(true);
@@ -565,37 +555,44 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
}
try {
- if (currentTimeMillis() >= stopTime) {
- fut.completeExceptionally(
- new TimeoutException(format("Send with retry timed out
[retryCount = {}, groupId = {}].", retryCount, groupId)));
+ if (currentTimeMillis() >= retryContext.stopTime()) {
+ fut.completeExceptionally(new TimeoutException(format(
+ "Send with retry timed out [retryCount = {}, groupId =
{}, traceId = {}].",
+ retryContext.retryCount(),
+ groupId,
+ retryContext.errorTraceId()
+ )));
return;
}
- NetworkMessage request = requestFactory.apply(peer);
-
- resolvePeer(peer)
- .thenCompose(node ->
cluster.messagingService().invoke(node, request,
configuration.responseTimeout().value()))
+ resolvePeer(retryContext.targetPeer())
+ .thenCompose(node -> cluster.messagingService()
+ .invoke(node, retryContext.request(),
configuration.responseTimeout().value()))
.whenComplete((resp, err) -> {
if (LOG.isTraceEnabled()) {
LOG.trace("sendWithRetry req={} resp={} from={}
to={} err={}",
- request,
+ retryContext.request(),
resp,
cluster.topologyService().localMember().address(),
- peer.consistentId(),
+ retryContext.targetPeer().consistentId(),
err == null ? null : err.getMessage());
}
- if (err != null) {
- handleThrowable(err, peer, request,
requestFactory, stopTime, fut, retryCount);
- } else if (resp instanceof ErrorResponse) {
- handleErrorResponse((ErrorResponse) resp, peer,
request, requestFactory, stopTime, fut, retryCount);
- } else if (resp instanceof SMErrorResponse) {
- handleSmErrorResponse((SMErrorResponse) resp, fut);
- } else {
- leader = peer; // The OK response was received
from a leader.
-
- fut.complete((R) resp);
+ try {
+ if (err != null) {
+ handleThrowable(fut, err, retryContext);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponse(fut, (ErrorResponse) resp,
retryContext);
+ } else if (resp instanceof SMErrorResponse) {
+ handleSmErrorResponse(fut, (SMErrorResponse)
resp, retryContext);
+ } else {
+ leader = retryContext.targetPeer(); // The OK
response was received from a leader.
+
+ fut.complete((R) resp);
+ }
+ } catch (Throwable e) {
+ fut.completeExceptionally(e);
}
});
} finally {
@@ -603,61 +600,46 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
}
}
- private void handleThrowable(
- Throwable err,
- Peer peer,
- NetworkMessage sentRequest,
- Function<Peer, ? extends NetworkMessage> requestFactory,
- long stopTime,
- CompletableFuture<? extends NetworkMessage> fut,
- int retryCount
- ) {
+ private void handleThrowable(CompletableFuture<? extends NetworkMessage>
fut, Throwable err, RetryContext retryContext) {
err = unwrapCause(err);
- if (recoverable(err)) {
- Peer randomPeer = randomNode(peer);
-
- if (LOG.isDebugEnabled()) {
- if (err instanceof TimeoutException) {
- LOG.debug(
- "Recoverable TimeoutException during the request
occurred (will be retried on the randomly selected"
- + " node) [request={}, peer={},
newPeer={}].",
- includeSensitive() ? sentRequest :
sentRequest.toStringForLightLogging(),
- peer,
- randomPeer
- );
- } else {
- LOG.debug(
- "Recoverable error during the request occurred
(will be retried on the randomly selected node) "
- + "[request={}, peer={}, newPeer={}].",
- err,
- includeSensitive() ? sentRequest :
sentRequest.toStringForLightLogging(),
- peer,
- randomPeer
- );
- }
+ if (!recoverable(err)) {
+ fut.completeExceptionally(err);
+
+ return;
+ }
+
+ Peer randomPeer = randomNode(retryContext);
+
+ if (LOG.isDebugEnabled()) {
+ String msg;
+
+ if (err instanceof TimeoutException) {
+ msg = "Recoverable TimeoutException during the request
occurred (will be retried on a randomly selected node) "
+ + "[request={}, peer={}, newPeer={}, traceId={}].";
+ } else {
+ msg = "Recoverable error during the request occurred (will be
retried on a randomly selected node) "
+ + "[request={}, peer={}, newPeer={}, traceId={}].";
}
- scheduleRetry(() -> sendWithRetry(randomPeer, requestFactory,
stopTime, fut, retryCount + 1));
- } else {
- fut.completeExceptionally(err);
+ LOG.debug(
+ msg,
+ includeSensitive() ? retryContext.request() :
retryContext.request().toStringForLightLogging(),
+ retryContext.targetPeer(),
+ randomPeer,
+ retryContext.errorTraceId()
+ );
}
+
+ scheduleRetry(fut, retryContext.nextAttempt(randomPeer));
}
- private void handleErrorResponse(
- ErrorResponse resp,
- Peer peer,
- NetworkMessage sentRequest,
- Function<Peer, ? extends NetworkMessage> requestFactory,
- long stopTime,
- CompletableFuture<? extends NetworkMessage> fut,
- int retryCount
- ) {
+ private void handleErrorResponse(CompletableFuture<? extends
NetworkMessage> fut, ErrorResponse resp, RetryContext retryContext) {
RaftError error = RaftError.forNumber(resp.errorCode());
switch (error) {
case SUCCESS:
- leader = peer; // The OK response was received from a leader.
+ leader = retryContext.targetPeer(); // The OK response was
received from a leader.
fut.complete(null); // Void response.
@@ -665,36 +647,59 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
case EBUSY:
case EAGAIN:
- scheduleRetry(() -> sendWithRetry(peer, requestFactory,
stopTime, fut, retryCount + 1));
+ scheduleRetry(fut,
retryContext.nextAttempt(retryContext.targetPeer()));
break;
- case ENOENT:
- scheduleRetry(() -> {
- // If changing peers or requesting a leader and something
is not found
- // probably target peer is doing rebalancing, try another
peer.
- if (sentRequest instanceof GetLeaderRequest || sentRequest
instanceof ChangePeersAndLearnersAsyncRequest) {
- sendWithRetry(randomNode(peer), requestFactory,
stopTime, fut, retryCount + 1);
- } else {
- sendWithRetry(peer, requestFactory, stopTime, fut,
retryCount + 1);
- }
- });
+ // TODO: IGNITE-15706
+ case UNKNOWN:
+ case EINTERNAL:
+ case ENOENT: {
+ NetworkMessage request = retryContext.request();
+
+ Peer newTargetPeer;
+
+ // If changing peers or requesting a leader and something is
not found
+ // probably target peer is doing rebalancing, try another peer.
+ if (request instanceof GetLeaderRequest || request instanceof
ChangePeersAndLearnersAsyncRequest) {
+ newTargetPeer = randomNode(retryContext);
+ } else {
+ newTargetPeer = retryContext.targetPeer();
+ }
+
+ scheduleRetry(fut, retryContext.nextAttempt(newTargetPeer));
break;
+ }
+
+ case EHOSTDOWN:
+ case ESHUTDOWN:
+ case ENODESHUTDOWN: {
+ Peer newTargetPeer = randomNode(retryContext);
+
+ scheduleRetry(fut,
retryContext.nextAttemptForUnavailablePeer(newTargetPeer));
+
+ break;
+ }
+
+ case EPERM: {
+ Peer newTargetPeer;
- case EPERM:
- // TODO: IGNITE-15706
- case UNKNOWN:
- case EINTERNAL:
if (resp.leaderId() == null) {
- scheduleRetry(() -> sendWithRetry(randomNode(peer),
requestFactory, stopTime, fut, retryCount + 1));
+ newTargetPeer = randomNode(retryContext);
} else {
- leader = parsePeer(resp.leaderId()); // Update a leader.
+ newTargetPeer = parsePeer(resp.leaderId());
- scheduleRetry(() -> sendWithRetry(leader, requestFactory,
stopTime, fut, retryCount + 1));
+ assert newTargetPeer != null;
+
+ leader = newTargetPeer;
}
+ scheduleRetry(fut, retryContext.nextAttempt(newTargetPeer));
+
break;
+ }
+
case EREORDER:
fut.completeExceptionally(new SafeTimeReorderException());
@@ -707,7 +712,9 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
}
}
- private static void handleSmErrorResponse(SMErrorResponse resp,
CompletableFuture<? extends NetworkMessage> fut) {
+ private static void handleSmErrorResponse(
+ CompletableFuture<? extends NetworkMessage> fut, SMErrorResponse
resp, RetryContext retryContext
+ ) {
SMThrowable th = resp.error();
if (th instanceof SMCompactedThrowable) {
@@ -724,15 +731,23 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
+ "Check if throwable " +
compactedThrowable.throwableClassName()
+ " is present in the classpath.");
- fut.completeExceptionally(new
IgniteException(compactedThrowable.throwableMessage()));
+ fut.completeExceptionally(new IgniteInternalException(
+ retryContext.errorTraceId(), INTERNAL_ERR,
compactedThrowable.throwableMessage()
+ ));
}
} else if (th instanceof SMFullThrowable) {
fut.completeExceptionally(((SMFullThrowable) th).throwable());
+ } else {
+ assert false : th;
}
}
- private void scheduleRetry(Runnable runnable) {
- executor.schedule(runnable, configuration.retryDelay().value(),
TimeUnit.MILLISECONDS);
+ private void scheduleRetry(CompletableFuture<? extends NetworkMessage>
fut, RetryContext retryContext) {
+ executor.schedule(
+ () -> sendWithRetry(fut, retryContext),
+ configuration.retryDelay().value(),
+ TimeUnit.MILLISECONDS
+ );
}
/**
@@ -755,46 +770,50 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
}
/**
- * Returns a random peer. Tries 5 times finding a peer different from the
excluded peer. If excluded peer is null, just returns a random
- * peer.
+ * Returns a random peer.
*
- * @param excludedPeer Excluded peer.
- * @return Random peer.
+ * <p>If the {@code retryContext} is not {@code null}, the random peer
will be chosen so as not to match
+ * {@link RetryContext#targetPeer()} and {@link
RetryContext#unavailablePeers()}.
*/
- private Peer randomNode(@Nullable Peer excludedPeer) {
- List<Peer> peers0 = peers;
+ private Peer randomNode(@Nullable RetryContext retryContext) {
+ List<Peer> localPeers = peers;
- // TODO https://issues.apache.org/jira/browse/IGNITE-19466
- // assert peers0 != null && !peers0.isEmpty();
- if (peers0 == null || peers0.isEmpty()) {
- throw new IgniteInternalException(INTERNAL_ERR, "Peers are not
ready [groupId=" + groupId + ']');
- }
-
- if (peers0.size() == 1) {
- return peers0.get(0);
- }
-
- int lastPeerIndex = excludedPeer == null ? -1 :
peers0.indexOf(excludedPeer);
+ var availablePeers = new ArrayList<Peer>(localPeers.size());
- ThreadLocalRandom random = current();
-
- int newIdx = 0;
-
- for (int retries = 0; retries < 5; retries++) {
- newIdx = random.nextInt(peers0.size());
+ if (retryContext == null) {
+ availablePeers.addAll(localPeers);
+ } else {
+ for (Peer peer : localPeers) {
+ if (!retryContext.targetPeer().equals(peer) &&
!retryContext.unavailablePeers().contains(peer)) {
+ availablePeers.add(peer);
+ }
+ }
- if (newIdx != lastPeerIndex) {
- Peer peer = peers0.get(newIdx);
+ if (availablePeers.isEmpty()) {
+ LOG.warn(
+ "All peers are unavailable, going to keep retrying
until timeout [peers = {}, trace ID: {}].",
+ localPeers, retryContext.errorTraceId()
+ );
- assert peer != null : "idx=" + newIdx + ", peers=" + peers0;
+ retryContext.resetUnavailablePeers();
- if
(cluster.topologyService().getByConsistentId(peer.consistentId()) != null) {
- break;
- }
+ // Read the volatile field again, just in case it changed.
+ availablePeers.addAll(peers);
}
}
- return peers0.get(newIdx);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-19466
+ // assert !availablePeers.isEmpty();
+ if (availablePeers.isEmpty()) {
+ throw new IgniteInternalException(INTERNAL_ERR, "No peers
available [groupId=" + groupId + ']');
+ }
+
+ Collections.shuffle(availablePeers, ThreadLocalRandom.current());
+
+ return availablePeers.stream()
+ .filter(peer ->
cluster.topologyService().getByConsistentId(peer.consistentId()) != null)
+ .findAny()
+ .orElse(availablePeers.get(0));
}
/**
@@ -815,9 +834,9 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
* @param peers List of {@link PeerId} string representations.
* @return List of {@link PeerId}
*/
- private static @Nullable List<Peer> parsePeerList(@Nullable
Collection<String> peers) {
+ private static List<Peer> parsePeerList(@Nullable Collection<String>
peers) {
if (peers == null) {
- return null;
+ return List.of();
}
List<Peer> res = new ArrayList<>(peers.size());
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java
new file mode 100644
index 0000000000..2a7c6aade1
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RetryContext.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents a context containing data for {@code
RaftGroupServiceImpl#sendWithRetry} methods.
+ *
+ * <p>Not thread-safe. It is expected that every context is confined within a
single {@code sendWithRetry} chain and, therefore,
+ * a happens-before relationship (i.e. visibility of changes to the mutable
state) is achieved through consecutive {@code Executor.submit}
+ * calls.
+ */
+class RetryContext {
+ private Peer targetPeer;
+
+ private final Function<Peer, ? extends NetworkMessage> requestFactory;
+
+ /**
+ * Request that will be sent to the target peer.
+ */
+ private NetworkMessage request;
+
+ /**
+ * Timestamp that denotes the point in time up to which retry attempts
will be made.
+ */
+ private final long stopTime;
+
+ /**
+ * Number of retries made. The sendWithRetry method is recursive in nature: in
case of recoverable exceptions or peer
+ * unavailability, it will be scheduled for another attempt. Generally, a
request will be retried until success or timeout.
+ */
+ private int retryCount = 0;
+
+ /**
+ * Set of peers that should be excluded when choosing a node to send a
request to.
+ */
+ private final Set<Peer> unavailablePeers = new HashSet<>();
+
+ /**
+ * Trace ID that is used to track exceptions that happened during a
particular chain of retries.
+ *
+ * <p>Will be generated on first access.
+ */
+ @Nullable
+ private UUID errorTraceId;
+
+ /**
+ * Creates a context.
+ *
+ * @param targetPeer Target peer to send the request to.
+ * @param requestFactory Factory for creating requests to the target peer.
+ * @param stopTime Timestamp that denotes the point in time up to which
retry attempts will be made.
+ */
+ RetryContext(Peer targetPeer, Function<Peer, ? extends NetworkMessage>
requestFactory, long stopTime) {
+ this.targetPeer = targetPeer;
+ this.requestFactory = requestFactory;
+ this.request = requestFactory.apply(targetPeer);
+ this.stopTime = stopTime;
+ }
+
+ Peer targetPeer() {
+ return targetPeer;
+ }
+
+ NetworkMessage request() {
+ return request;
+ }
+
+ long stopTime() {
+ return stopTime;
+ }
+
+ int retryCount() {
+ return retryCount;
+ }
+
+ Set<Peer> unavailablePeers() {
+ return unavailablePeers;
+ }
+
+ UUID errorTraceId() {
+ if (errorTraceId == null) {
+ errorTraceId = UUID.randomUUID();
+ }
+
+ return errorTraceId;
+ }
+
+ void resetUnavailablePeers() {
+ unavailablePeers.clear();
+ }
+
+ /**
+ * Updates this context by changing the target peer.
+ *
+ * @return {@code this}.
+ */
+ RetryContext nextAttempt(Peer newTargetPeer) {
+ // We can avoid recreating the request if the target peer has not
changed.
+ if (!newTargetPeer.equals(targetPeer)) {
+ request = requestFactory.apply(newTargetPeer);
+ }
+
+ targetPeer = newTargetPeer;
+
+ retryCount++;
+
+ return this;
+ }
+
+ /**
+ * Updates this context by changing the target peer and adding the
previous target peer to the "unavailable set".
+ *
+ * @return {@code this}.
+ */
+ RetryContext nextAttemptForUnavailablePeer(Peer newTargetPeer) {
+ unavailablePeers.add(targetPeer);
+
+ return nextAttempt(newTargetPeer);
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
index 863c74b4e9..96ff82c7d2 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
@@ -35,6 +35,7 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -42,6 +43,9 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.net.ConnectException;
@@ -92,6 +96,7 @@ import
org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
import org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.ReadActionRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest;
import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
@@ -101,6 +106,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -623,6 +630,75 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
assertThat(fut, willThrowFast(TimeoutException.class));
}
+ @ParameterizedTest
+ @EnumSource(names = {"ESHUTDOWN", "EHOSTDOWN", "ENODESHUTDOWN"})
+ public void testRetryOnErrorWithUnavailablePeers(RaftError error) {
+ when(messagingService.invoke(any(ClusterNode.class),
any(ReadActionRequest.class), anyLong()))
+ .thenReturn(completedFuture(FACTORY.errorResponse()
+ .errorCode(error.getNumber())
+ .build()));
+
+ RaftGroupService service =
startRaftGroupServiceWithRefreshLeader(NODES);
+
+ CompletableFuture<Object> response =
service.run(mock(ReadCommand.class));
+
+ assertThat(response, willThrow(TimeoutException.class, "Send with
retry timed out"));
+
+ // Verify that we tried to send the message to every node.
+ NODES.forEach(node -> verify(messagingService, atLeastOnce()).invoke(
+ argThat((ClusterNode target) -> target != null &&
target.name().equals(node.consistentId())),
+ any(ReadActionRequest.class),
+ anyLong()
+ ));
+ }
+
+ @ParameterizedTest
+ @EnumSource(names = {"UNKNOWN", "EINTERNAL", "ENOENT"})
+ public void testRetryOnErrorWithTimeout(RaftError error) {
+ when(messagingService.invoke(any(ClusterNode.class),
any(ReadActionRequest.class), anyLong()))
+ .thenReturn(completedFuture(FACTORY.errorResponse()
+ .errorCode(error.getNumber())
+ .build()));
+
+ RaftGroupService service =
startRaftGroupServiceWithRefreshLeader(NODES);
+
+ CompletableFuture<Object> response =
service.run(mock(ReadCommand.class));
+
+ assertThat(response, willThrow(TimeoutException.class, "Send with
retry timed out"));
+ }
+
+ @ParameterizedTest
+ @EnumSource(names = {"EPERM"})
+ public void testRetryOnErrorWithUpdateLeader(RaftError error) {
+ when(messagingService.invoke(
+ argThat((ClusterNode node) -> node != null &&
node.name().equals(NODES.get(0).consistentId())),
+ any(ReadActionRequest.class),
+ anyLong())
+ )
+ .thenReturn(completedFuture(FACTORY.errorResponse()
+ .errorCode(error.getNumber())
+ .leaderId(NODES.get(NODES.size() - 1).consistentId())
+ .build()));
+
+ when(messagingService.invoke(
+ argThat((ClusterNode node) -> node != null &&
node.name().equals(NODES.get(NODES.size() - 1).consistentId())),
+ any(ReadActionRequest.class),
+ anyLong())
+ )
+
.thenReturn(completedFuture(FACTORY.actionResponse().result(null).build()));
+
+ RaftGroupService service =
startRaftGroupServiceWithRefreshLeader(NODES);
+
+ assertThat(service.leader(), is(NODES.get(0)));
+
+ CompletableFuture<Object> response =
service.run(mock(ReadCommand.class));
+
+ assertThat(response, willBe(nullValue()));
+
+ // Check that the leader was updated as well.
+ assertThat(service.leader(), is(NODES.get(NODES.size() - 1)));
+ }
+
private RaftGroupService startRaftGroupService(List<Peer> peers) {
PeersAndLearners memberConfiguration =
PeersAndLearners.fromPeers(peers, Set.of());