This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 84b760b54c IGNITE-22315 Make raft-client starting only once and only
with raft-client and replica together (#3956)
84b760b54c is described below
commit 84b760b54cbfe9ef5842b3b470a7e6f4be4ecdba
Author: Mikhail Efremov <[email protected]>
AuthorDate: Tue Jul 9 11:45:44 2024 +0600
IGNITE-22315 Make raft-client starting only once and only with raft-client
and replica together (#3956)
---
.../ignite/internal/index/ItBuildIndexTest.java | 10 +-
.../metastorage/server/time/ClusterTime.java | 5 +
.../metastorage/server/time/ClusterTimeImpl.java | 10 +-
.../PartitionReplicaLifecycleManager.java | 4 +-
.../ignite/internal/replicator/ReplicaManager.java | 61 ++--
.../app/ItIgniteInMemoryNodeRestartTest.java | 4 +-
.../rebalance/ItRebalanceDistributedTest.java | 116 +++++--
.../internal/table/distributed/TableManager.java | 346 ++++++++++++---------
.../distributed/TableManagerRecoveryTest.java | 3 -
.../table/distributed/TableManagerTest.java | 3 -
.../apache/ignite/distributed/ItTxTestCluster.java | 5 -
11 files changed, 341 insertions(+), 226 deletions(-)
diff --git
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
index 64ec13d0de..e873d3e895 100644
---
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
+++
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.network.NetworkMessage;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
@@ -324,7 +325,14 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
assertNotNull(indexDescriptor);
for (int partitionId = 0; partitionId <
internalTable.partitions(); partitionId++) {
- RaftGroupService raftGroupService =
internalTable.tableRaftService().partitionRaftGroupService(partitionId);
+ // Excluding partitions on the node outside of replication
group
+ // TODO: will be replaced with replica usage in
https://issues.apache.org/jira/browse/IGNITE-22218
+ RaftGroupService raftGroupService;
+ try {
+ raftGroupService =
internalTable.tableRaftService().partitionRaftGroupService(partitionId);
+ } catch (IgniteInternalException e) {
+ continue;
+ }
List<Peer> allPeers = raftGroupService.peers();
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java
index 55d662bbfa..4dee69f665 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java
@@ -34,6 +34,11 @@ public interface ClusterTime {
*/
long nowLong();
+ /**
+ * Returns current safe time.
+ */
+ HybridTimestamp currentSafeTime();
+
/**
* Provides the future that is completed when cluster time reaches given
one. If the time is greater or equal
* then the given one, returns completed future.
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java
index af74dcde63..9277d40bf0 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
/**
* Cluster time implementation with additional methods to adjust time and
update safe time.
@@ -138,6 +137,11 @@ public class ClusterTimeImpl implements ClusterTime,
MetaStorageMetrics, Manuall
return clock.nowLong();
}
+ @Override
+ public HybridTimestamp currentSafeTime() {
+ return this.safeTime.current();
+ }
+
@Override
public CompletableFuture<Void> waitFor(HybridTimestamp time) {
return safeTime.waitFor(time);
@@ -233,10 +237,6 @@ public class ClusterTimeImpl implements ClusterTime,
MetaStorageMetrics, Manuall
IgniteUtils.shutdownAndAwaitTermination(executorService, 10,
TimeUnit.SECONDS);
}
- }
- @TestOnly
- public HybridTimestamp currentSafeTime() {
- return this.safeTime.current();
}
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 8f7cd70732..c02d26f89c 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -631,7 +631,7 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
) {
// Update raft client peers and learners according to the actual
assignments.
if (replicaMgr.isReplicaStarted(zonePartitionId)) {
- replicaMgr.getReplica(zonePartitionId).join()
+ replicaMgr.replica(zonePartitionId).join()
.raftClient().updateConfiguration(fromAssignments(stableAssignments));
}
@@ -785,7 +785,7 @@ public class PartitionReplicaLifecycleManager implements
IgniteComponent {
? pendingAssignmentsNodes
: RebalanceUtil.union(pendingAssignmentsNodes,
stableAssignments.nodes());
-
replicaMgr.getReplica(replicaGrpId).join().raftClient().updateConfiguration(fromAssignments(newAssignments));
+
replicaMgr.replica(replicaGrpId).join().raftClient().updateConfiguration(fromAssignments(newAssignments));
}, ioExecutor);
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index ca37e69f04..fd3d919f64 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -111,6 +111,7 @@ import
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.internal.thread.ExecutorChooser;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.thread.PublicApiThreading;
import org.apache.ignite.internal.thread.ThreadAttributes;
@@ -200,6 +201,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
private final ReplicaStateManager replicaStateManager;
+ private final ExecutorService replicasCreationExecutor;
+
private volatile String localNodeId;
private volatile String localNodeConsistentId;
@@ -331,6 +334,15 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
new LinkedBlockingQueue<>(),
NamedThreadFactory.create(nodeName, "replica", LOG)
);
+
+ replicasCreationExecutor = new ThreadPoolExecutor(
+ threadCount,
+ threadCount,
+ 30,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ IgniteThreadFactory.create(nodeName, "replica-manager", LOG,
STORAGE_READ, STORAGE_WRITE)
+ );
}
private void onReplicaMessageReceived(NetworkMessage message, ClusterNode
sender, @Nullable Long correlationId) {
@@ -613,6 +625,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
* @param replicaGrpId Replication group id.
* @param storageIndexTracker Storage index tracker.
* @param newConfiguration A configuration for new raft group.
+ *
* @return Future that promises ready new replica when done.
*/
public CompletableFuture<Boolean> startReplica(
@@ -752,14 +765,14 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
) throws NodeStoppingException {
LOG.info("Replica is about to start [replicationGroupId={}].",
replicaGrpId);
- CompletableFuture<Boolean> resultFuture =
newRaftClientFut.thenAccept(updateTableRaftService)
- .thenApply((v) -> true);
-
- CompletableFuture<ReplicaListener> newReplicaListenerFut =
newRaftClientFut.thenApply(createListener);
-
- startReplica(replicaGrpId, storageIndexTracker, newReplicaListenerFut);
-
- return resultFuture;
+ return newRaftClientFut
+ .thenApplyAsync(raftClient -> {
+ // TODO: will be removed in
https://issues.apache.org/jira/browse/IGNITE-22218
+ updateTableRaftService.accept(raftClient);
+ return createListener.apply(raftClient);
+ }, replicasCreationExecutor)
+ .thenCompose(replicaListener -> startReplica(replicaGrpId,
storageIndexTracker, completedFuture(replicaListener)))
+ .thenApply(r -> true);
}
/**
@@ -777,7 +790,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
ReplicationGroupId replicaGrpId,
PendingComparableValuesTracker<Long, Void> storageIndexTracker,
CompletableFuture<ReplicaListener> newReplicaListenerFut
- ) throws NodeStoppingException {
+ ) {
ClusterNode localNode = clusterNetSvc.topologyService().localMember();
@@ -820,30 +833,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
.thenCompose(v -> replicaFuture);
}
- /**
- * Temporary public method for RAFT-client starting.
- * TODO: will be removed after
https://issues.apache.org/jira/browse/IGNITE-22315
- *
- * @param replicaGrpId Replication Group ID.
- * @param newConfiguration Peers and learners nodes for a raft group.
- * @param raftClientCache Temporal supplier that returns RAFT-client from
TableRaftService if it's already exists and was put into the
- * service's map.
- * @return Future that returns started RAFT-client.
- * @throws NodeStoppingException In case if node was stopping.
- */
- @Deprecated
- public CompletableFuture<TopologyAwareRaftGroupService> startRaftClient(
- ReplicationGroupId replicaGrpId,
- PeersAndLearners newConfiguration,
- Supplier<RaftGroupService> raftClientCache)
- throws NodeStoppingException {
- RaftGroupService cachedRaftClient = raftClientCache.get();
- return cachedRaftClient != null
- ? completedFuture((TopologyAwareRaftGroupService)
cachedRaftClient)
- // TODO IGNITE-19614 This procedure takes 10 seconds if
there's no majority online.
- : raftManager.startRaftGroupService(replicaGrpId,
newConfiguration, raftGroupServiceFactory, raftCommandsMarshaller);
- }
-
/**
* Returns future with a replica if it was created or null if there no any
replicas starting with given identifier.
*
@@ -1021,6 +1010,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
shutdownAndAwaitTermination(scheduledIdleSafeTimeSyncExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS);
shutdownAndAwaitTermination(executor, shutdownTimeoutSeconds,
TimeUnit.SECONDS);
+ shutdownAndAwaitTermination(replicasCreationExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS);
assert replicas.values().stream().noneMatch(CompletableFuture::isDone)
: "There are replicas alive [replicas="
@@ -1219,11 +1209,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
return replicas.containsKey(replicaGrpId);
}
- @TestOnly
- public CompletableFuture<Replica> getReplica(ReplicationGroupId
replicationGroupId) {
- return replicas.get(replicationGroupId);
- }
-
/**
* Returns started replication groups.
*
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index 378b2cb681..4e479fbe90 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -219,7 +219,9 @@ public class ItIgniteInMemoryNodeRestartTest extends
BaseIgniteRestartTest {
List<String> partitionAssignments = assignments.get(0);
- return partitionAssignments.contains(restartingNodeConsistentId);
+ return !assignments.isEmpty()
+ && partitionAssignments != null
+ && partitionAssignments.contains(restartingNodeConsistentId);
}
private static boolean isRaftNodeStarted(TableViewInternal table, Loza
loza) {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index b48222de3c..8825e5ddf0 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -120,6 +120,7 @@ import
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
+import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
@@ -160,12 +161,14 @@ import org.apache.ignite.internal.raft.RaftNodeId;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.rest.configuration.RestConfiguration;
@@ -707,19 +710,23 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
));
// Check that raft clients on all nodes were updated with the new list
of peers.
+ Predicate<Node> isNodeInReplicationGroup = n -> isNodeInAssignments(n,
newAssignment);
assertTrue(waitForCondition(
- () -> nodes.stream().allMatch(n ->
- n.tableManager
- .startedTables()
- .get(getTableId(node, TABLE_NAME))
- .internalTable()
- .tableRaftService()
- .partitionRaftGroupService(0)
- .peers()
- .equals(List.of(new
Peer(newNodeNameForAssignment)))),
+ () -> nodes.stream()
+ .filter(isNodeInReplicationGroup)
+ .allMatch(n -> isNodeUpdatesPeersOnGroupService(node,
assignmentsToPeersSet(newAssignment))),
(long) AWAIT_TIMEOUT_MILLIS * nodes.size()
));
+ // Checks that there no any replicas outside replication group
+ var replGrpId = new TablePartitionId(getTableId(node, TABLE_NAME), 0);
+ Predicate<Node> isNodeOutsideReplicationGroup = n ->
!isNodeInAssignments(n, newAssignment);
+ assertTrue(waitForCondition(
+ () -> nodes.stream()
+ .filter(isNodeOutsideReplicationGroup)
+ .noneMatch(n -> isReplicationGroupStarted(n,
replGrpId)),
+ (long) AWAIT_TIMEOUT_MILLIS * nodes.size()
+ ));
}
@@ -733,13 +740,14 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
waitPartitionAssignmentsSyncedToExpected(0, 1);
- String assignmentsBeforeRebalance = getPartitionClusterNodes(node,
0).stream()
+ var assignmentsBeforeRebalance = getPartitionClusterNodes(node, 0);
+ String nodeNameAssignedBeforeRebalance =
assignmentsBeforeRebalance.stream()
.findFirst()
.orElseThrow()
.consistentId();
String newNodeNameForAssignment = nodes.stream()
- .filter(n ->
!assignmentsBeforeRebalance.equals(n.clusterService.nodeName()))
+ .filter(n ->
!nodeNameAssignedBeforeRebalance.equals(n.clusterService.nodeName()))
.findFirst()
.orElseThrow()
.name;
@@ -762,25 +770,65 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
node.metaStorageManager.put(partAssignmentsPendingKey,
bytesPendingAssignments).get(AWAIT_TIMEOUT_MILLIS, MILLISECONDS);
+ Set<Assignment> union =
RebalanceUtil.union(assignmentsBeforeRebalance, newAssignment);
+
// Check that raft clients on all nodes were updated with the new list
of peers.
+ Predicate<Node> isNodeInReplicationGroup = n -> isNodeInAssignments(n,
union);
assertTrue(waitForCondition(
- () -> nodes.stream().allMatch(n ->
- n.tableManager
- .startedTables()
- .get(getTableId(node, TABLE_NAME))
- .internalTable()
- .tableRaftService()
- .partitionRaftGroupService(0)
- .peers()
- .stream()
- .collect(toSet())
- .equals(Set.of(new
Peer(newNodeNameForAssignment), new Peer(assignmentsBeforeRebalance)))),
+ () -> nodes.stream()
+ .filter(isNodeInReplicationGroup)
+ .allMatch(n -> isNodeUpdatesPeersOnGroupService(node,
assignmentsToPeersSet(union))),
+ (long) AWAIT_TIMEOUT_MILLIS * nodes.size()
+ ));
+
+ // Checks that there no any replicas outside replication group
+ Predicate<Node> isNodeOutsideReplicationGroup = n ->
!isNodeInAssignments(n, union);
+ assertTrue(waitForCondition(
+ () -> nodes.stream()
+ .filter(isNodeOutsideReplicationGroup)
+ .noneMatch(n -> isReplicationGroupStarted(n, partId)),
(long) AWAIT_TIMEOUT_MILLIS * nodes.size()
));
dropMessages.set(false);
}
+ private static Set<Peer> assignmentsToPeersSet(Set<Assignment>
assignments) {
+ return assignments.stream()
+ .map(Assignment::consistentId)
+ .map(Peer::new)
+ .collect(toSet());
+ }
+
+ private static boolean isNodeInAssignments(Node node, Set<Assignment>
assignments) {
+ return assignmentsToPeersSet(assignments).stream()
+ .map(Peer::consistentId)
+ .anyMatch(id -> id.equals(node.clusterService.nodeName()));
+ }
+
+ private static boolean isReplicationGroupStarted(Node node,
ReplicationGroupId replicationGroupId) {
+ return node.replicaManager.isReplicaStarted(replicationGroupId);
+ }
+
+ private static boolean isNodeUpdatesPeersOnGroupService(Node node,
Set<Peer> desiredPeers) {
+ // TODO: will be replaced with replica usage in
https://issues.apache.org/jira/browse/IGNITE-22218
+ TableRaftService tblRaftSvc = node.tableManager.startedTables()
+ .get(getTableId(node, TABLE_NAME))
+ .internalTable()
+ .tableRaftService();
+ RaftGroupService groupService;
+ try {
+ groupService = tblRaftSvc.partitionRaftGroupService(0);
+ } catch (IgniteInternalException e) {
+ return false;
+ }
+ List<Peer> peersList = groupService.peers();
+ boolean isUpdated = peersList.stream()
+ .collect(toSet())
+ .equals(desiredPeers);
+ return isUpdated;
+ }
+
private void clearSpyInvocations() {
for (int i = 0; i < NODE_COUNT; i++) {
clearInvocations(getNode(i).raftManager);
@@ -846,16 +894,26 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
(long) AWAIT_TIMEOUT_MILLIS * nodes.size()
));
+ Node anyNode = nodes.get(0);
+ Set<Assignment> assignments = getPartitionClusterNodes(anyNode,
tableName, replicasNum);
assertTrue(waitForCondition(
() -> {
try {
- return nodes.stream().allMatch(n ->
- n.tableManager
- .cachedTable(getTableId(n, tableName))
- .internalTable()
- .tableRaftService()
- .partitionRaftGroupService(partNum) !=
null
- );
+ return nodes.stream()
+ .filter(n -> isNodeInAssignments(n,
assignments))
+ .allMatch(n -> {
+ // TODO: will be replaced with replica
usage in https://issues.apache.org/jira/browse/IGNITE-22218
+ TableRaftService trs = n.tableManager
+ .cachedTable(getTableId(n,
tableName))
+ .internalTable()
+ .tableRaftService();
+
+ try {
+ return
trs.partitionRaftGroupService(partNum) != null;
+ } catch (IgniteInternalException e) {
+ return false;
+ }
+ });
} catch (IgniteInternalException e) {
// Raft group service not found.
return false;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 13406d7f77..9bad9f3241 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -41,6 +41,7 @@ import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignmentsGetLocally;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.subtract;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignmentsGetLocally;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tablesCounterKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union;
@@ -99,6 +100,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.LongFunction;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -152,7 +154,6 @@ import
org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
-import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -161,6 +162,7 @@ import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import
org.apache.ignite.internal.replicator.ReplicaManager.WeakReplicaStopReason;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.schema.SchemaManager;
@@ -413,6 +415,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final IndexMetaStorage indexMetaStorage;
+ private final Predicate<Assignment> isLocalNodeAssignment = assignment ->
assignment.consistentId().equals(localNode().name());
+
/**
* Creates a new table manager.
*
@@ -646,7 +650,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
startVv.update(recoveryRevision, (v, e) -> handleAssignmentsOnRecovery(
stableAssignmentsPrefix,
recoveryRevision,
- (entry, rev) -> handleChangeStableAssignmentEvent(entry, rev,
true),
+ (entry, rev) -> handleChangeStableAssignmentEvent(entry, rev,
true),
"stable"
));
startVv.update(recoveryRevision, (v, e) -> handleAssignmentsOnRecovery(
@@ -889,6 +893,11 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
int zoneId,
boolean isRecovery
) {
+ if (localMemberAssignment == null) {
+ // (0) in case if node not in the assignments
+ return nullCompletedFuture();
+ }
+
int tableId = table.tableId();
var internalTbl = (InternalTableImpl) table.internalTable();
@@ -902,137 +911,103 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
.tableRaftService()
.updateInternalTableRaftGroupService(partId, raftClient);
- CompletableFuture<Boolean> startGroupFut;
-
- if (localMemberAssignment != null) {
- CompletableFuture<Boolean> shouldStartGroupFut = isRecovery
- ?
partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded(
- replicaGrpId,
- internalTbl,
- stablePeersAndLearners,
- localMemberAssignment
- )
- : trueCompletedFuture();
-
- Assignments forcedAssignments = stableAssignments.force() ?
stableAssignments : null;
-
- startGroupFut = replicaMgr.weakStartReplica(
- replicaGrpId,
- () -> shouldStartGroupFut.thenComposeAsync(startGroup ->
inBusyLock(busyLock, () -> {
- // (1) if
partitionReplicatorNodeRecovery#shouldStartGroup fails -> do start nothing
- if (!startGroup) {
- return falseCompletedFuture();
- }
+ CompletableFuture<Boolean> shouldStartGroupFut = isRecovery
+ ? partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded(
+ replicaGrpId,
+ internalTbl,
+ stablePeersAndLearners,
+ localMemberAssignment
+ )
+ : trueCompletedFuture();
- // (2) Otherwise let's start replica manually
- var safeTimeTracker = new
PendingComparableValuesTracker<HybridTimestamp,
Void>(HybridTimestamp.MIN_VALUE);
+ Assignments forcedAssignments = stableAssignments.force() ?
stableAssignments : null;
- var storageIndexTracker = new
PendingComparableValuesTracker<Long, Void>(0L);
+ Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () ->
shouldStartGroupFut
+ .thenComposeAsync(startGroup -> inBusyLock(busyLock, () -> {
+ // (1) if partitionReplicatorNodeRecovery#shouldStartGroup
fails -> do start nothing
+ if (!startGroup) {
+ return falseCompletedFuture();
+ }
- PartitionStorages partitionStorages =
getPartitionStorages(table, partId);
+ // (2) Otherwise let's start replica manually
+ var safeTimeTracker = new
PendingComparableValuesTracker<HybridTimestamp,
Void>(HybridTimestamp.MIN_VALUE);
- PartitionDataStorage partitionDataStorage =
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
- internalTbl, partId);
+ var storageIndexTracker = new
PendingComparableValuesTracker<Long, Void>(0L);
-
storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null);
+ PartitionStorages partitionStorages =
getPartitionStorages(table, partId);
- PartitionUpdateHandlers partitionUpdateHandlers =
createPartitionUpdateHandlers(
- partId,
- partitionDataStorage,
- table,
- safeTimeTracker,
- storageUpdateConfig
- );
+ PartitionDataStorage partitionDataStorage =
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
+ internalTbl, partId);
- internalTbl.updatePartitionTrackers(partId,
safeTimeTracker, storageIndexTracker);
+
storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null);
- mvGc.addStorage(replicaGrpId,
partitionUpdateHandlers.gcUpdateHandler);
+ PartitionUpdateHandlers partitionUpdateHandlers =
createPartitionUpdateHandlers(
+ partId,
+ partitionDataStorage,
+ table,
+ safeTimeTracker,
+ storageUpdateConfig
+ );
- RaftGroupListener raftGroupListener = new
PartitionListener(
- txManager,
- partitionDataStorage,
- partitionUpdateHandlers.storageUpdateHandler,
- partitionStorages.getTxStateStorage(),
- safeTimeTracker,
- storageIndexTracker,
- catalogService,
- table.schemaView(),
- clockService,
- indexMetaStorage
- );
+ internalTbl.updatePartitionTrackers(partId,
safeTimeTracker, storageIndexTracker);
+
+ mvGc.addStorage(replicaGrpId,
partitionUpdateHandlers.gcUpdateHandler);
+
+ RaftGroupListener raftGroupListener = new
PartitionListener(
+ txManager,
+ partitionDataStorage,
+ partitionUpdateHandlers.storageUpdateHandler,
+ partitionStorages.getTxStateStorage(),
+ safeTimeTracker,
+ storageIndexTracker,
+ catalogService,
+ table.schemaView(),
+ clockService,
+ indexMetaStorage
+ );
- SnapshotStorageFactory snapshotStorageFactory =
createSnapshotStorageFactory(replicaGrpId,
- partitionUpdateHandlers, internalTbl);
+ SnapshotStorageFactory snapshotStorageFactory =
createSnapshotStorageFactory(replicaGrpId,
+ partitionUpdateHandlers, internalTbl);
- Function<RaftGroupService, ReplicaListener>
createListener = (raftClient) -> createReplicaListener(
- replicaGrpId,
- table,
- safeTimeTracker,
- partitionStorages.getMvPartitionStorage(),
- partitionStorages.getTxStateStorage(),
- partitionUpdateHandlers,
- raftClient);
-
- RaftGroupEventsListener raftGroupEventsListener =
createRaftGroupEventsListener(zoneId, replicaGrpId);
-
- MvTableStorage mvTableStorage = internalTbl.storage();
-
- try {
- return replicaMgr.startReplica(
- raftGroupEventsListener,
- raftGroupListener,
- mvTableStorage.isVolatile(),
- snapshotStorageFactory,
- updateTableRaftService,
- createListener,
- storageIndexTracker,
- replicaGrpId,
- stablePeersAndLearners);
- } catch (NodeStoppingException e) {
- throw new AssertionError("Loza was stopped before
Table manager", e);
- }
- }), ioExecutor),
- forcedAssignments
- );
- } else {
- // TODO: will be removed after
https://issues.apache.org/jira/browse/IGNITE-22315
- // (4) in case if node not in the assignments
- startGroupFut = falseCompletedFuture();
- }
+ Function<RaftGroupService, ReplicaListener> createListener
= (raftClient) -> createReplicaListener(
+ replicaGrpId,
+ table,
+ safeTimeTracker,
+ partitionStorages.getMvPartitionStorage(),
+ partitionStorages.getTxStateStorage(),
+ partitionUpdateHandlers,
+ raftClient);
- return startGroupFut
- // TODO: the stage will be removed after
https://issues.apache.org/jira/browse/IGNITE-22315
- .thenComposeAsync(isReplicaStarted -> inBusyLock(busyLock, ()
-> {
- if (isReplicaStarted) {
- return nullCompletedFuture();
- }
+ RaftGroupEventsListener raftGroupEventsListener =
createRaftGroupEventsListener(zoneId, replicaGrpId);
- // TODO: will be removed in
https://issues.apache.org/jira/browse/IGNITE-22315
- Supplier<RaftGroupService> getCachedRaftClient = () -> {
- try {
- // Return existing service if it's already started.
- return internalTbl
- .tableRaftService()
-
.partitionRaftGroupService(replicaGrpId.partitionId());
- } catch (IgniteInternalException e) {
- // We use "IgniteInternalException" in accordance
with the javadoc of "partitionRaftGroupService" method.
- return null;
- }
- };
+ MvTableStorage mvTableStorage = internalTbl.storage();
- CompletableFuture<TopologyAwareRaftGroupService>
newRaftClientFut;
try {
- newRaftClientFut =
replicaMgr.startRaftClient(replicaGrpId, stablePeersAndLearners,
getCachedRaftClient);
+ return replicaMgr.startReplica(
+ raftGroupEventsListener,
+ raftGroupListener,
+ mvTableStorage.isVolatile(),
+ snapshotStorageFactory,
+ updateTableRaftService,
+ createListener,
+ storageIndexTracker,
+ replicaGrpId,
+ stablePeersAndLearners);
} catch (NodeStoppingException e) {
- throw new CompletionException(e);
+ throw new AssertionError("Loza was stopped before
Table manager", e);
}
- return newRaftClientFut.thenAccept(updateTableRaftService);
- }), ioExecutor)
- .whenComplete((res, ex) -> {
- if (ex != null) {
- LOG.warn("Unable to update raft groups on the node
[tableId={}, partitionId={}]", ex, tableId, partId);
- }
- });
+ }), ioExecutor);
+
+ return replicaMgr.weakStartReplica(
+ replicaGrpId,
+ startReplicaSupplier,
+ forcedAssignments
+ ).handle((res, ex) -> {
+ if (ex != null) {
+ LOG.warn("Unable to update raft groups on the node
[tableId={}, partitionId={}]", ex, tableId, partId);
+ }
+ return null;
+ });
}
@Nullable
@@ -1110,6 +1085,10 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return peer.consistentId().equals(localNode().name());
}
+ private boolean isLocalNodeInAssignments(Collection<Assignment>
assignments) {
+ return assignments.stream().anyMatch(isLocalNodeAssignment);
+ }
+
private PartitionDataStorage partitionDataStorage(MvPartitionStorage
partitionStorage, InternalTable internalTbl, int partId) {
return new SnapshotAwarePartitionDataStorage(
partitionStorage,
@@ -1141,6 +1120,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
@Override
public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
+ // NB: busy lock had already gotten in {@link beforeNodeStop}
assert beforeStopGuard.get() : "'stop' called before 'beforeNodeStop'";
if (!stopGuard.compareAndSet(false, true)) {
@@ -1793,7 +1773,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
int catalogVersion =
catalogService.latestCatalogVersion();
return
setTablesPartitionCountersForRebalance(replicaGrpId, revision,
pendingAssignments.force(), catalogVersion)
- .thenCompose(r ->
handleChangePendingAssignmentEvent(
+ .thenCompose(v ->
handleChangePendingAssignmentEvent(
replicaGrpId,
table,
stableAssignments,
@@ -1802,7 +1782,20 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
isRecovery,
catalogVersion
))
- .thenCompose(v ->
changePeersOnRebalance(table, replicaGrpId, pendingAssignments.nodes(),
revision));
+ .thenCompose(v -> {
+ boolean isLocalNodeInStableOrPending =
isNodeInReducedStableOrPendingAssignments(
+ replicaGrpId,
+ stableAssignments,
+ pendingAssignments,
+ revision
+ );
+
+ if (!isLocalNodeInStableOrPending) {
+ return nullCompletedFuture();
+ }
+
+ return changePeersOnRebalance(table,
replicaGrpId, pendingAssignments.nodes(), revision);
+ });
} finally {
busyLock.leaveBusy();
}
@@ -1886,24 +1879,76 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}), ioExecutor);
} else {
localServicesStartFuture = runAsync(() -> {
- if (pendingAssignmentsAreForced &&
replicaMgr.isReplicaStarted(replicaGrpId)) {
+ if (pendingAssignmentsAreForced && localMemberAssignment !=
null) {
+
+ assert replicaMgr.isReplicaStarted(replicaGrpId) : "The
local node is outside of the replication group";
+
replicaMgr.resetPeers(replicaGrpId,
fromAssignments(computedStableAssignments.nodes()));
+ } else if (pendingAssignmentsAreForced &&
localMemberAssignment == null) {
+ assert !replicaMgr.isReplicaStarted(replicaGrpId)
+ : "The local node is inside of the replication
group";
}
}, ioExecutor);
}
- return localServicesStartFuture.thenRunAsync(() -> {
- // For forced assignments, we exclude dead stable nodes, and all
alive stable nodes are already in pending assignments.
- // Union is not required in such a case.
- Set<Assignment> newAssignments = pendingAssignmentsAreForced ||
stableAssignments == null
- ? pendingAssignmentsNodes
- : union(pendingAssignmentsNodes,
stableAssignments.nodes());
+ return localServicesStartFuture
+ .thenComposeAsync(v -> inBusyLock(busyLock, () ->
isLocalNodeLeaseholder(replicaGrpId)), ioExecutor)
+ .thenAcceptAsync(isLeaseholder -> inBusyLock(busyLock, () -> {
+ boolean isLocalNodeInStableOrPending =
isNodeInReducedStableOrPendingAssignments(
+ replicaGrpId,
+ stableAssignments,
+ pendingAssignments,
+ revision
+ );
- tbl.internalTable()
- .tableRaftService()
- .partitionRaftGroupService(partitionId)
- .updateConfiguration(fromAssignments(newAssignments));
- }, ioExecutor);
+ if (!isLocalNodeInStableOrPending && !isLeaseholder) {
+ return;
+ }
+
+ assert isLocalNodeInStableOrPending || isLeaseholder
+ : "The local node is outside of the replication
group [inStableOrPending=" + isLocalNodeInStableOrPending
+ + ", isLeaseholder=" + isLeaseholder + "].";
+
+ // For forced assignments, we exclude dead stable nodes,
and all alive stable nodes are already in pending assignments.
+ // Union is not required in such a case.
+ Set<Assignment> newAssignments =
pendingAssignmentsAreForced || stableAssignments == null
+ ? pendingAssignmentsNodes
+ : union(pendingAssignmentsNodes,
stableAssignments.nodes());
+
+ tbl.internalTable()
+ .tableRaftService()
+ .partitionRaftGroupService(partitionId)
+
.updateConfiguration(fromAssignments(newAssignments));
+ }), ioExecutor);
+ }
+
+ private boolean isNodeInReducedStableOrPendingAssignments(
+ TablePartitionId replicaGrpId,
+ @Nullable Assignments stableAssignments,
+ Assignments pendingAssignments,
+ long revision
+ ) {
+ Entry reduceEntry =
metaStorageMgr.getLocally(RebalanceUtil.switchReduceKey(replicaGrpId),
revision);
+
+ Assignments reduceAssignments = reduceEntry != null
+ ? Assignments.fromBytes(reduceEntry.value())
+ : null;
+
+ Set<Assignment> reducedStableAssignments = reduceAssignments != null
+ ? subtract(stableAssignments.nodes(),
reduceAssignments.nodes())
+ : stableAssignments.nodes();
+
+ if (!isLocalNodeInAssignments(union(reducedStableAssignments,
pendingAssignments.nodes()))) {
+ return false;
+ }
+
+ assert replicaMgr.isReplicaStarted(replicaGrpId) : "The local node is
outside of the replication group ["
+ + ", stable=" + stableAssignments
+ + ", pending=" + pendingAssignments
+ + ", reduce=" + reduceAssignments
+ + ", localName=" + localNode().name() + "].";
+
+ return true;
}
private CompletableFuture<Void> setTablesPartitionCountersForRebalance(
@@ -1966,6 +2011,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
private CompletableFuture<Void> changePeersOnRebalance(
+ // TODO: remove excessive argument (used to get raft-client)
https://issues.apache.org/jira/browse/IGNITE-22218
TableImpl table,
TablePartitionId replicaGrpId,
Set<Assignment> pendingAssignments,
@@ -1999,7 +2045,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
// run update of raft configuration if this node is a
leader
LOG.info("Current node={} is the leader of partition raft
group={}. "
+ "Initiate rebalance process for
partition={}, table={}",
- leaderWithTerm.leader(), replicaGrpId, partId,
table.name());
+ leaderWithTerm.leader(), replicaGrpId, partId,
tables.get(replicaGrpId.tableId()).name());
return
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
.thenCompose(latestPendingAssignmentsEntry -> {
@@ -2010,8 +2056,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return nullCompletedFuture();
}
- PeersAndLearners newConfiguration =
- fromAssignments(pendingAssignments);
+ PeersAndLearners newConfiguration =
fromAssignments(pendingAssignments);
return
partGrpSvc.changePeersAsync(newConfiguration, leaderWithTerm.term());
});
@@ -2244,18 +2289,41 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}, ioExecutor).thenCompose(identity());
}
+ private CompletableFuture<Boolean>
isLocalNodeLeaseholder(ReplicationGroupId replicationGroupId) {
+ HybridTimestamp previousMetastoreSafeTime =
metaStorageMgr.clusterTime()
+ .currentSafeTime()
+ .addPhysicalTime(-clockService.maxClockSkewMillis());
+
+ return
executorInclinedPlacementDriver.getPrimaryReplica(replicationGroupId,
previousMetastoreSafeTime)
+ .thenApply(replicaMeta -> replicaMeta != null
+ && replicaMeta.getLeaseholderId() != null
+ &&
replicaMeta.getLeaseholderId().equals(localNode().name()));
+ }
+
private CompletableFuture<Void> updatePartitionClients(
TablePartitionId tablePartitionId,
Set<Assignment> stableAssignments,
long revision
) {
- // Update raft client peers and learners according to the actual
assignments.
- return tablesById(revision).thenAccept(t -> {
- t.get(tablePartitionId.tableId()).internalTable()
+ return
isLocalNodeLeaseholder(tablePartitionId).thenCompose(isLeaseholder ->
inBusyLock(busyLock, () -> {
+ boolean isLocalInStable =
isLocalNodeInAssignments(stableAssignments);
+
+ if (!isLocalInStable && !isLeaseholder) {
+ return nullCompletedFuture();
+ }
+
+ assert replicaMgr.isReplicaStarted(tablePartitionId)
+ : "The local node is outside of the replication group
[inStable=" + isLocalInStable
+ + ", isLeaseholder=" + isLeaseholder + "].";
+
+ // Update raft client peers and learners according to the actual
assignments.
+ return tablesById(revision).thenAccept(t ->
t.get(tablePartitionId.tableId())
+ .internalTable()
.tableRaftService()
.partitionRaftGroupService(tablePartitionId.partitionId())
- .updateConfiguration(fromAssignments(stableAssignments));
- });
+ .updateConfiguration(fromAssignments(stableAssignments))
+ );
+ }));
}
private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients(
@@ -2274,7 +2342,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
? pendingAssignments.nodes().stream()
: Stream.concat(stableAssignments.stream(),
pendingAssignments.nodes().stream())
)
- .noneMatch(assignment ->
assignment.consistentId().equals(localNode().name()));
+ .noneMatch(isLocalNodeAssignment);
if (shouldStopLocalServices) {
return allOf(
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 250f56c51b..a4127f5c7c 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -281,9 +281,6 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
when(replicaMgr.getLogSyncer()).thenReturn(mock(LogSyncer.class));
when(replicaMgr.startReplica(any(), any(), any(), any(),
any(PendingComparableValuesTracker.class), any()))
.thenReturn(nullCompletedFuture());
- // TODO: will be removed after
https://issues.apache.org/jira/browse/IGNITE-22315
- when(replicaMgr.startRaftClient(any(), any(), any()))
-
.thenReturn(completedFuture(mock(TopologyAwareRaftGroupService.class)));
when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
when(replicaMgr.weakStartReplica(any(), any(),
any())).thenReturn(trueCompletedFuture());
when(replicaMgr.weakStopReplica(any(), any(),
any())).thenReturn(nullCompletedFuture());
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index df39756946..ecb6c2016b 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -282,9 +282,6 @@ public class TableManagerTest extends IgniteAbstractTest {
when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(),
any(), any(), any(), any()))
.thenReturn(trueCompletedFuture());
- // TODO: will be removed after
https://issues.apache.org/jira/browse/IGNITE-22315
- when(replicaMgr.startRaftClient(any(), any(), any()))
-
.thenReturn(completedFuture(mock(TopologyAwareRaftGroupService.class)));
when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture());
when(replicaMgr.weakStartReplica(any(), any(), any())).thenAnswer(inv
-> {
Supplier<CompletableFuture<Void>> startOperation =
inv.getArgument(1);
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 7ec6ac9c5b..f1379fee24 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -32,7 +32,6 @@ import static
org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
@@ -682,7 +681,6 @@ public class ItTxTestCluster {
topologyAwareRaftGroupServiceFactory
).thenAccept(
raftSvc -> {
- try {
PartitionReplicaListener listener =
newReplicaListener(
mvPartStorage,
raftSvc,
@@ -713,9 +711,6 @@ public class ItTxTestCluster {
storageIndexTracker,
completedFuture(listener)
);
- } catch (NodeStoppingException e) {
- fail("Unexpected node stopping", e);
- }
}
);