This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4f24f16c94 IGNITE-17957 Move and refactor RaftGroupServiceImpl (#1240)
4f24f16c94 is described below
commit 4f24f16c94a4360cf6c38ac0425e2b00a92a4d6e
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Nov 9 16:54:25 2022 +0300
IGNITE-17957 Move and refactor RaftGroupServiceImpl (#1240)
---
.../CompletableFutureExceptionMatcher.java | 30 +-
.../client/ItMetaStorageRaftGroupTest.java | 2 +-
.../client/ItMetaStorageServiceTest.java | 2 +-
.../raft/server/ItSimpleCounterServerTest.java | 2 +-
.../ignite/raft/server/JraftAbstractTest.java | 2 +-
.../java/org/apache/ignite/internal/raft/Loza.java | 1 -
.../ignite/internal/raft/RaftGroupServiceImpl.java | 711 +++++++++++++++++
.../raft/jraft/rpc/impl/RaftGroupServiceImpl.java | 769 ------------------
.../ignite/internal/raft/RaftGroupServiceTest.java | 735 +++++++++++++++++
.../raft/jraft/core/RaftGroupServiceTest.java | 883 ---------------------
.../service/ItAbstractListenerSnapshotTest.java | 2 +-
.../distributed/ItTxDistributedTestSingleNode.java | 2 +-
.../table/distributed/TableManagerTest.java | 2 +-
13 files changed, 1478 insertions(+), 1665 deletions(-)
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureExceptionMatcher.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureExceptionMatcher.java
index 16df6f740d..4d09f514b2 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureExceptionMatcher.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureExceptionMatcher.java
@@ -33,24 +33,30 @@ import org.hamcrest.TypeSafeMatcher;
*/
public class CompletableFutureExceptionMatcher extends
TypeSafeMatcher<CompletableFuture<?>> {
/** Timeout in seconds. */
- private static final int TIMEOUT_SECONDS = 30;
+ private static final int TIMEOUT_SECONDS = 1;
/** Matcher to forward the exception of the completable future. */
private final Matcher<? extends Exception> matcher;
+ private final int timeout;
+
+ private final TimeUnit timeUnit;
+
/**
* Constructor.
*
* @param matcher Matcher to forward the exception of the completable
future.
*/
- private CompletableFutureExceptionMatcher(Matcher<? extends Exception>
matcher) {
+ private CompletableFutureExceptionMatcher(Matcher<? extends Exception>
matcher, int timeout, TimeUnit timeUnit) {
this.matcher = matcher;
+ this.timeout = timeout;
+ this.timeUnit = timeUnit;
}
@Override
protected boolean matchesSafely(CompletableFuture<?> item) {
try {
- item.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ item.get(timeout, timeUnit);
return false;
} catch (Exception e) {
@@ -90,13 +96,27 @@ public class CompletableFutureExceptionMatcher extends
TypeSafeMatcher<Completab
* Creates a matcher that matches a future that completes exceptionally
and the exception matches the nested matcher.
*/
public static CompletableFutureExceptionMatcher willThrow(Matcher<?
extends Exception> matcher) {
- return new CompletableFutureExceptionMatcher(matcher);
+ return new CompletableFutureExceptionMatcher(matcher, TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ }
+
+ /**
+ * Creates a matcher that matches a future that completes exceptionally
and the exception matches the nested matcher, waiting at most the given timeout.
+ */
+ public static CompletableFutureExceptionMatcher willThrow(Matcher<?
extends Exception> matcher, int timeout, TimeUnit timeUnit) {
+ return new CompletableFutureExceptionMatcher(matcher, timeout,
timeUnit);
}
/**
* Creates a matcher that matches a future that completes with an
exception of the provided type.
*/
public static CompletableFutureExceptionMatcher willThrow(Class<? extends
Exception> cls) {
- return willThrow(is(instanceOf(cls)));
+ return willThrow(cls, TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Creates a matcher that matches a future that completes with an
exception of the provided type, waiting at most the given timeout.
+ */
+ public static CompletableFutureExceptionMatcher willThrow(Class<? extends
Exception> cls, int timeout, TimeUnit timeUnit) {
+ return willThrow(is(instanceOf(cls)), timeout, timeUnit);
}
}
diff --git
a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
index d68e124edb..ff3fe90100 100644
---
a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
+++
b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -61,7 +62,6 @@ import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.core.Replicator;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.option.NodeOptions;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
diff --git
a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
index a330eded5a..20d563a26e 100644
---
a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
+++
b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
@@ -80,6 +80,7 @@ import
org.apache.ignite.internal.metastorage.server.ValueCondition;
import org.apache.ignite.internal.metastorage.server.ValueCondition.Type;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -95,7 +96,6 @@ import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
index b42a0aa9cf..9cbe3fcf88 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -41,7 +42,6 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.server.counter.CounterListener;
import org.apache.ignite.raft.server.counter.GetValueCommand;
import org.apache.ignite.raft.server.counter.IncrementAndGetCommand;
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
index f6ca5bbf5f..e4f05a40bd 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java
@@ -35,6 +35,7 @@ import java.util.stream.IntStream;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -46,7 +47,6 @@ import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.option.NodeOptions;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 74f2d372d9..09163bc674 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -58,7 +58,6 @@ import
org.apache.ignite.raft.client.service.RaftGroupListener;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.option.NodeOptions;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.jraft.util.Utils;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
new file mode 100644
index 0000000000..4384daaa25
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -0,0 +1,711 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.ThreadLocalRandom.current;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
+import static
org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
+import static
org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
+import static
org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
+import static
org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
+import static
org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.ActionResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.SMErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
+import org.apache.ignite.raft.jraft.rpc.impl.SMCompactedThrowable;
+import org.apache.ignite.raft.jraft.rpc.impl.SMFullThrowable;
+import org.apache.ignite.raft.jraft.rpc.impl.SMThrowable;
+import org.apache.ignite.raft.jraft.util.Endpoint;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The implementation of {@link RaftGroupService}.
+ */
+public class RaftGroupServiceImpl implements RaftGroupService {
+ /** The logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(RaftGroupServiceImpl.class);
+
+ private volatile long timeout;
+
+ /** Timeout for network calls. */
+ private final long rpcTimeout;
+
+ private final String groupId;
+
+ private final ReplicationGroupId realGroupId;
+
+ private final RaftMessagesFactory factory;
+
+ @Nullable
+ private volatile Peer leader;
+
+ private volatile List<Peer> peers;
+
+ private volatile List<Peer> learners;
+
+ private final ClusterService cluster;
+
+ private final long retryDelay;
+
+ /** Executor for scheduling retries of {@link
RaftGroupServiceImpl#sendWithRetry} invocations. */
+ private final ScheduledExecutorService executor;
+
+ /**
+ * Constructor.
+ *
+ * @param groupId Group id.
+ * @param cluster A cluster.
+ * @param factory A message factory.
+ * @param timeout Request timeout.
+ * @param peers Initial peers list.
+ * @param learners Initial learners list.
+ * @param leader Group leader.
+ * @param retryDelay Retry delay.
+ * @param executor Executor for retrying requests.
+ */
+ private RaftGroupServiceImpl(
+ ReplicationGroupId groupId,
+ ClusterService cluster,
+ RaftMessagesFactory factory,
+ int timeout,
+ int rpcTimeout,
+ List<Peer> peers,
+ List<Peer> learners,
+ @Nullable Peer leader,
+ long retryDelay,
+ ScheduledExecutorService executor
+ ) {
+ this.cluster = cluster;
+ this.peers = List.copyOf(peers);
+ this.learners = List.copyOf(learners);
+ this.factory = factory;
+ this.timeout = timeout;
+ this.rpcTimeout = rpcTimeout;
+ this.groupId = groupId.toString();
+ this.realGroupId = groupId;
+ this.retryDelay = retryDelay;
+ this.leader = leader;
+ this.executor = executor;
+ }
+
+ /**
+ * Starts raft group service.
+ *
+ * @param groupId Raft group id.
+ * @param cluster Cluster service.
+ * @param factory Message factory.
+ * @param timeout Timeout.
+ * @param rpcTimeout Network call timeout.
+ * @param peers Initial peers list.
+ * @param learners Initial learners list.
+ * @param getLeader {@code True} to get the group's leader upon service
creation.
+ * @param retryDelay Retry delay.
+ * @param executor Executor for retrying requests.
+ * @return Future representing pending completion of the operation.
+ */
+ public static CompletableFuture<RaftGroupService> start(
+ ReplicationGroupId groupId,
+ ClusterService cluster,
+ RaftMessagesFactory factory,
+ int timeout,
+ int rpcTimeout,
+ List<Peer> peers,
+ List<Peer> learners,
+ boolean getLeader,
+ long retryDelay,
+ ScheduledExecutorService executor
+ ) {
+ var service = new RaftGroupServiceImpl(groupId, cluster, factory,
timeout, rpcTimeout, peers, learners, null, retryDelay, executor);
+
+ if (!getLeader) {
+ return CompletableFuture.completedFuture(service);
+ }
+
+ return service.refreshLeader().handle((unused, throwable) -> {
+ if (throwable != null) {
+ if (throwable.getCause() instanceof TimeoutException) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to refresh a leader [groupId={}]",
groupId);
+ }
+ } else {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Failed to refresh a leader [groupId={}]",
throwable, groupId);
+ }
+ }
+ }
+
+ return service;
+ });
+ }
+
+ /**
+ * Starts raft group service.
+ *
+ * @param groupId Raft group id.
+ * @param cluster Cluster service.
+ * @param factory Message factory.
+ * @param timeout Timeout, also used as the network call timeout.
+ * @param peers Initial peers list.
+ * @param getLeader {@code True} to get the group's leader upon service
creation.
+ * @param retryDelay Retry delay.
+ * @param executor Executor for retrying requests.
+ * @return Future representing pending completion of the operation.
+ */
+ public static CompletableFuture<RaftGroupService> start(
+ ReplicationGroupId groupId,
+ ClusterService cluster,
+ RaftMessagesFactory factory,
+ int timeout,
+ List<Peer> peers,
+ boolean getLeader,
+ long retryDelay,
+ ScheduledExecutorService executor
+ ) {
+ return start(groupId, cluster, factory, timeout, timeout, peers,
List.of(), getLeader, retryDelay, executor);
+ }
+
+ @Override
+ public ReplicationGroupId groupId() {
+ return realGroupId;
+ }
+
+ @Override
+ public long timeout() {
+ return timeout;
+ }
+
+ @Override
+ public void timeout(long newTimeout) {
+ this.timeout = newTimeout;
+ }
+
+ @Override
+ public Peer leader() {
+ return leader;
+ }
+
+ @Override
+ public List<Peer> peers() {
+ return peers;
+ }
+
+ @Override
+ public List<Peer> learners() {
+ return learners;
+ }
+
+ @Override
+ public CompletableFuture<Void> refreshLeader() {
+ GetLeaderRequest req =
factory.getLeaderRequest().groupId(groupId).build();
+
+ return this.<GetLeaderResponse>sendWithRetry(randomNode(), req)
+ .thenAccept(resp -> this.leader = parsePeer(resp.leaderId()));
+ }
+
+ @Override
+ public CompletableFuture<IgniteBiTuple<Peer, Long>>
refreshAndGetLeaderWithTerm() {
+ GetLeaderRequest req =
factory.getLeaderRequest().groupId(groupId).build();
+
+ return this.<GetLeaderResponse>sendWithRetry(randomNode(), req)
+ .thenApply(resp -> {
+ Peer respLeader = parsePeer(resp.leaderId());
+
+ this.leader = respLeader;
+
+ return new IgniteBiTuple<>(respLeader, resp.currentTerm());
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
+ Peer leader = this.leader;
+
+ if (leader == null) {
+ return refreshLeader().thenCompose(res ->
refreshMembers(onlyAlive));
+ }
+
+ GetPeersRequest req =
factory.getPeersRequest().onlyAlive(onlyAlive).groupId(groupId).build();
+
+ return this.<GetPeersResponse>sendWithRetry(leader, req)
+ .thenAccept(resp -> {
+ this.peers = parsePeerList(resp.peersList());
+ this.learners = parsePeerList(resp.learnersList());
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> addPeer(Peer peer) {
+ Peer leader = this.leader;
+
+ if (leader == null) {
+ return refreshLeader().thenCompose(res -> addPeer(peer));
+ }
+
+ AddPeerRequest req =
factory.addPeerRequest().groupId(groupId).peerId(peerId(peer)).build();
+
+ return this.<AddPeerResponse>sendWithRetry(leader, req)
+ .thenAccept(resp -> this.peers =
parsePeerList(resp.newPeersList()));
+ }
+
+ @Override
+ public CompletableFuture<Void> removePeer(Peer peer) {
+ Peer leader = this.leader;
+
+ if (leader == null) {
+ return refreshLeader().thenCompose(res -> removePeer(peer));
+ }
+
+ RemovePeerRequest req =
factory.removePeerRequest().groupId(groupId).peerId(peerId(peer)).build();
+
+ return this.<RemovePeerResponse>sendWithRetry(leader, req)
+ .thenAccept(resp -> this.peers =
parsePeerList(resp.newPeersList()));
+ }
+
+ @Override
+ public CompletableFuture<Void> changePeers(List<Peer> peers) {
+ Peer leader = this.leader;
+
+ if (leader == null) {
+ return refreshLeader().thenCompose(res -> changePeers(peers));
+ }
+
+ ChangePeersRequest req =
factory.changePeersRequest().groupId(groupId).newPeersList(peerIds(peers)).build();
+
+ return this.<ChangePeersResponse>sendWithRetry(leader, req)
+ .thenAccept(resp -> this.peers =
parsePeerList(resp.newPeersList()));
+ }
+
+ @Override
+ public CompletableFuture<Void> changePeersAsync(List<Peer> peers, long
term) {
+ Peer leader = this.leader;
+
+ if (leader == null) {
+ return refreshLeader().thenCompose(res -> changePeersAsync(peers,
term));
+ }
+
+ ChangePeersAsyncRequest req = factory.changePeersAsyncRequest()
+ .groupId(groupId)
+ .term(term)
+ .newPeersList(peerIds(peers))
+ .build();
+
+ LOG.info("Sending changePeersAsync request for group={} to peers={}
with leader term={}",
+ groupId, peers, term);
+
+ return this.<ChangePeersAsyncResponse>sendWithRetry(leader, req)
+ .thenAccept(resp -> {
+ // We expect that all raft-related errors will be handled
by sendWithRetry, meaning that
+ // such responses will trigger a retry of the original
request.
+ assert !(resp instanceof RpcRequests.ErrorResponse);
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> addLearners(List<Peer> learners) {
+ Peer leader = this.leader;
+
+ if (leader == null) {
+ return refreshLeader().thenCompose(res -> addLearners(learners));
+ }
+
+ AddLearnersRequest req =
factory.addLearnersRequest().groupId(groupId).learnersList(peerIds(learners)).build();
+
+ return this.<LearnersOpResponse>sendWithRetry(leader, req)
+ .thenAccept(resp -> this.learners =
parsePeerList(resp.newLearnersList()));
+ }
+
+ @Override
+ public CompletableFuture<Void> removeLearners(List<Peer> learners) {
+ Peer leader = this.leader;
+
+ if (leader == null) {
+ return refreshLeader().thenCompose(res ->
removeLearners(learners));
+ }
+
+ RemoveLearnersRequest req =
factory.removeLearnersRequest().groupId(groupId).learnersList(peerIds(learners)).build();
+
+ return this.<LearnersOpResponse>sendWithRetry(leader, req)
+ .thenAccept(resp -> this.learners =
parsePeerList(resp.newLearnersList()));
+ }
+
+ @Override
+ public CompletableFuture<Void> resetLearners(List<Peer> learners) {
+ Peer leader = this.leader;
+
+ if (leader == null) {
+ return refreshLeader().thenCompose(res -> resetLearners(learners));
+ }
+
+ ResetLearnersRequest req =
factory.resetLearnersRequest().groupId(groupId).learnersList(peerIds(learners)).build();
+
+ return this.<LearnersOpResponse>sendWithRetry(leader, req)
+ .thenAccept(resp -> this.learners =
parsePeerList(resp.newLearnersList()));
+ }
+
+ @Override
+ public CompletableFuture<Void> snapshot(Peer peer) {
+ SnapshotRequest req =
factory.snapshotRequest().groupId(groupId).build();
+
+ // Disable the timeout for a snapshot request.
+ return cluster.messagingService().invoke(peer.address(), req,
Integer.MAX_VALUE)
+ .thenAccept(resp -> {
+ if (resp != null) {
+ RpcRequests.ErrorResponse resp0 =
(RpcRequests.ErrorResponse) resp;
+
+ if (resp0.errorCode() !=
RaftError.SUCCESS.getNumber()) {
+ var ex = new
RaftException(RaftError.forNumber(resp0.errorCode()), resp0.errorMsg());
+
+ throw new CompletionException(ex);
+ }
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> transferLeadership(Peer newLeader) {
+ Peer leader = this.leader;
+
+ if (leader == null) {
+ return refreshLeader().thenCompose(res ->
transferLeadership(newLeader));
+ }
+
+ TransferLeaderRequest req = factory.transferLeaderRequest()
+ .groupId(groupId)
+ .leaderId(peerId(leader))
+ .peerId(peerId(newLeader))
+ .build();
+
+ return sendWithRetry(leader, req)
+ .thenRun(() -> this.leader = newLeader);
+ }
+
+ @Override
+ public <R> CompletableFuture<R> run(Command cmd) {
+ Peer leader = this.leader;
+
+ if (leader == null) {
+ return refreshLeader().thenCompose(res -> run(cmd));
+ }
+
+ ActionRequest req =
factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(true).build();
+
+ return this.<ActionResponse>sendWithRetry(leader, req)
+ .thenApply(resp -> (R) resp.result());
+ }
+
+ @Override
+ public void shutdown() {
+ // No-op.
+ }
+
+ @Override
+ public ClusterService clusterService() {
+ return cluster;
+ }
+
+ private <R extends NetworkMessage> CompletableFuture<R> sendWithRetry(Peer
peer, NetworkMessage req) {
+ var future = new CompletableFuture<R>();
+
+ sendWithRetry(peer, req, currentTimeMillis() + timeout, future);
+
+ return future;
+ }
+
+ /**
+ * Retries a request until success or timeout.
+ *
+ * @param peer Target peer.
+ * @param req The request.
+ * @param stopTime Stop time.
+ * @param fut The future.
+ * @param <R> Response type.
+ */
+ private <R extends NetworkMessage> void sendWithRetry(Peer peer,
NetworkMessage req, long stopTime, CompletableFuture<R> fut) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("sendWithRetry peers={} req={} from={} to={}",
+ peers,
+ S.toString(req),
+ cluster.topologyService().localMember().address(),
+ peer.address());
+ }
+
+ if (currentTimeMillis() >= stopTime) {
+ fut.completeExceptionally(new TimeoutException());
+
+ return;
+ }
+
+ //TODO: IGNITE-15389
org.apache.ignite.internal.metastorage.client.CursorImpl has potential deadlock
inside
+ cluster.messagingService().invoke(peer.address(), req, rpcTimeout)
+ .whenCompleteAsync((resp, err) -> {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("sendWithRetry resp={} from={} to={} err={}",
+ S.toString(resp),
+
cluster.topologyService().localMember().address(),
+ peer.address(),
+ err == null ? null : err.getMessage());
+ }
+
+ if (err != null) {
+ handleThrowable(err, peer, req, stopTime, fut);
+ } else if (resp instanceof ErrorResponse) {
+ handleErrorResponse((ErrorResponse) resp, peer, req,
stopTime, fut);
+ } else if (resp instanceof SMErrorResponse) {
+ handleSmErrorResponse((SMErrorResponse) resp, fut);
+ } else {
+ leader = peer; // The OK response was received from a
leader.
+
+ fut.complete((R) resp);
+ }
+ });
+ }
+
+ private void handleThrowable(
+ Throwable err, Peer peer, NetworkMessage req, long stopTime,
CompletableFuture<? extends NetworkMessage> fut
+ ) {
+ if (recoverable(err)) {
+ LOG.warn(
+ "Recoverable error during the request type={} occurred
(will be retried on the randomly selected node): ",
+ err, req.getClass().getSimpleName()
+ );
+
+ scheduleRetry(() -> sendWithRetry(randomNode(peer), req, stopTime,
fut));
+ } else {
+ fut.completeExceptionally(err);
+ }
+ }
+
+ private void handleErrorResponse(
+ ErrorResponse resp, Peer peer, NetworkMessage req, long stopTime,
CompletableFuture<? extends NetworkMessage> fut
+ ) {
+ RaftError error = RaftError.forNumber(resp.errorCode());
+
+ switch (error) {
+ case SUCCESS:
+ leader = peer; // The OK response was received from a leader.
+
+ fut.complete(null); // Void response.
+
+ break;
+
+ case EBUSY:
+ case EAGAIN:
+ scheduleRetry(() -> sendWithRetry(peer, req, stopTime, fut));
+
+ break;
+
+ case ENOENT:
+ scheduleRetry(() -> {
+ // If changing peers or requesting a leader and something
is not found,
+ // the target peer is probably rebalancing, so try another
peer.
+ if (req instanceof GetLeaderRequest || req instanceof
ChangePeersAsyncRequest) {
+ sendWithRetry(randomNode(peer), req, stopTime, fut);
+ } else {
+ sendWithRetry(peer, req, stopTime, fut);
+ }
+ });
+
+ break;
+
+ case EPERM:
+ // TODO: IGNITE-15706
+ case UNKNOWN:
+ case EINTERNAL:
+ if (resp.leaderId() == null) {
+ scheduleRetry(() -> sendWithRetry(randomNode(peer), req,
stopTime, fut));
+ } else {
+ leader = parsePeer(resp.leaderId()); // Update a leader.
+
+ scheduleRetry(() -> sendWithRetry(leader, req, stopTime,
fut));
+ }
+
+ break;
+
+ default:
+ fut.completeExceptionally(new RaftException(error,
resp.errorMsg()));
+
+ break;
+ }
+ }
+
+ private static void handleSmErrorResponse(SMErrorResponse resp,
CompletableFuture<? extends NetworkMessage> fut) {
+ SMThrowable th = resp.error();
+
+ if (th instanceof SMCompactedThrowable) {
+ SMCompactedThrowable compactedThrowable = (SMCompactedThrowable)
th;
+
+ try {
+ Throwable restoredTh = (Throwable)
Class.forName(compactedThrowable.throwableClassName())
+ .getConstructor(String.class)
+ .newInstance(compactedThrowable.throwableMessage());
+
+ fut.completeExceptionally(restoredTh);
+ } catch (Exception e) {
+ LOG.warn("Cannot restore throwable from user's state machine. "
+ + "Check if throwable " +
compactedThrowable.throwableClassName()
+ + " is present in the classpath.");
+
+ fut.completeExceptionally(new
IgniteException(compactedThrowable.throwableMessage()));
+ }
+ } else if (th instanceof SMFullThrowable) {
+ fut.completeExceptionally(((SMFullThrowable) th).throwable());
+ }
+ }
+
+ private void scheduleRetry(Runnable runnable) {
+ executor.schedule(runnable, retryDelay, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Checks if an error is recoverable, for example, {@link
java.net.ConnectException}.
+ *
+ * @param t The throwable.
+ * @return {@code True} if this is a recoverable exception.
+ */
+ private static boolean recoverable(Throwable t) {
+ if (t instanceof ExecutionException || t instanceof
CompletionException) {
+ t = t.getCause();
+ }
+
+ return t instanceof TimeoutException || t instanceof IOException;
+ }
+
+ private Peer randomNode() {
+ return randomNode(null);
+ }
+
+ /**
+ * Returns a random peer. Tries up to 5 times to find a peer different from the
excluded peer. If the excluded peer is null, just returns a random
+ * peer.
+ *
+ * @param excludedPeer Excluded peer.
+ * @return Random peer.
+ */
+ private Peer randomNode(@Nullable Peer excludedPeer) {
+ List<Peer> peers0 = peers;
+
+ assert peers0 != null && !peers0.isEmpty();
+
+ int lastPeerIndex = excludedPeer == null ? -1 :
peers0.indexOf(excludedPeer);
+
+ ThreadLocalRandom random = current();
+
+ int newIdx = 0;
+
+ for (int retries = 0; retries < 5; retries++) {
+ newIdx = random.nextInt(peers0.size());
+
+ if (newIdx != lastPeerIndex) {
+ break;
+ }
+ }
+
+ return peers0.get(newIdx);
+ }
+
+ /**
+ * Parses a {@link Peer} from the string representation of a {@link PeerId}.
+ *
+ * @param peerId String representation of a {@link PeerId}.
+ * @return Parsed peer, or {@code null} if {@link PeerId#parsePeer} returns {@code null}.
+ */
+ // TODO: Remove after IGNITE-15506
+ private static @Nullable Peer parsePeer(@Nullable String peerId) {
+ PeerId id = PeerId.parsePeer(peerId);
+
+ if (id == null) {
+ return null;
+ } else {
+ Endpoint endpoint = id.getEndpoint();
+
+ return new Peer(new NetworkAddress(endpoint.getIp(),
endpoint.getPort()));
+ }
+ }
+
+ /**
+ * Parses a list of {@link Peer}s from a list of {@link PeerId} string representations.
+ *
+ * @param peers List of {@link PeerId} string representations.
+ * @return List of parsed {@link Peer}s, or {@code null} if {@code peers} is {@code null}.
+ */
+ private static @Nullable List<Peer> parsePeerList(@Nullable
Collection<String> peers) {
+ if (peers == null) {
+ return null;
+ }
+
+ List<Peer> res = new ArrayList<>(peers.size());
+
+ for (String peer : peers) {
+ res.add(parsePeer(peer));
+ }
+
+ return res;
+ }
+
+ private static String peerId(Peer peer) {
+ return PeerId.fromPeer(peer).toString();
+ }
+
+ private static List<String> peerIds(Collection<Peer> peers) {
+ return
peers.stream().map(RaftGroupServiceImpl::peerId).collect(toList());
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
deleted file mode 100644
index 34cf4ebd28..0000000000
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ /dev/null
@@ -1,769 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.jraft.rpc.impl;
-
-import static java.lang.System.currentTimeMillis;
-import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.ThreadLocalRandom.current;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
-import static
org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
-import static
org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
-import static
org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.raft.client.Command;
-import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.entity.PeerId;
-import org.apache.ignite.raft.jraft.error.RaftError;
-import org.apache.ignite.raft.jraft.rpc.ActionRequest;
-import org.apache.ignite.raft.jraft.rpc.ActionResponse;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncRequest;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAsyncResponse;
-import org.apache.ignite.raft.jraft.rpc.RpcRequests;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * The implementation of {@link RaftGroupService}
- */
-public class RaftGroupServiceImpl implements RaftGroupService {
- /** The logger. */
- private static final IgniteLogger LOG =
Loggers.forClass(RaftGroupServiceImpl.class);
-
- /** */
- private volatile long timeout;
-
- /** Timeout for network calls. */
- private final long rpcTimeout;
-
- /** */
- private final String groupId;
-
- private final ReplicationGroupId realGroupId;
-
- /** */
- private final RaftMessagesFactory factory;
-
- /** */
- private volatile Peer leader;
-
- /** */
- private volatile List<Peer> peers;
-
- /** */
- private volatile List<Peer> learners;
-
- /** */
- private final ClusterService cluster;
-
- /** */
- private final long retryDelay;
-
- /** Executor for scheduling retries of {@link
RaftGroupServiceImpl#sendWithRetry} invocations. */
- private final ScheduledExecutorService executor;
-
- /**
- * Constructor.
- *
- * @param groupId Group id.
- * @param cluster A cluster.
- * @param factory A message factory.
- * @param timeout Request timeout.
- * @param peers Initial group configuration.
- * @param leader Group leader.
- * @param retryDelay Retry delay.
- * @param executor Executor for retrying requests.
- */
- private RaftGroupServiceImpl(
- ReplicationGroupId groupId,
- ClusterService cluster,
- RaftMessagesFactory factory,
- int timeout,
- int rpcTimeout,
- List<Peer> peers,
- List<Peer> learners,
- Peer leader,
- long retryDelay,
- ScheduledExecutorService executor
- ) {
- this.cluster = requireNonNull(cluster);
- this.peers = List.copyOf(peers);
- this.learners = List.copyOf(learners);
- this.factory = factory;
- this.timeout = timeout;
- this.rpcTimeout = rpcTimeout;
- this.groupId = groupId.toString();
- this.realGroupId = groupId;
- this.retryDelay = retryDelay;
- this.leader = leader;
- this.executor = executor;
- }
-
- /**
- * Starts raft group service.
- *
- * @param groupId Raft group id.
- * @param cluster Cluster service.
- * @param factory Message factory.
- * @param timeout Timeout.
- * @param rpcTimeout Network call timeout.
- * @param peers List of all peers.
- * @param getLeader {@code True} to get the group's leader upon service
creation.
- * @param retryDelay Retry delay.
- * @param executor Executor for retrying requests.
- * @return Future representing pending completion of the operation.
- */
- public static CompletableFuture<RaftGroupService> start(
- ReplicationGroupId groupId,
- ClusterService cluster,
- RaftMessagesFactory factory,
- int timeout,
- int rpcTimeout,
- List<Peer> peers,
- List<Peer> learners,
- boolean getLeader,
- long retryDelay,
- ScheduledExecutorService executor
- ) {
- var service = new RaftGroupServiceImpl(groupId, cluster, factory,
timeout, rpcTimeout, peers, learners, null, retryDelay, executor);
-
- if (!getLeader)
- return CompletableFuture.completedFuture(service);
-
- return service.refreshLeader().handle((unused, throwable) -> {
- if (throwable != null) {
- if (throwable.getCause() instanceof TimeoutException) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to refresh a leader [groupId={}]",
groupId);
- }
- } else {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Failed to refresh a leader [groupId={}]",
throwable, groupId);
- }
- }
- }
- return service;
- });
- }
-
- /**
- * Starts raft group service.
- *
- * @param groupId Raft group id.
- * @param cluster Cluster service.
- * @param factory Message factory.
- * @param timeout Timeout.
- * @param peers List of all peers.
- * @param getLeader {@code True} to get the group's leader upon service
creation.
- * @param retryDelay Retry delay.
- * @param executor Executor for retrying requests.
- * @return Future representing pending completion of the operation.
- */
- public static CompletableFuture<RaftGroupService> start(
- ReplicationGroupId groupId,
- ClusterService cluster,
- RaftMessagesFactory factory,
- int timeout,
- List<Peer> peers,
- boolean getLeader,
- long retryDelay,
- ScheduledExecutorService executor
- ) {
- return start(groupId, cluster, factory, timeout, timeout, peers,
List.of(), getLeader, retryDelay, executor);
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull ReplicationGroupId groupId() {
- return realGroupId;
- }
-
- /** {@inheritDoc} */
- @Override public long timeout() {
- return timeout;
- }
-
- /** {@inheritDoc} */
- @Override public void timeout(long newTimeout) {
- this.timeout = newTimeout;
- }
-
- /** {@inheritDoc} */
- @Override public Peer leader() {
- return leader;
- }
-
- /** {@inheritDoc} */
- @Override public List<Peer> peers() {
- return peers;
- }
-
- /** {@inheritDoc} */
- @Override public List<Peer> learners() {
- return learners;
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> refreshLeader() {
- GetLeaderRequest req =
factory.getLeaderRequest().groupId(groupId).build();
-
- CompletableFuture<GetLeaderResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(randomNode(), req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- leader = parsePeer(resp.leaderId());
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<IgniteBiTuple<Peer, Long>>
refreshAndGetLeaderWithTerm() {
- GetLeaderRequest req =
factory.getLeaderRequest().groupId(groupId).build();
-
- CompletableFuture<GetLeaderResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(randomNode(), req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- Peer respLeader = parsePeer(resp.leaderId());
-
- leader = respLeader;
-
- return new IgniteBiTuple<>(respLeader, resp.currentTerm());
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> refreshMembers(boolean onlyAlive)
{
- GetPeersRequest req =
factory.getPeersRequest().onlyAlive(onlyAlive).groupId(groupId).build();
-
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res ->
refreshMembers(onlyAlive));
-
- CompletableFuture<GetPeersResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- peers = parsePeerList(resp.peersList());
- learners = parsePeerList(resp.learnersList());
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> addPeer(Peer peer) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res -> addPeer(peer));
-
- AddPeerRequest req =
factory.addPeerRequest().groupId(groupId).peerId(PeerId.fromPeer(peer).toString()).build();
-
- CompletableFuture<AddPeerResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- this.peers = parsePeerList(resp.newPeersList());
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> removePeer(Peer peer) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res -> removePeer(peer));
-
- RemovePeerRequest req =
factory.removePeerRequest().groupId(groupId).peerId(PeerId.fromPeer(peer).toString()).build();
-
- CompletableFuture<RemovePeerResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- this.peers = parsePeerList(resp.newPeersList());
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> changePeers(List<Peer> peers) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res -> changePeers(peers));
-
- List<String> peersToChange = peers.stream().map(p ->
PeerId.fromPeer(p).toString())
- .collect(Collectors.toList());
-
- ChangePeersRequest req = factory.changePeersRequest().groupId(groupId)
- .newPeersList(peersToChange).build();
-
- CompletableFuture<ChangePeersResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- this.peers = parsePeerList(resp.newPeersList());
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> changePeersAsync(List<Peer>
peers, long term) {
- Peer leader = this.leader;
-
- if (leader == null) {
- return refreshLeader().thenCompose(res -> changePeersAsync(peers,
term));
- }
-
- List<String> peersToChange = peers.stream().map(p ->
PeerId.fromPeer(p).toString())
- .collect(Collectors.toList());
-
- ChangePeersAsyncRequest req =
factory.changePeersAsyncRequest().groupId(groupId)
- .term(term)
- .newPeersList(peersToChange).build();
-
- CompletableFuture<ChangePeersAsyncResponse> fut = new
CompletableFuture<>();
-
- LOG.info("Sending changePeersAsync request for group={} to peers={}
with leader term={}",
- groupId, peers, term);
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.handle((resp, err) -> {
- // We expect that all raft related errors will be handled by
sendWithRetry, means that
- // such responses will initiate a retrying of the original request.
- assert !(resp instanceof RpcRequests.ErrorResponse);
-
- if (err != null) {
- return CompletableFuture.<Void>failedFuture(err);
- }
-
- return CompletableFuture.<Void>completedFuture(null);
- }).thenCompose(Function.identity());
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> addLearners(List<Peer> learners) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res -> addLearners(learners));
-
- List<String> lrns = learners.stream().map(p ->
PeerId.fromPeer(p).toString()).collect(Collectors.toList());
- AddLearnersRequest req =
factory.addLearnersRequest().groupId(groupId).learnersList(lrns).build();
-
- CompletableFuture<LearnersOpResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- this.learners = parsePeerList(resp.newLearnersList());
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> removeLearners(List<Peer>
learners) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res ->
removeLearners(learners));
-
- List<String> lrns = learners.stream().map(p ->
PeerId.fromPeer(p).toString()).collect(Collectors.toList());
- RemoveLearnersRequest req =
factory.removeLearnersRequest().groupId(groupId).learnersList(lrns).build();
-
- CompletableFuture<LearnersOpResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- this.learners = parsePeerList(resp.newLearnersList());
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> resetLearners(List<Peer>
learners) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res -> resetLearners(learners));
-
- List<String> lrns = learners.stream().map(p ->
PeerId.fromPeer(p).toString()).collect(Collectors.toList());
- ResetLearnersRequest req =
factory.resetLearnersRequest().groupId(groupId).learnersList(lrns).build();
-
- CompletableFuture<LearnersOpResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> {
- this.learners = parsePeerList(resp.newLearnersList());
-
- return null;
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> snapshot(Peer peer) {
- SnapshotRequest req =
factory.snapshotRequest().groupId(groupId).build();
-
- // Disable the timeout for a snapshot request.
- CompletableFuture<NetworkMessage> fut =
cluster.messagingService().invoke(peer.address(), req, Integer.MAX_VALUE);
-
- return fut.thenCompose(resp -> {
- if (resp != null) {
- RpcRequests.ErrorResponse resp0 = (RpcRequests.ErrorResponse)
resp;
-
- if (resp0.errorCode() != RaftError.SUCCESS.getNumber())
- return CompletableFuture.failedFuture(new
RaftException(RaftError.forNumber(resp0.errorCode()), resp0.errorMsg()));
- }
-
- return CompletableFuture.completedFuture(null);
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> transferLeadership(Peer
newLeader) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res ->
transferLeadership(newLeader));
-
- TransferLeaderRequest req = factory.transferLeaderRequest()
- .groupId(groupId)
- .leaderId(PeerId.fromPeer(leader).toString())
- .peerId(PeerId.fromPeer(newLeader).toString())
- .build();
-
- CompletableFuture<NetworkMessage> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenRun(() -> {
- this.leader = newLeader;
- });
- }
-
- /** {@inheritDoc} */
- @Override public <R> CompletableFuture<R> run(Command cmd) {
- Peer leader = this.leader;
-
- if (leader == null)
- return refreshLeader().thenCompose(res -> run(cmd));
-
- ActionRequest req =
factory.actionRequest().command(cmd).groupId(groupId).readOnlySafe(true).build();
-
- CompletableFuture<ActionResponse> fut = new CompletableFuture<>();
-
- sendWithRetry(leader, req, currentTimeMillis() + timeout, fut);
-
- return fut.thenApply(resp -> (R) resp.result());
- }
-
- /** {@inheritDoc} */
- @Override public void shutdown() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public ClusterService clusterService() {
- return cluster;
- }
-
- /**
- * Retries a request until success or timeout.
- *
- * @param peer Target peer.
- * @param req The request.
- * @param stopTime Stop time.
- * @param fut The future.
- * @param <R> Response type.
- */
- private <R> void sendWithRetry(Peer peer, Object req, long stopTime,
CompletableFuture<R> fut) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("sendWithRetry peers={} req={} from={} to={}",
- peers,
- S.toString(req),
- cluster.topologyService().localMember().address(),
- peer.address());
- }
-
- if (currentTimeMillis() >= stopTime) {
- fut.completeExceptionally(new TimeoutException());
-
- return;
- }
-
- CompletableFuture<?> fut0 =
cluster.messagingService().invoke(peer.address(), (NetworkMessage) req,
rpcTimeout);
-
- //TODO: IGNITE-15389
org.apache.ignite.internal.metastorage.client.CursorImpl has potential deadlock
inside
- fut0.whenCompleteAsync(new BiConsumer<Object, Throwable>() {
- @Override public void accept(Object resp, Throwable err) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("sendWithRetry resp={} from={} to={} err={}",
- S.toString(resp),
- cluster.topologyService().localMember().address(),
- peer.address(),
- err == null ? null : err.getMessage());
- }
-
- if (err != null) {
- if (recoverable(err)) {
- executor.schedule(() -> {
- LOG.warn("Recoverable error during the request
type={} occurred (will be retried on the randomly selected node): ",
- err, req.getClass().getSimpleName());
-
- sendWithRetry(randomNode(peer), req, stopTime,
fut);
-
- return null;
- }, retryDelay, TimeUnit.MILLISECONDS);
- }
- else {
- fut.completeExceptionally(err);
- }
- }
- else if (resp instanceof RpcRequests.ErrorResponse) {
- RpcRequests.ErrorResponse resp0 =
(RpcRequests.ErrorResponse) resp;
-
- if (resp0.errorCode() == RaftError.SUCCESS.getNumber()) {
// Handle OK response.
- leader = peer; // The OK response was received from a
leader.
-
- fut.complete(null); // Void response.
- }
- else if (resp0.errorCode() == RaftError.EBUSY.getNumber()
||
- resp0.errorCode() == (RaftError.EAGAIN.getNumber()) ||
- resp0.errorCode() == (RaftError.ENOENT.getNumber())) {
// Possibly a node has not been started.
- executor.schedule(() -> {
- Peer targetPeer = peer;
-
- if (resp0.errorCode() ==
RaftError.ENOENT.getNumber()) {
- // If changing peers or requesting a leader
and something is not found
- // probably target peer is doing rebalancing,
try another peer.
- if (req instanceof GetLeaderRequest || req
instanceof ChangePeersAsyncRequest) {
- targetPeer = randomNode(peer);
- }
- }
-
- sendWithRetry(targetPeer, req, stopTime, fut);
-
- return null;
- }, retryDelay, TimeUnit.MILLISECONDS);
- }
- else if (resp0.errorCode() == RaftError.EPERM.getNumber()
||
- // TODO: IGNITE-15706
- resp0.errorCode() == RaftError.UNKNOWN.getNumber() ||
- resp0.errorCode() == RaftError.EINTERNAL.getNumber()) {
- if (resp0.leaderId() == null) {
- executor.schedule(() -> {
- sendWithRetry(randomNode(peer), req, stopTime,
fut);
-
- return null;
- }, retryDelay, TimeUnit.MILLISECONDS);
- }
- else {
- leader = parsePeer(resp0.leaderId()); // Update a
leader.
-
- executor.schedule(() -> {
- sendWithRetry(leader, req, stopTime, fut);
-
- return null;
- }, retryDelay, TimeUnit.MILLISECONDS);
- }
- }
- else {
- fut.completeExceptionally(
- new
RaftException(RaftError.forNumber(resp0.errorCode()), resp0.errorMsg()));
- }
- }
- else if (resp instanceof RpcRequests.SMErrorResponse) {
- SMThrowable th =
((RpcRequests.SMErrorResponse)resp).error();
- if (th instanceof SMCompactedThrowable) {
- SMCompactedThrowable compactedThrowable =
(SMCompactedThrowable)th;
-
- try {
- Throwable restoredTh =
(Throwable)Class.forName(compactedThrowable.throwableClassName())
- .getConstructor(String.class)
-
.newInstance(compactedThrowable.throwableMessage());
-
- fut.completeExceptionally(restoredTh);
- }
- catch (Exception e) {
- LOG.warn("Cannot restore throwable from user's
state machine. " +
- "Check if throwable " +
compactedThrowable.throwableClassName() +
- " is presented in the classpath.");
-
- fut.completeExceptionally(new
IgniteException(compactedThrowable.throwableMessage()));
- }
- }
- else if (th instanceof SMFullThrowable)
-
fut.completeExceptionally(((SMFullThrowable)th).throwable());
- }
- else {
- leader = peer; // The OK response was received from a
leader.
-
- fut.complete((R) resp);
- }
- }
- });
- }
-
- /**
- * Checks if an error is recoverable, for example, {@link
java.net.ConnectException}.
- * @param t The throwable.
- * @return {@code True} if this is a recoverable exception.
- */
- private static boolean recoverable(Throwable t) {
- if (t instanceof ExecutionException || t instanceof
CompletionException) {
- t = t.getCause();
- }
-
- return t instanceof TimeoutException || t instanceof IOException;
- }
-
- private Peer randomNode() {
- return randomNode(null);
- }
-
- /**
- * Returns a random peer. Tries 5 times finding a peer different from the
excluded peer.
- * If excluded peer is null, just returns a random peer.
- *
- * @param excludedPeer Excluded peer.
- * @return Random peer.
- */
- private Peer randomNode(Peer excludedPeer) {
- List<Peer> peers0 = peers;
-
- assert peers0 != null && !peers0.isEmpty();
-
- int lastPeerIndex = -1;
-
- if (excludedPeer != null) {
- lastPeerIndex = peers0.indexOf(excludedPeer);
- }
-
- int retries = 0;
-
- ThreadLocalRandom random = current();
-
- int newIdx = 0;
-
- while (retries < 5) {
- newIdx = random.nextInt(peers0.size());
-
- if (newIdx != lastPeerIndex) {
- break;
- }
-
- retries++;
- }
-
- return peers0.get(newIdx);
- }
-
- /**
- * Parse {@link Peer} from string representation of {@link PeerId}.
- *
- * @param peerId String representation of {@link PeerId}
- * @return Peer
- */
- // TODO: Remove after IGNITE-15506
- private static Peer parsePeer(String peerId) {
- return peerFromPeerId(PeerId.parsePeer(peerId));
- }
-
- /**
- * Creates new {@link Peer} from {@link PeerId}.
- *
- * @param peer PeerId
- * @return {@link Peer}
- */
- private static Peer peerFromPeerId(PeerId peer) {
- if (peer == null)
- return null;
- else
- return new Peer(NetworkAddress.from(peer.getEndpoint().getIp() +
":" + peer.getEndpoint().getPort()));
- }
-
- /**
- * Parse list of {@link PeerId} from list with string representations.
- *
- * @param peers List of {@link PeerId} string representations.
- * @return List of {@link PeerId}
- */
- private List<Peer> parsePeerList(Collection<String> peers) {
- if (peers == null)
- return null;
-
- List<Peer> res = new ArrayList<>(peers.size());
-
- for (String peer: peers)
- res.add(parsePeer(peer));
-
- return res;
- }
-}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
new file mode 100644
index 0000000000..469373c68a
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toUnmodifiableList;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.raft.jraft.test.TestUtils.peersToIds;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+import java.net.ConnectException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
+import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Test methods of raft group service.
+ */
+@ExtendWith(MockitoExtension.class)
+public class RaftGroupServiceTest extends BaseIgniteAbstractTest {
+ private static final List<Peer> NODES = Stream.of(20000, 20001, 20002)
+ .map(port -> new NetworkAddress("localhost", port))
+ .map(Peer::new)
+ .collect(toUnmodifiableList());
+
+ private static final RaftMessagesFactory FACTORY = new
RaftMessagesFactory();
+
+ private volatile @Nullable Peer leader = NODES.get(0);
+
+ /** Call timeout. */
+ private static final int TIMEOUT = 1000;
+
+ /** Retry delay. */
+ private static final int DELAY = 200;
+
+ /** Current term. */
+ private static final long CURRENT_TERM = 1;
+
+ /** Test group id. */
+ private static final TestReplicationGroupId TEST_GRP = new
TestReplicationGroupId("test");
+
+ /** Mock cluster. */
+ @Mock
+ private ClusterService cluster;
+
+ /** Mock messaging service. */
+ @Mock
+ private MessagingService messagingService;
+
+ /** Executor for raft group services. */
+ private ScheduledExecutorService executor;
+
+ @BeforeEach
+ void before() {
+ when(cluster.messagingService()).thenReturn(messagingService);
+ // The mocked cluster now hands out the mocked messaging service. Next: a scheduled pool with core size 20, shut down in after().
+ executor = new ScheduledThreadPoolExecutor(20, new
NamedThreadFactory(Loza.CLIENT_POOL_NAME, logger()));
+ }
+
+ /**
+ * Shutdown executor for raft group services, passing a 10 second limit to
+ * {@link IgniteUtils#shutdownAndAwaitTermination}.
+ */
+ @AfterEach
+ void after() {
+ IgniteUtils.shutdownAndAwaitTermination(executor, 10,
TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testRefreshLeaderStable() {
+ mockLeaderRequest(false);
+
+ RaftGroupService service = startRaftGroupService(NODES, false);
+ // Expect no known leader before the first explicit refresh.
+ assertNull(service.leader());
+
+ assertThat(service.refreshLeader(), willCompleteSuccessfully());
+ // After the refresh the service must report the leader held in this test's 'leader' field.
+ assertEquals(leader, service.leader());
+ }
+
+ @Test
+ public void testRefreshLeaderNotElected() {
+ mockLeaderRequest(false);
+
+ // Simulate running elections.
+ leader = null;
+
+ RaftGroupService service = startRaftGroupService(NODES, false);
+
+ assertNull(service.leader());
+ // The test's 'leader' field stays null for the whole call, so the refresh is expected to end in TimeoutException.
+ assertThat(service.refreshLeader(), willThrow(TimeoutException.class));
+ }
+
+ @Test
+ public void testRefreshLeaderElectedAfterDelay() {
+ mockLeaderRequest(false);
+
+ // Simulate running elections.
+ leader = null;
+ // The leader reappears after 500 ms, i.e. before TIMEOUT (1000 ms) elapses.
+ executor.schedule((Runnable) () -> leader = NODES.get(0), 500,
TimeUnit.MILLISECONDS);
+
+ RaftGroupService service = startRaftGroupService(NODES, false);
+
+ assertNull(service.leader());
+ // The refresh is expected to succeed once the leader is back.
+ assertThat(service.refreshLeader(), willCompleteSuccessfully());
+
+ assertEquals(NODES.get(0), service.leader());
+ }
+
+ @Test
+ public void testRefreshLeaderWithTimeout() {
+ mockLeaderRequest(true);
+ // NOTE(review): 'true' presumably makes the mocked leader request time out - confirm in mockLeaderRequest.
+ RaftGroupService service = startRaftGroupService(NODES, false);
+ // The matcher is given a 500 ms limit for the TimeoutException to appear.
+ assertThat(service.refreshLeader(), willThrow(TimeoutException.class,
500, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testUserRequestLeaderElected() {
+ mockLeaderRequest(false);
+ mockUserInput(false, null);
+
+ RaftGroupService service = startRaftGroupService(NODES, false);
+ // Resolve the leader explicitly before issuing the command.
+ assertThat(service.refreshLeader(), willCompleteSuccessfully());
+ // Running a command through the service is then expected to yield a TestResponse.
+ assertThat(service.run(new TestCommand()),
willBe(instanceOf(TestResponse.class)));
+ }
+
+ @Test
+ public void testUserRequestLazyInitLeader() {
+ mockLeaderRequest(false);
+ mockUserInput(false, null);
+
+ RaftGroupService service = startRaftGroupService(NODES, false);
+
+ assertNull(service.leader());
+ // No explicit refreshLeader() here: run() is invoked while the service still reports no leader.
+ assertThat(service.run(new TestCommand()),
willBe(instanceOf(TestResponse.class)));
+ // Running the command must have left the service with the leader held in this test's 'leader' field.
+ assertEquals(leader, service.leader());
+ }
+
+ @Test
+ public void testUserRequestWithTimeout() {
+ mockLeaderRequest(false);
+ mockUserInput(true, null);
+ // NOTE(review): 'true' presumably makes the mocked user request time out - confirm in mockUserInput.
+ RaftGroupService service = startRaftGroupService(NODES, false);
+ // The matcher is given a 500 ms limit for the TimeoutException to appear.
+ assertThat(service.run(new TestCommand()),
willThrow(TimeoutException.class, 500, TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * Checks that a command fails with a {@link TimeoutException} when the leader disappears after
+ * the service has been started with a resolved leader and never comes back.
+ */
+ @Test
+ public void testUserRequestLeaderNotElected() {
+ mockLeaderRequest(false);
+ mockUserInput(false, null);
+
+ RaftGroupService service = startRaftGroupService(NODES, true);
+
+ // Remember the leader the service was started with before clearing the field.
+ Peer leader = this.leader;
+
+ assertEquals(leader, service.leader());
+
+ this.leader = null;
+
+ // The service still reports the previously resolved leader.
+ assertEquals(leader, service.leader());
+
+ assertThat(service.run(new TestCommand()),
willThrow(TimeoutException.class));
+ }
+
+ /**
+ * Checks that a command completes when the leader disappears after the service start and is
+ * set again ({@code NODES.get(0)}) 500 ms later, and that the service then reports that leader.
+ */
+ @Test
+ public void testUserRequestLeaderElectedAfterDelay() {
+ mockLeaderRequest(false);
+ mockUserInput(false, null);
+
+ RaftGroupService service = startRaftGroupService(NODES, true);
+
+ // Remember the leader the service was started with before clearing the field.
+ Peer leader = this.leader;
+
+ assertEquals(leader, service.leader());
+
+ this.leader = null;
+
+ // The service still reports the previously resolved leader.
+ assertEquals(leader, service.leader());
+
+ executor.schedule((Runnable) () -> this.leader = NODES.get(0), 500,
TimeUnit.MILLISECONDS);
+
+ assertThat(service.run(new TestCommand()),
willBe(instanceOf(TestResponse.class)));
+
+ assertEquals(NODES.get(0), service.leader());
+ }
+
+ /**
+ * Checks that a command completes when requests to {@code NODES.get(0)} fail with a
+ * {@code ConnectException} and {@code NODES.get(1)} becomes the leader 500 ms after the
+ * original leader has disappeared. The service is started with a tripled {@code TIMEOUT}.
+ */
+ @Test
+ public void testUserRequestLeaderElectedAfterDelayWithFailedNode() {
+ mockLeaderRequest(false);
+ mockUserInput(false, NODES.get(0));
+
+ RaftGroupService service = startRaftGroupService(NODES, true, TIMEOUT
* 3);
+
+ // Remember the leader the service was started with before clearing the field.
+ Peer leader = this.leader;
+
+ assertEquals(leader, service.leader());
+
+ this.leader = null;
+
+ // The service still reports the previously resolved leader.
+ assertEquals(leader, service.leader());
+
+ executor.schedule(
+ () -> {
+ logger().info("Set leader {}", NODES.get(1));
+
+ this.leader = NODES.get(1);
+ },
+ 500, TimeUnit.MILLISECONDS
+ );
+
+ assertThat(service.run(new TestCommand()),
willBe(instanceOf(TestResponse.class)));
+
+ assertEquals(NODES.get(1), service.leader());
+ }
+
+ /**
+ * Checks that a command still completes when the leader changes to {@code NODES.get(1)} after
+ * the service has resolved the initial leader, and that the service reports the new leader
+ * afterwards.
+ */
+ @Test
+ public void testUserRequestLeaderChanged() {
+ mockLeaderRequest(false);
+ mockUserInput(false, null);
+
+ RaftGroupService service = startRaftGroupService(NODES, true);
+
+ Peer leader = this.leader;
+
+ assertEquals(leader, service.leader());
+
+ Peer newLeader = NODES.get(1);
+
+ this.leader = newLeader;
+
+ // The service has not noticed the change yet.
+ assertEquals(leader, service.leader());
+ assertNotEquals(leader, newLeader);
+
+ // Runs the command on an old leader. It should respond with leader
+ // changed error, then transparently retry.
+ assertThat(service.run(new TestCommand()),
willBe(instanceOf(TestResponse.class)));
+
+ assertEquals(newLeader, service.leader());
+ }
+
+ /**
+ * Checks that {@code snapshot()} fails with an {@link IgniteInternalException} when the mocked
+ * snapshot request itself completes exceptionally (mode {@code 1} of
+ * {@code mockSnapshotRequest}).
+ */
+ @Test
+ public void testSnapshotExecutionException() {
+ mockSnapshotRequest(1);
+
+ RaftGroupService service = startRaftGroupService(NODES, false);
+
+ var addr = new NetworkAddress("localhost", 8082);
+
+ assertThat(service.snapshot(new Peer(addr)),
willThrow(IgniteInternalException.class));
+ }
+
+ /**
+ * Checks that {@code snapshot()} fails with a {@link RaftException} when the mocked snapshot
+ * request is answered with an error response (mode {@code 0} of {@code mockSnapshotRequest}).
+ */
+ @Test
+ public void testSnapshotExecutionFailedResponse() {
+ mockSnapshotRequest(0);
+
+ RaftGroupService service = startRaftGroupService(NODES, false);
+
+ var addr = new NetworkAddress("localhost", 8082);
+
+ assertThat(service.snapshot(new Peer(addr)),
willThrow(RaftException.class));
+ }
+
+ /**
+ * Checks that {@code refreshMembers(false)} replaces the service's peers and learners with the
+ * lists from the mocked get-peers response: the first two nodes as peers and no learners.
+ */
+ @Test
+ public void testRefreshMembers() {
+ List<String> respPeers = peersToIds(NODES.subList(0, 2));
+ // subList(2, 2) is an empty range, i.e. the response carries no learners.
+ List<String> respLearners = peersToIds(NODES.subList(2, 2));
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+
eq(FACTORY.getPeersRequest().onlyAlive(false).groupId(TEST_GRP.toString()).build()),
anyLong()))
+ .then(invocation ->
+
completedFuture(FACTORY.getPeersResponse().peersList(respPeers).learnersList(respLearners).build()));
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service = startRaftGroupService(NODES, true);
+
+ assertEquals(NODES, service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+
+ assertThat(service.refreshMembers(false), willCompleteSuccessfully());
+
+ assertEquals(NODES.subList(0, 2), service.peers());
+ assertEquals(NODES.subList(2, 2), service.learners());
+ }
+
+ /**
+ * Checks that {@code addPeer()} for the third node updates the service's peer list to the
+ * list returned by the mocked add-peer response (all of {@code NODES}), leaving learners empty.
+ */
+ @Test
+ public void testAddPeer() {
+ List<String> respPeers = peersToIds(NODES);
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.addPeerRequest()
+ .peerId(PeerId.parsePeer(NODES.get(2).address().host()
+ ":" + NODES.get(2).address().port()).toString())
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
+ .then(invocation ->
+
completedFuture(FACTORY.addPeerResponse().newPeersList(respPeers).build()));
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service = startRaftGroupService(NODES.subList(0, 2),
true);
+
+ assertEquals(NODES.subList(0, 2), service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+
+ assertThat(service.addPeer(NODES.get(2)), willCompleteSuccessfully());
+
+ assertEquals(NODES, service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+ }
+
+ /**
+ * Checks that {@code removePeer()} for the third node updates the service's peer list to the
+ * list returned by the mocked remove-peer response (the first two nodes), leaving learners empty.
+ */
+ @Test
+ public void testRemovePeer() {
+ List<String> respPeers = peersToIds(NODES.subList(0, 2));
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.removePeerRequest()
+ .peerId(PeerId.parsePeer(NODES.get(2).address().host()
+ ":" + NODES.get(2).address().port()).toString())
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
+ .then(invocation ->
+
completedFuture(FACTORY.removePeerResponse().newPeersList(respPeers).build()));
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service = startRaftGroupService(NODES, true);
+
+ assertEquals(NODES, service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+
+ assertThat(service.removePeer(NODES.get(2)),
willCompleteSuccessfully());
+
+ assertEquals(NODES.subList(0, 2), service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+ }
+
+ /**
+ * Checks {@code changePeers()} in both directions: first shrinking the peer list to the first
+ * node, then extending it to all of {@code NODES}. After each call the service's peers must
+ * equal the list from the corresponding mocked response; learners stay empty.
+ */
+ @Test
+ public void testChangePeers() {
+ List<String> shrunkPeers = peersToIds(NODES.subList(0, 1));
+
+ List<String> extendedPeers = peersToIds(NODES);
+
+ // Stub for the shrinking request.
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.changePeersRequest()
+ .newPeersList(shrunkPeers)
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
+ .then(invocation ->
+
completedFuture(FACTORY.changePeersResponse().newPeersList(shrunkPeers).build()));
+
+ // Stub for the extending request.
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.changePeersRequest()
+ .newPeersList(extendedPeers)
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
+ .then(invocation ->
+
completedFuture(FACTORY.changePeersResponse().newPeersList(extendedPeers).build()));
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service = startRaftGroupService(NODES.subList(0, 2),
true);
+
+ assertEquals(NODES.subList(0, 2), service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+
+ assertThat(service.changePeers(NODES.subList(0, 1)),
willCompleteSuccessfully());
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+
+ assertThat(service.changePeers(NODES), willCompleteSuccessfully());
+
+ assertEquals(NODES, service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+ }
+
+ /**
+ * Checks that {@code transferLeadership()} to {@code NODES.get(1)} completes when the mocked
+ * transfer request (from leader {@code NODES.get(0)}) is answered with an OK status, and that
+ * the service then reports {@code NODES.get(1)} as the leader.
+ */
+ @Test
+ public void testTransferLeadership() {
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.transferLeaderRequest()
+ .peerId(PeerId.fromPeer(NODES.get(1)).toString())
+ .leaderId(PeerId.fromPeer(NODES.get(0)).toString())
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
+ .then(invocation ->
+
completedFuture(RaftRpcFactory.DEFAULT.newResponse(FACTORY, Status.OK())));
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service = startRaftGroupService(NODES, true);
+
+ assertEquals(NODES.get(0), service.leader());
+
+ assertThat(service.transferLeadership(NODES.get(1)),
willCompleteSuccessfully());
+
+ assertEquals(NODES.get(1), service.leader());
+ }
+
+ /**
+ * Checks that {@code addLearners()} for the second and third nodes makes the service report
+ * them as learners while the peer list (the first node) stays unchanged.
+ */
+ @Test
+ public void testAddLearners() {
+ List<String> addLearners = peersToIds(NODES.subList(1, 3));
+
+ // Reuse the shared stub (as testResetLearners and testRemoveLearners do) instead of
+ // repeating it inline; the mocked response echoes the requested learners.
+ mockAddLearners(TEST_GRP.toString(), addLearners, addLearners);
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service = startRaftGroupService(NODES.subList(0, 1), true);
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(Collections.emptyList(), service.learners());
+
+ assertThat(service.addLearners(NODES.subList(1, 3)), willCompleteSuccessfully());
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(NODES.subList(1, 3), service.learners());
+ }
+
+ /**
+ * Checks that {@code resetLearners()} replaces the learners previously added through
+ * {@code addLearners()} (second and third nodes) with the list from the mocked reset response
+ * (third node only), leaving the peer list unchanged.
+ */
+ @Test
+ public void testResetLearners() {
+ List<String> addLearners = peersToIds(NODES.subList(1, 3));
+
+ List<String> resetLearners = peersToIds(NODES.subList(2, 3));
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.resetLearnersRequest()
+ .learnersList(resetLearners)
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
+ .then(invocation ->
+
completedFuture(FACTORY.learnersOpResponse().newLearnersList(resetLearners).build()));
+
+ mockAddLearners(TEST_GRP.toString(), addLearners, addLearners);
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service = startRaftGroupService(NODES.subList(0, 1),
true);
+
+ assertThat(service.addLearners(NODES.subList(1, 3)),
willCompleteSuccessfully());
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(NODES.subList(1, 3), service.learners());
+
+ assertThat(service.resetLearners(NODES.subList(2, 3)),
willCompleteSuccessfully());
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(NODES.subList(2, 3), service.learners());
+ }
+
+ /**
+ * Checks that {@code removeLearners()} for the third node leaves the service with the learner
+ * list from the mocked remove response (second node only) after both nodes had been added
+ * through {@code addLearners()}; the peer list stays unchanged.
+ */
+ @Test
+ public void testRemoveLearners() {
+ List<String> addLearners = peersToIds(NODES.subList(1, 3));
+
+ List<String> removeLearners = peersToIds(NODES.subList(2, 3));
+
+ List<String> resultLearners = peersToIds(NODES.subList(1, 2));
+
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.removeLearnersRequest()
+ .learnersList(removeLearners)
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
+ .then(invocation ->
+
completedFuture(FACTORY.learnersOpResponse().newLearnersList(resultLearners).build()));
+
+ mockAddLearners(TEST_GRP.toString(), addLearners, addLearners);
+
+ mockLeaderRequest(false);
+
+ RaftGroupService service = startRaftGroupService(NODES.subList(0, 1),
true);
+
+ assertThat(service.addLearners(NODES.subList(1, 3)),
willCompleteSuccessfully());
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(NODES.subList(1, 3), service.learners());
+
+ assertThat(service.removeLearners(NODES.subList(2, 3)),
willCompleteSuccessfully());
+
+ assertEquals(NODES.subList(0, 1), service.peers());
+ assertEquals(NODES.subList(1, 2), service.learners());
+ }
+
+ /**
+ * Checks the mocked get-leader exchange itself: after the service has refreshed its leader, a
+ * {@code GetLeaderRequest} sent directly through {@code messagingService} to the leader's
+ * address must be answered with the leader's id and {@code CURRENT_TERM}.
+ */
+ @Test
+ public void testGetLeaderRequest() {
+ mockLeaderRequest(false);
+
+ RaftGroupService service = startRaftGroupService(NODES, false);
+
+ assertNull(service.leader());
+
+ assertThat(service.refreshLeader(), willCompleteSuccessfully());
+
+ GetLeaderRequest req =
FACTORY.getLeaderRequest().groupId(TEST_GRP.toString()).build();
+
+ // Bypasses the service and talks to the mocked messaging layer directly.
+ CompletableFuture<GetLeaderResponse> fut =
messagingService.invoke(leader.address(), req, TIMEOUT)
+ .thenApply(GetLeaderResponse.class::cast);
+
+ assertThat(fut.thenApply(GetLeaderResponse::leaderId),
willBe(equalTo(PeerId.fromPeer(leader).toString())));
+ assertThat(fut.thenApply(GetLeaderResponse::currentTerm),
willBe(equalTo(CURRENT_TERM)));
+ }
+
+ /**
+ * Starts a Raft group service for {@code TEST_GRP} using the default {@code TIMEOUT} and
+ * waits for the start to complete.
+ *
+ * @param peers Peers the service is started with.
+ * @param getLeader Value of the {@code getLeader} flag passed to {@code RaftGroupServiceImpl.start}.
+ * @return Started service.
+ */
+ private RaftGroupService startRaftGroupService(List<Peer> peers, boolean getLeader) {
+ // Delegate to the overload with an explicit timeout so the start logic lives in one place.
+ return startRaftGroupService(peers, getLeader, TIMEOUT);
+ }
+
+ /**
+ * Starts a Raft group service for {@code TEST_GRP} and waits for the start to complete.
+ *
+ * @param peers Peers the service is started with.
+ * @param getLeader Value of the {@code getLeader} flag passed to {@code RaftGroupServiceImpl.start}.
+ * @param timeout Timeout value passed to {@code RaftGroupServiceImpl.start} in place of the
+ * default {@code TIMEOUT}.
+ * @return Started service.
+ */
+ private RaftGroupService startRaftGroupService(List<Peer> peers, boolean
getLeader, int timeout) {
+ CompletableFuture<RaftGroupService> service =
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY,
timeout, peers, getLeader, DELAY, executor);
+
+ // Fails the test if the start future does not complete successfully.
+ assertThat(service, willCompleteSuccessfully());
+
+ return service.join();
+ }
+
+ /**
+ * Mocks sending {@link ActionRequest}s that carry a {@code TestCommand}.
+ *
+ * <p>The stubbed reply is chosen in this order: a request addressed to {@code peer} fails with
+ * a {@code ConnectException}; with {@code delay} set, a successful action response is produced
+ * asynchronously after a 1000 ms sleep; otherwise the reply depends on the test's current
+ * {@code leader}: an EPERM error without a leader id when there is no leader, an EPERM error
+ * carrying the leader id when the target is not the leader, and a successful action response
+ * when the target is the leader.
+ *
+ * @param delay {@code True} to create a delay before response.
+ * @param peer Fail the request targeted to given peer, or {@code null} to fail none.
+ */
+ private void mockUserInput(boolean delay, @Nullable Peer peer) {
+ when(messagingService.invoke(
+ any(NetworkAddress.class),
+ argThat(new ArgumentMatcher<ActionRequest>() {
+ @Override
+ public boolean matches(ActionRequest arg) {
+ return arg.command() instanceof TestCommand;
+ }
+ }),
+ anyLong()
+ )).then(invocation -> {
+ NetworkAddress target = invocation.getArgument(0);
+
+ if (peer != null && target.equals(peer.address())) {
+ return failedFuture(new ConnectException());
+ }
+
+ if (delay) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ fail();
+ }
+
+ return FACTORY.actionResponse().result(new
TestResponse()).build();
+ });
+ }
+
+ Object resp;
+
+ if (leader == null) {
+ resp =
FACTORY.errorResponse().errorCode(RaftError.EPERM.getNumber()).build();
+ } else if (!target.equals(leader.address())) {
+ resp = FACTORY.errorResponse()
+
.errorCode(RaftError.EPERM.getNumber()).leaderId(PeerId.fromPeer(leader).toString()).build();
+ } else {
+ resp = FACTORY.actionResponse().result(new
TestResponse()).build();
+ }
+
+ return completedFuture(resp);
+ });
+ }
+
+ /**
+ * Mocks sending {@link GetLeaderRequest}s.
+ *
+ * <p>With {@code delay} set, every request is answered asynchronously after a 1000 ms sleep
+ * with an EPERM error response. Otherwise the reply is immediate: a get-leader response
+ * carrying the current leader's id and {@code CURRENT_TERM}, or an EPERM error response when
+ * no peer id could be derived from the test's {@code leader} field.
+ *
+ * @param delay {@code True} to delay response.
+ */
+ private void mockLeaderRequest(boolean delay) {
+ when(messagingService.invoke(any(NetworkAddress.class),
any(GetLeaderRequest.class), anyLong()))
+ .then(invocation -> {
+ if (delay) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ fail();
+ }
+
+ return
FACTORY.errorResponse().errorCode(RaftError.EPERM.getNumber()).build();
+ });
+ }
+
+ // The leader field is read on every invocation, so tests can change it on the fly.
+ PeerId leader0 = PeerId.fromPeer(leader);
+
+ Object resp = leader0 == null
+ ?
FACTORY.errorResponse().errorCode(RaftError.EPERM.getNumber()).build()
+ :
FACTORY.getLeaderResponse().leaderId(leader0.toString()).currentTerm(CURRENT_TERM).build();
+
+ return completedFuture(resp);
+ });
+ }
+
+ /**
+ * Mocks sending snapshot requests.
+ *
+ * @param mode {@code 0} to answer with an error response ({@code RaftError.UNKNOWN}); any
+ * other value to fail the request with an {@link IgniteInternalException}.
+ */
+ private void mockSnapshotRequest(int mode) {
+ when(messagingService.invoke(any(NetworkAddress.class),
any(CliRequests.SnapshotRequest.class), anyLong()))
+ .then(invocation -> {
+ if (mode == 0) {
+ ErrorResponse response = FACTORY.errorResponse()
+ .errorCode(RaftError.UNKNOWN.getNumber())
+ .errorMsg("Failed to create a snapshot")
+ .build();
+
+ return completedFuture(response);
+ } else {
+ return failedFuture(new IgniteInternalException("Very
bad"));
+ }
+ });
+ }
+
+ /**
+ * Mocks sending add-learners requests: a request for the given group and learners is answered
+ * with a learners response carrying {@code resultLearners}.
+ *
+ * @param groupId Group id the request must carry.
+ * @param addLearners Learner ids the request must carry.
+ * @param resultLearners Learner ids returned in the mocked response.
+ */
+ private void mockAddLearners(String groupId, List<String> addLearners,
List<String> resultLearners) {
+ when(messagingService.invoke(any(NetworkAddress.class),
+ eq(FACTORY.addLearnersRequest()
+ .learnersList(addLearners)
+ .groupId(groupId).build()), anyLong()))
+ .then(invocation ->
+
completedFuture(FACTORY.learnersOpResponse().newLearnersList(resultLearners).build()));
+
+ }
+
+ /** Empty write command used by the tests; {@code mockUserInput} recognises requests by this type. */
+ private static class TestCommand implements WriteCommand {
+ }
+
+ /** Empty result object returned in the mocked action responses. */
+ private static class TestResponse {
+ }
+
+ /**
+ * Test replication group id that is identified by its name only: equality and hash code are
+ * based on the name, and {@code toString()} returns it.
+ */
+ private static class TestReplicationGroupId implements ReplicationGroupId {
+ /** Group name. */
+ private final String name;
+
+ /**
+ * Creates a group id.
+ *
+ * @param name Group name.
+ */
+ TestReplicationGroupId(String name) {
+ this.name = name;
+ }
+
+ /** Two ids are equal when they are of the same class and have equal names. */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestReplicationGroupId that = (TestReplicationGroupId) o;
+ return Objects.equals(name, that.name);
+ }
+
+ /** Hash code derived from the name, consistent with {@code equals}. */
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ /** Returns the group name; the tests use this value as the group id in requests. */
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
deleted file mode 100644
index 61cc3d540e..0000000000
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
+++ /dev/null
@@ -1,883 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.raft.jraft.core;
-
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.concurrent.CompletableFuture.failedFuture;
-import static org.apache.ignite.raft.jraft.test.TestUtils.peersToIds;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.when;
-
-import java.net.ConnectException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.MessagingService;
-import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.Status;
-import org.apache.ignite.raft.jraft.entity.PeerId;
-import org.apache.ignite.raft.jraft.error.RaftError;
-import org.apache.ignite.raft.jraft.rpc.ActionRequest;
-import org.apache.ignite.raft.jraft.rpc.CliRequests;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
-import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
-import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
-import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-/**
- * Test methods of raft group service.
- */
-@ExtendWith(MockitoExtension.class)
-public class RaftGroupServiceTest {
- /** The logger. */
- private static final IgniteLogger LOG =
Loggers.forClass(RaftGroupServiceTest.class);
-
- /** */
- private static final List<Peer> NODES = Stream.of(20000, 20001, 20002)
- .map(port -> new NetworkAddress("localhost", port))
- .map(Peer::new)
- .collect(Collectors.toUnmodifiableList());
-
- /** */
- private static final RaftMessagesFactory FACTORY = new
RaftMessagesFactory();
-
- /** */
- private volatile Peer leader = NODES.get(0);
-
- /** Call timeout. */
- private static final int TIMEOUT = 1000;
-
- /** Retry delay. */
- private static final int DELAY = 200;
-
- /** Current term */
- private static final int CURRENT_TERM = 1;
-
- /** Test group id. */
- private static final TestReplicationGroupId TEST_GRP = new
TestReplicationGroupId("test");
-
- /** Mock cluster. */
- @Mock
- private ClusterService cluster;
-
- /** Mock messaging service */
- @Mock
- private MessagingService messagingService;
-
- /** Executor for raft group services. */
- private ScheduledExecutorService executor;
-
- /**
- * @param testInfo Test info.
- */
- @BeforeEach
- void before(TestInfo testInfo) {
- when(cluster.messagingService()).thenReturn(messagingService);
-
- executor = new ScheduledThreadPoolExecutor(20, new
NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
-
- LOG.info(">>>> Starting test {}",
testInfo.getTestMethod().orElseThrow().getName());
- }
-
- /**
- * Shutdown executor for raft group services.
- */
- @AfterEach
- void after() {
- IgniteUtils.shutdownAndAwaitTermination(executor, 10,
TimeUnit.SECONDS);
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testRefreshLeaderStable() throws Exception {
- mockLeaderRequest(false);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- assertNull(service.leader());
-
- service.refreshLeader().get();
-
- assertEquals(leader, service.leader());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testRefreshLeaderNotElected() throws Exception {
- mockLeaderRequest(false);
-
- // Simulate running elections.
- leader = null;
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- assertNull(service.leader());
-
- try {
- service.refreshLeader().get();
-
- fail("Should fail");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
- }
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testRefreshLeaderElectedAfterDelay() throws Exception {
- mockLeaderRequest(false);
-
- // Simulate running elections.
- leader = null;
-
- Timer timer = new Timer();
-
- timer.schedule(new TimerTask() {
- @Override public void run() {
- leader = NODES.get(0);
- }
- }, 500);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- assertNull(service.leader());
-
- service.refreshLeader().get();
-
- assertEquals(NODES.get(0), service.leader());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testRefreshLeaderWithTimeout() throws Exception {
- mockLeaderRequest(true);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- try {
- service.refreshLeader().get(500, TimeUnit.MILLISECONDS);
-
- fail();
- }
- catch (TimeoutException e) {
- // Expected.
- }
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestLeaderElected() throws Exception {
- mockLeaderRequest(false);
- mockUserInput(false, null);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- service.refreshLeader().get();
-
- TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
-
- assertNotNull(resp);
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestLazyInitLeader() throws Exception {
- mockLeaderRequest(false);
- mockUserInput(false, null);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- assertNull(service.leader());
-
- TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
-
- assertNotNull(resp);
-
- assertEquals(leader, service.leader());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestWithTimeout() throws Exception {
- mockLeaderRequest(false);
- mockUserInput(true, null);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- try {
- service.run(new TestCommand()).get(500, TimeUnit.MILLISECONDS);
-
- fail();
- }
- catch (TimeoutException e) {
- // Expected.
- }
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestLeaderNotElected() throws Exception {
- mockLeaderRequest(false);
- mockUserInput(false, null);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- Peer leader = this.leader;
-
- assertEquals(leader, service.leader());
-
- this.leader = null;
-
- assertEquals(leader, service.leader());
-
- try {
- service.run(new TestCommand()).get();
-
- fail("Expecting timeout");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
- }
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestLeaderElectedAfterDelay() throws Exception {
- mockLeaderRequest(false);
- mockUserInput(false, null);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- Peer leader = this.leader;
-
- assertEquals(leader, service.leader());
-
- this.leader = null;
-
- assertEquals(leader, service.leader());
-
- Timer timer = new Timer();
-
- timer.schedule(new TimerTask() {
- @Override public void run() {
- RaftGroupServiceTest.this.leader = NODES.get(0);
- }
- }, 500);
-
- TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
-
- assertNotNull(resp);
-
- assertEquals(NODES.get(0), service.leader());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestLeaderElectedAfterDelayWithFailedNode() throws
Exception {
- mockLeaderRequest(false);
- mockUserInput(false, NODES.get(0));
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT *
3, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- Peer leader = this.leader;
-
- assertEquals(leader, service.leader());
-
- this.leader = null;
-
- assertEquals(leader, service.leader());
-
- Timer timer = new Timer();
-
- timer.schedule(new TimerTask() {
- @Override public void run() {
- LOG.info("Set leader {}", NODES.get(1));
-
- RaftGroupServiceTest.this.leader = NODES.get(1);
- }
- }, 500);
-
- TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
-
- assertNotNull(resp);
-
- assertEquals(NODES.get(1), service.leader());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testUserRequestLeaderChanged() throws Exception {
- mockLeaderRequest(false);
- mockUserInput(false, null);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- Peer leader = this.leader;
-
- assertEquals(leader, service.leader());
-
- Peer newLeader = NODES.get(1);
-
- this.leader = newLeader;
-
- assertEquals(leader, service.leader());
- assertNotEquals(leader, newLeader);
-
- // Runs the command on an old leader. It should respond with leader
changed error, when transparently retry.
- TestResponse resp = service.<TestResponse>run(new TestCommand()).get();
-
- assertNotNull(resp);
-
- assertEquals(newLeader, service.leader());
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testSnapshotExecutionException() throws Exception {
- mockSnapshotRequest(1);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- var addr = new NetworkAddress("localhost", 8082);
-
- CompletableFuture<Void> fut = service.snapshot(new Peer(addr));
-
- try {
- fut.get();
-
- fail();
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof IgniteInternalException);
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testSnapshotExecutionFailedResponse() throws Exception {
- mockSnapshotRequest(0);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- var addr = new NetworkAddress("localhost", 8082);
-
- CompletableFuture<Void> fut = service.snapshot(new Peer(addr));
-
- try {
- fut.get();
-
- fail();
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof RaftException);
- }
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testRefreshMembers() throws Exception {
- List<String> respPeers = peersToIds(NODES.subList(0, 2));
- List<String> respLearners = peersToIds(NODES.subList(2, 2));
-
- when(messagingService.invoke(any(NetworkAddress.class),
-
eq(FACTORY.getPeersRequest().onlyAlive(false).groupId(TEST_GRP.toString()).build()),
anyLong()))
- .then(invocation ->
-
completedFuture(FACTORY.getPeersResponse().peersList(respPeers).learnersList(respLearners).build()));
-
- mockLeaderRequest(false);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- assertEquals(NODES, service.peers());
- assertEquals(Collections.emptyList(), service.learners());
-
- service.refreshMembers(false).get(3, TimeUnit.SECONDS);
-
- assertEquals(NODES.subList(0, 2), service.peers());
- assertEquals(NODES.subList(2, 2), service.learners());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testAddPeer() throws Exception {
- List<String> respPeers = peersToIds(NODES);
-
- when(messagingService.invoke(any(NetworkAddress.class),
- eq(FACTORY.addPeerRequest()
- .peerId(PeerId.parsePeer(NODES.get(2).address().host() + ":" +
NODES.get(2).address().port()).toString())
- .groupId(TEST_GRP.toString()).build()), anyLong()))
- .then(invocation ->
-
completedFuture(FACTORY.addPeerResponse().newPeersList(respPeers).build()));
-
- mockLeaderRequest(false);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES.subList(0, 2), true, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- assertEquals(NODES.subList(0, 2), service.peers());
- assertEquals(Collections.emptyList(), service.learners());
-
- service.addPeer(NODES.get(2)).get();
-
- assertEquals(NODES, service.peers());
- assertEquals(Collections.emptyList(), service.learners());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testRemovePeer() throws Exception {
- List<String> respPeers = peersToIds(NODES.subList(0, 2));
-
- when(messagingService.invoke(any(NetworkAddress.class),
- eq(FACTORY.removePeerRequest()
- .peerId(PeerId.parsePeer(NODES.get(2).address().host() + ":" +
NODES.get(2).address().port()).toString())
- .groupId(TEST_GRP.toString()).build()), anyLong()))
- .then(invocation ->
-
completedFuture(FACTORY.removePeerResponse().newPeersList(respPeers).build()));
-
- mockLeaderRequest(false);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- assertEquals(NODES, service.peers());
- assertEquals(Collections.emptyList(), service.learners());
-
- service.removePeer(NODES.get(2)).get();
-
- assertEquals(NODES.subList(0, 2), service.peers());
- assertEquals(Collections.emptyList(), service.learners());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testChangePeers() throws Exception {
- List<String> shrunkPeers = peersToIds(NODES.subList(0, 1));
-
- List<String> extendedPeers = peersToIds(NODES);
-
- when(messagingService.invoke(any(NetworkAddress.class),
- eq(FACTORY.changePeersRequest()
- .newPeersList(shrunkPeers)
- .groupId(TEST_GRP.toString()).build()), anyLong()))
- .then(invocation ->
-
completedFuture(FACTORY.changePeersResponse().newPeersList(shrunkPeers).build()));
-
- when(messagingService.invoke(any(NetworkAddress.class),
- eq(FACTORY.changePeersRequest()
- .newPeersList(extendedPeers)
- .groupId(TEST_GRP.toString()).build()), anyLong()))
- .then(invocation ->
-
completedFuture(FACTORY.changePeersResponse().newPeersList(extendedPeers).build()));
-
- mockLeaderRequest(false);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES.subList(0, 2), true, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- assertEquals(NODES.subList(0, 2), service.peers());
- assertEquals(Collections.emptyList(), service.learners());
-
- service.changePeers(NODES.subList(0, 1)).get();
-
- assertEquals(NODES.subList(0, 1), service.peers());
- assertEquals(Collections.emptyList(), service.learners());
-
- service.changePeers(NODES).get();
-
- assertEquals(NODES, service.peers());
- assertEquals(Collections.emptyList(), service.learners());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testTransferLeadership() throws Exception {
- when(messagingService.invoke(any(NetworkAddress.class),
- eq(FACTORY.transferLeaderRequest()
- .peerId(PeerId.fromPeer(NODES.get(1)).toString())
- .leaderId(PeerId.fromPeer(NODES.get(0)).toString())
- .groupId(TEST_GRP.toString()).build()), anyLong()))
- .then(invocation ->
- completedFuture(RaftRpcFactory.DEFAULT.newResponse(FACTORY,
Status.OK())));
-
- mockLeaderRequest(false);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- assertEquals(NODES.get(0), service.leader());
-
- service.transferLeadership(NODES.get(1)).get();
-
- assertEquals(NODES.get(1), service.leader());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testAddLearners() throws Exception {
- List<String> addLearners = peersToIds(NODES.subList(1, 3));
-
- when(messagingService.invoke(any(NetworkAddress.class),
- eq(FACTORY.addLearnersRequest()
- .learnersList(addLearners)
- .groupId(TEST_GRP.toString()).build()), anyLong()))
- .then(invocation ->
-
completedFuture(FACTORY.learnersOpResponse().newLearnersList(addLearners).build()));
-
- mockLeaderRequest(false);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES.subList(0, 1), true, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- assertEquals(NODES.subList(0, 1), service.peers());
- assertEquals(Collections.emptyList(), service.learners());
-
- service.addLearners(NODES.subList(1, 3)).get();
-
- assertEquals(NODES.subList(0, 1), service.peers());
- assertEquals(NODES.subList(1, 3), service.learners());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testResetLearners() throws Exception {
- List<String> addLearners = peersToIds(NODES.subList(1, 3));
-
- List<String> resetLearners = peersToIds(NODES.subList(2, 3));
-
- when(messagingService.invoke(any(NetworkAddress.class),
- eq(FACTORY.resetLearnersRequest()
- .learnersList(resetLearners)
- .groupId(TEST_GRP.toString()).build()), anyLong()))
- .then(invocation ->
-
completedFuture(FACTORY.learnersOpResponse().newLearnersList(resetLearners).build()));
-
- mockAddLearners(TEST_GRP.toString(), addLearners, addLearners);
-
- mockLeaderRequest(false);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES.subList(0, 1), true, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- service.addLearners(NODES.subList(1, 3)).get();
-
- assertEquals(NODES.subList(0, 1), service.peers());
- assertEquals(NODES.subList(1, 3), service.learners());
-
- service.resetLearners(NODES.subList(2, 3)).get();
-
- assertEquals(NODES.subList(0, 1), service.peers());
- assertEquals(NODES.subList(2, 3), service.learners());
- }
-
- /**
- * @throws Exception
- */
- @Test
- public void testRemoveLearners() throws Exception {
- List<String> addLearners = peersToIds(NODES.subList(1, 3));
-
- List<String> removeLearners = peersToIds(NODES.subList(2, 3));
-
- List<String> resultLearners =
- NODES.subList(1, 2).stream().map(p ->
PeerId.fromPeer(p).toString()).collect(Collectors.toList());
-
- when(messagingService.invoke(any(NetworkAddress.class),
- eq(FACTORY.removeLearnersRequest()
- .learnersList(removeLearners)
- .groupId(TEST_GRP.toString()).build()), anyLong()))
- .then(invocation ->
-
completedFuture(FACTORY.learnersOpResponse().newLearnersList(resultLearners).build()));
-
- mockAddLearners(TEST_GRP.toString(), addLearners, addLearners);
-
- mockLeaderRequest(false);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT,
NODES.subList(0, 1), true, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- service.addLearners(NODES.subList(1, 3)).get();
-
- assertEquals(NODES.subList(0, 1), service.peers());
- assertEquals(NODES.subList(1, 3), service.learners());
-
- service.removeLearners(NODES.subList(2, 3)).get();
-
- assertEquals(NODES.subList(0, 1), service.peers());
- assertEquals(NODES.subList(1, 2), service.learners());
- }
-
- /** */
- @Test
- public void testGetLeaderRequest() throws Exception {
- mockLeaderRequest(false);
-
- RaftGroupService service =
- RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY,
TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
-
- assertNull(service.leader());
-
- service.refreshLeader().get();
-
- GetLeaderRequest req =
FACTORY.getLeaderRequest().groupId(TEST_GRP.toString()).build();
-
- GetLeaderResponse fut = (GetLeaderResponse)
messagingService.invoke(leader.address(), req, TIMEOUT).get();
-
- assertEquals(fut.leaderId(), PeerId.fromPeer(leader).toString());
-
- assertEquals(fut.currentTerm(), CURRENT_TERM);
- }
-
- /**
- * @param delay {@code True} to create a delay before response.
- * @param peer Fail the request targeted to given peer.
- */
- private void mockUserInput(boolean delay, @Nullable Peer peer) {
- when(messagingService.invoke(
- any(NetworkAddress.class),
- argThat(new ArgumentMatcher<ActionRequest>() {
- @Override public boolean matches(ActionRequest arg) {
- return arg.command() instanceof TestCommand;
- }
- }),
- anyLong()
- )).then(invocation -> {
- NetworkAddress target = invocation.getArgument(0);
-
- if (peer != null && target.equals(peer.address()))
- return failedFuture(new ConnectException());
-
- if (delay) {
- return CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {
- fail();
- }
-
- return FACTORY.actionResponse().result(new
TestResponse()).build();
- });
- }
-
- Object resp;
-
- if (leader == null)
- resp =
FACTORY.errorResponse().errorCode(RaftError.EPERM.getNumber()).build();
- else if (!target.equals(leader.address()))
- resp = FACTORY.errorResponse()
-
.errorCode(RaftError.EPERM.getNumber()).leaderId(PeerId.fromPeer(leader).toString()).build();
- else
- resp = FACTORY.actionResponse().result(new
TestResponse()).build();
-
- return completedFuture(resp);
- });
- }
-
- /**
- * @param delay {@code True} to delay response.
- */
- private void mockLeaderRequest(boolean delay) {
- when(messagingService.invoke(any(NetworkAddress.class),
any(CliRequests.GetLeaderRequest.class), anyLong()))
- .then(invocation -> {
- if (delay) {
- return CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {
- fail();
- }
-
- return
FACTORY.errorResponse().errorCode(RaftError.EPERM.getNumber()).build();
- });
- }
-
- PeerId leader0 = PeerId.fromPeer(leader);
-
- Object resp = leader0 == null ?
-
FACTORY.errorResponse().errorCode(RaftError.EPERM.getNumber()).build() :
-
FACTORY.getLeaderResponse().leaderId(leader0.toString()).currentTerm(CURRENT_TERM).build();
-
- return completedFuture(resp);
- });
- }
-
- /**
- * @param mode Mock mode.
- */
- private void mockSnapshotRequest(int mode) {
- when(messagingService.invoke(any(NetworkAddress.class),
any(CliRequests.SnapshotRequest.class), anyLong()))
- .then(invocation -> {
- if (mode == 0) {
- return
completedFuture(FACTORY.errorResponse().errorCode(RaftError.UNKNOWN.getNumber()).
- errorMsg("Failed to create a snapshot").build());
- }
- else
- return failedFuture(new IgniteInternalException("Very
bad"));
- });
- }
-
- /** */
- private void mockAddLearners(String groupId, List<String> addLearners,
List<String> resultLearners) {
- when(messagingService.invoke(any(NetworkAddress.class),
- eq(FACTORY.addLearnersRequest()
- .learnersList(addLearners)
- .groupId(groupId).build()), anyLong()))
- .then(invocation ->
-
completedFuture(FACTORY.learnersOpResponse().newLearnersList(resultLearners).build()));
-
- }
-
- /** */
- private static class TestCommand implements WriteCommand {
- }
-
- /** */
- private static class TestResponse {
- }
-
- /**
- * Test replication group id.
- */
- protected static class TestReplicationGroupId implements
ReplicationGroupId {
- private final String name;
-
- public TestReplicationGroupId(String name) {
- this.name = name;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- TestReplicationGroupId that = (TestReplicationGroupId) o;
- return Objects.equals(name, that.name);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name);
- }
-
- @Override
- public String toString() {
- return name;
- }
- }
-}
diff --git
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
index 144cfa857c..9b9766cb28 100644
---
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
+++
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
@@ -36,6 +36,7 @@ import java.util.stream.IntStream;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -47,7 +48,6 @@ import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index f87176edd9..9de76dcf62 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
@@ -97,7 +98,6 @@ import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.table.Table;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.utils.ClusterServiceTestUtils;
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index cd6e02304e..0e87f92eac 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -122,7 +123,6 @@ import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.rpc.CliRequests;
-import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.table.Table;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;