This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7bcea31c9e IGNITE-18692 ItRebalanceTest fix (#2157)
7bcea31c9e is described below

commit 7bcea31c9eb6350120584c1ca060131504927d04
Author: Kirill Gusakov <[email protected]>
AuthorDate: Tue Jun 13 15:18:57 2023 +0300

    IGNITE-18692 ItRebalanceTest fix (#2157)
---
 .../distributionzones/rebalance/RebalanceUtil.java |  42 ++++++
 .../ItPlacementDriverReplicaSideTest.java          |   2 +-
 .../raft/client/TopologyAwareRaftGroupService.java |   9 +-
 .../apache/ignite/internal/replicator/Replica.java |   3 +-
 .../ignite/internal/replicator/ReplicaManager.java |  21 ++-
 .../java/org/apache/ignite/internal/Cluster.java   |   4 +
 .../storage/ItRebalanceDistributedTest.java        |   3 +-
 .../ignite/internal/rebalance/ItRebalanceTest.java | 161 ++++++++-------------
 .../sql/engine/exec/MockedStructuresTest.java      |   8 +-
 .../distributed/ItTxDistributedTestSingleNode.java |   2 +-
 .../ignite/distributed/ReplicaUnavailableTest.java |   2 +-
 .../internal/table/distributed/TableManager.java   |  48 ++++--
 .../table/distributed/TableManagerTest.java        |   5 +-
 13 files changed, 183 insertions(+), 127 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 1fa1235894..0b1bd58d5a 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -39,6 +39,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -46,6 +47,8 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
 import org.jetbrains.annotations.NotNull;
 
@@ -397,6 +400,22 @@ public class RebalanceUtil {
                 .thenApply(e -> (e.value() == null) ? null : 
ByteUtils.fromBytes(e.value()));
     }
 
+    /**
+     * Returns partition assignments from vault.
+     *
+     * @param vaultManager Vault manager.
+     * @param tableId Table id.
+     * @param partitionNumber Partition number.
+     * @return Partition assignments from the vault, or {@code null} if 
assignments are absent.
+     */
+    public static Set<Assignment> partitionAssignments(
+            VaultManager vaultManager, int tableId, int partitionNumber) {
+        VaultEntry entry =
+                vaultManager.get(stablePartAssignmentsKey(new 
TablePartitionId(tableId, partitionNumber))).join();
+
+        return (entry == null) ? null : ByteUtils.fromBytes(entry.value());
+    }
+
     /**
      * Returns table assignments for all table partitions from meta storage.
      *
@@ -431,4 +450,27 @@ public class RebalanceUtil {
                     return Arrays.asList(result);
                 });
     }
+
+    /**
+     * Returns table assignments for all table partitions from vault.
+     *
+     * @param vaultManager Vault manager.
+     * @param tableId Table id.
+     * @param numberOfPartitions Number of partitions.
+     * @return Table assignments, one set per partition, ordered by partition number.
+     */
+    public static List<Set<Assignment>> tableAssignments(
+            VaultManager vaultManager,
+            int tableId,
+            int numberOfPartitions
+    ) {
+        return IntStream.range(0, numberOfPartitions)
+                .mapToObj(i ->
+                        (Set<Assignment>) ByteUtils.fromBytes(
+                                vaultManager.get(
+                                        stablePartAssignmentsKey(new 
TablePartitionId(tableId, i))
+                                ).join().value())
+                )
+                .collect(Collectors.toList());
+    }
 }
diff --git 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 1d79971912..03ffee72b6 100644
--- 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -394,7 +394,7 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
             assertNotNull(raftManager);
             assertNotNull(replicaManager);
 
-            replicaManager.stopReplica(testGrpId);
+            replicaManager.stopReplica(testGrpId).join();
             raftManager.stopRaftNodes(testGrpId);
         }
     }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index 2da893f7f4..6831025ca1 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.raft.client;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -314,7 +315,7 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
         serverEventHandler.resetLeader();
 
         var peers = peers();
-        var futs = new CompletableFuture[peers.size()];
+        List<CompletableFuture<Boolean>> futs = new ArrayList<>();
 
         for (int i = 0; i < peers.size(); i++) {
             Peer peer = peers.get(i);
@@ -322,14 +323,14 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
             ClusterNode node = 
clusterService.topologyService().getByConsistentId(peer.consistentId());
 
             if (node != null) {
-                futs[i] = sendSubscribeMessage(node, 
factory.subscriptionLeaderChangeRequest()
+                futs.add(sendSubscribeMessage(node, 
factory.subscriptionLeaderChangeRequest()
                         .groupId(groupId())
                         .subscribe(false)
-                        .build());
+                        .build()));
             }
         }
 
-        return CompletableFuture.allOf(futs);
+        return CompletableFuture.allOf(futs.toArray(new 
CompletableFuture[futs.size()]));
     }
 
     @Override
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index 0bd0cb34e7..5aaca91e2a 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -273,7 +273,8 @@ public class Replica {
     /**
      * Shutdowns the replica.
      */
-    public void shutdown() {
+    public CompletableFuture<Void> shutdown() {
         listener.onShutdown();
+        return raftClient.unsubscribeLeader();
     }
 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 17463105ee..61e86f7ee6 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -376,7 +376,7 @@ public class ReplicaManager implements IgniteComponent {
      * @return True if the replica is found and closed, false otherwise.
      * @throws NodeStoppingException If the node is stopping.
      */
-    public boolean stopReplica(ReplicationGroupId replicaGrpId) throws 
NodeStoppingException {
+    public CompletableFuture<Boolean> stopReplica(ReplicationGroupId 
replicaGrpId) throws NodeStoppingException {
         if (!busyLock.enterBusy()) {
             throw new NodeStoppingException();
         }
@@ -394,7 +394,7 @@ public class ReplicaManager implements IgniteComponent {
      * @param replicaGrpId Replication group id.
      * @return True if the replica is found and closed, false otherwise.
      */
-    private boolean stopReplicaInternal(ReplicationGroupId replicaGrpId) {
+    private CompletableFuture<Boolean> stopReplicaInternal(ReplicationGroupId 
replicaGrpId) {
         CompletableFuture<Replica> removed = replicas.remove(replicaGrpId);
 
         if (removed != null) {
@@ -406,13 +406,24 @@ public class ReplicaManager implements IgniteComponent {
             }
 
             if (!removed.isCompletedExceptionally()) {
-                removed.join().shutdown();
+                return removed
+                        .join()
+                        .shutdown()
+                        .handle((notUsed, throwable) -> {
+                            if (throwable == null) {
+                                return true;
+                            } else {
+                                LOG.error("Failed to stop replica 
[replicaGrpId={}]", replicaGrpId, throwable);
+
+                                return false;
+                            }
+                        });
             }
 
-            return true;
+            return completedFuture(true);
         }
 
-        return false;
+        return completedFuture(false);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
index d7204c3c3f..b37a6c681c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
@@ -483,6 +483,8 @@ public class Cluster {
                     );
                 });
 
+        knockedOutNodesIndices.add(nodeIndex);
+
         LOG.info("Knocked out node " + nodeIndex + " with an artificial 
network partition");
     }
 
@@ -509,6 +511,8 @@ public class Cluster {
                     }
                 });
 
+        knockedOutNodesIndices.remove(nodeIndex);
+
         LOG.info("Reanimated node " + nodeIndex + " by removing an artificial 
network partition");
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 1eb85a25c5..e1edf580af 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -763,6 +763,7 @@ public class ItRebalanceDistributedTest {
                     name
             );
 
+
             tableManager = new TableManager(
                     name,
                     registry,
@@ -770,7 +771,7 @@ public class ItRebalanceDistributedTest {
                     zonesCfg,
                     clusterService,
                     raftManager,
-                    Mockito.mock(ReplicaManager.class),
+                    replicaManager,
                     Mockito.mock(LockManager.class),
                     replicaSvc,
                     baselineMgr,
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index f67cba5c13..5ae16bd940 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -21,39 +21,37 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
 import java.nio.file.Path;
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.Cluster;
 import org.apache.ignite.internal.IgniteIntegrationTest;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
-import 
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
 import org.apache.ignite.internal.schema.BinaryRowEx;
-import 
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.table.Tuple;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 
@@ -68,6 +66,8 @@ public class ItRebalanceTest extends IgniteIntegrationTest {
 
     private Cluster cluster;
 
+    private final HybridClock clock = new HybridClockImpl();
+
     @BeforeEach
     void createCluster(TestInfo testInfo) {
         cluster = new Cluster(testInfo, workDir);
@@ -84,122 +84,87 @@ public class ItRebalanceTest extends IgniteIntegrationTest 
{
      * @throws Exception If failed.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18692";)
     void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
         cluster.startAndInit(4);
 
         //Creates table with 1 partition and 3 replicas.
         createTestTable();
 
-        assertTrue(waitAssignments(List.of(
-                Set.of(0, 1, 2),
-                Set.of(0, 1, 2),
-                Set.of(0, 1, 2),
-                Set.of(0, 1, 2)
-        )));
-
         TableImpl table = (TableImpl) cluster.node(0).tables().table("TEST");
 
+        waitForStableAssignmentsInMetastore(Set.of(
+                nodeName(0),
+                nodeName(1),
+                nodeName(2)
+        ), table.tableId());
+
         BinaryRowEx row = new 
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 
1).set("value", "value1"));
         BinaryRowEx key = new 
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 1));
 
-        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), 
cluster.node(0).node()).get());
-        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), 
cluster.node(1).node()).get());
-        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), 
cluster.node(2).node()).get());
+        assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(0).node()), willBe(nullValue()));
+        assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(1).node()), willBe(nullValue()));
+        assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(2).node()), willBe(nullValue()));
 
         table.internalTable().insert(row, null).get();
 
-        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(0).node()).get());
-        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(1).node()).get());
-        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(2).node()).get());
+        assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(0).node()), willBe(notNullValue()));
+        assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(1).node()), willBe(notNullValue()));
+        assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(2).node()), willBe(notNullValue()));
 
-        try {
-            table.internalTable().get(key, new HybridClockImpl().now(), 
cluster.node(3).node()).get();
+        assertThat(
+                table.internalTable().get(key, clock.now(), 
cluster.node(3).node()),
+                willThrow(ReplicationException.class, 10, TimeUnit.SECONDS)
+        );
 
-            fail();
-        } catch (Exception e) {
-            assertInstanceOf(ExecutionException.class, e);
+        cluster.stopNode(2);
 
-            assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
-        }
+        waitForStableAssignmentsInMetastore(Set.of(
+                nodeName(0),
+                nodeName(1),
+                nodeName(3)
+        ), table.tableId());
 
-        cluster.simulateNetworkPartitionOf(2);
+        assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(0).node()), willBe(notNullValue()));
+        assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(1).node()), willBe(notNullValue()));
+        assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(3).node()), willBe(notNullValue()));
 
-        assertTrue(waitAssignments(List.of(
-                Set.of(0, 1, 3),
-                Set.of(0, 1, 3),
-                Set.of(0, 1, 2),
-                Set.of(0, 1, 3)
-        )));
+        cluster.startNode(2);
 
-        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(0).node()).get());
-        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(1).node()).get());
-        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(3).node()).get());
+        waitForStableAssignmentsInMetastore(Set.of(
+                nodeName(0),
+                nodeName(1),
+                nodeName(2)
+        ), table.tableId());
 
-        cluster.removeNetworkPartitionOf(2);
+        assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(0).node()), willBe(notNullValue()));
+        assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(1).node()), willBe(notNullValue()));
+        assertThat(table.internalTable().get(key, clock.now(), 
cluster.node(2).node()), willBe(notNullValue()));
 
-        assertTrue(waitAssignments(List.of(
-                Set.of(0, 1, 2),
-                Set.of(0, 1, 2),
-                Set.of(0, 1, 2),
-                Set.of(0, 1, 2)
-        )));
+        assertThat(
+                table.internalTable().get(key, clock.now(), 
cluster.node(3).node()),
+                willThrow(ReplicationException.class, 10, TimeUnit.SECONDS)
+        );
+    }
 
-        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(0).node()).get());
-        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(1).node()).get());
-        assertNotNull(table.internalTable().get(key, new 
HybridClockImpl().now(), cluster.node(2).node()).get());
+    private void waitForStableAssignmentsInMetastore(Set<String> 
expectedNodes, int table) throws InterruptedException {
+        Set<String>[] lastAssignmentsHolderForLog = new Set[1];
 
-        try {
-            table.internalTable().get(key, new HybridClockImpl().now(), 
cluster.node(3).node()).get();
+        assertTrue(waitForCondition(() -> {
+            Set<String> assignments =
+                    partitionAssignments(
+                            cluster.aliveNode().metaStorageManager(), table, 0
+                    ).join().stream()
+                            .map(Assignment::consistentId)
+                            .collect(Collectors.toSet());
 
-            fail();
-        } catch (Exception e) {
-            assertInstanceOf(ExecutionException.class, e);
+            lastAssignmentsHolderForLog[0] = assignments;
 
-            assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
-        }
+            return assignments.equals(expectedNodes);
+        }, 30000), "Expected nodes: " + expectedNodes + ", actual nodes: " + 
lastAssignmentsHolderForLog[0]);
     }
 
-    /**
-     * Wait assignments on nodes.
-     *
-     * @param nodes Expected assignments.
-     * @return {@code true} if the expected and actual assignments are the 
same.
-     * @throws InterruptedException If interrupted.
-     */
-    private boolean waitAssignments(List<Set<Integer>> nodes) throws 
InterruptedException {
-        return waitForCondition(() -> {
-            for (int i = 0; i < nodes.size(); i++) {
-                Set<Integer> expectedAssignments = nodes.get(i);
-
-                ExtendedTableConfiguration table =
-                        (ExtendedTableConfiguration) cluster.node(i)
-                                
.clusterConfiguration().getConfiguration(TablesConfiguration.KEY).tables().get("TEST");
-
-                Set<Assignment> assignments =
-                        partitionAssignments(
-                                cluster.node(i).metaStorageManager(), 
table.id().value(), 0).join();
-
-                Set<String> assignmentIds;
-
-                if (assignments != null) {
-                    assignmentIds = assignments
-                            .stream().map(assignment -> 
assignment.consistentId()).collect(Collectors.toSet());
-                } else {
-                    assignmentIds = Collections.emptySet();
-                }
-
-                LOG.info("Assignments for node " + i + ": " + assignmentIds);
-
-                if (!(expectedAssignments.size() == assignmentIds.size())
-                        || !expectedAssignments.stream().allMatch(node -> 
assignmentIds.contains(cluster.node(node).name()))) {
-                    return false;
-                }
-            }
-
-            return true;
-        },
-                5000);
+    private String nodeName(int nodeIndex) {
+        return cluster.node(nodeIndex).name();
     }
 
     private void createTestTable() throws InterruptedException {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 003c865133..b7bf7deae3 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -156,6 +156,10 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
     @Mock
     MetaStorageManager msm;
 
+    /** Replica manager. */
+    @Mock
+    ReplicaManager replicaManager;
+
     @Mock
     HybridClock clock;
 
@@ -555,6 +559,8 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
             return ret;
         });
 
+        
when(replicaManager.stopReplica(any())).thenReturn(completedFuture(true));
+
         return createTableManager();
     }
 
@@ -566,7 +572,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
                 dstZnsCfg,
                 cs,
                 rm,
-                mock(ReplicaManager.class),
+                replicaManager,
                 null,
                 null,
                 bm,
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 3a70443951..987667023e 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -641,7 +641,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
             ReplicaManager replicaMgr = replicaManagers.get(entry.getKey());
 
             for (ReplicationGroupId grp : replicaMgr.startedGroups()) {
-                replicaMgr.stopReplica(grp);
+                replicaMgr.stopReplica(grp).join();
             }
 
             for (RaftNodeId nodeId : rs.localNodes()) {
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index ccda9d0124..006744e698 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -196,7 +196,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
                     try {
                         log.info("Replica msg " + 
message.getClass().getSimpleName());
 
-                        replicaManager.stopReplica(tablePartitionId);
+                        replicaManager.stopReplica(tablePartitionId).join();
                     } catch (NodeStoppingException e) {
                         throw new RuntimeException(e);
                     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index cbfef02800..12d6be365b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -27,6 +27,8 @@ import static 
java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignments;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static 
org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -249,6 +251,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     /** Meta storage manager. */
     private final MetaStorageManager metaStorageMgr;
 
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
     /** Data storage manager. */
     private final DataStorageManager dataStorageMgr;
 
@@ -403,6 +408,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         this.dataStorageMgr = dataStorageMgr;
         this.storagePath = storagePath;
         this.metaStorageMgr = metaStorageMgr;
+        this.vaultManager = vaultManager;
         this.schemaManager = schemaManager;
         this.volatileLogStorageFactoryCreator = 
volatileLogStorageFactoryCreator;
         this.clock = clock;
@@ -565,11 +571,19 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             DistributionZoneView zone =
                     getZoneById(distributionZonesConfiguration, 
(ctx.newValue()).zoneId()).value();
 
-            List<Set<Assignment>> assignments = 
AffinityUtils.calculateAssignments(
-                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-19425 use data nodes from 
DistributionZoneManager instead.
-                    
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
-                    zone.partitions(),
-                    zone.replicas());
+            List<Set<Assignment>> assignments;
+
+            // Check if the table already has assignments in the vault.
+            // If it does, this is a recovery process and we should use 
the vault assignments instead of calculating new ones.
+            if (partitionAssignments(vaultManager, ctx.newValue().id(), 0) != 
null) {
+                assignments = tableAssignments(vaultManager, 
ctx.newValue().id(), zone.partitions());
+            } else {
+                assignments = AffinityUtils.calculateAssignments(
+                        // TODO: 
https://issues.apache.org/jira/browse/IGNITE-19425 use data nodes from 
DistributionZoneManager instead.
+                        
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
+                        zone.partitions(),
+                        zone.replicas());
+            }
 
             assert !assignments.isEmpty() : "Couldn't create the table with 
empty assignments.";
 
@@ -1055,7 +1069,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 stopping.add(() -> {
                     try {
-                        replicaMgr.stopReplica(replicationGroupId);
+                        replicaMgr.stopReplica(replicationGroupId).join();
                     } catch (Throwable t) {
                         handleExceptionOnCleanUpTablesResources(t, throwable, 
nodeStoppingEx);
                     }
@@ -1277,9 +1291,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 raftMgr.stopRaftNodes(replicationGroupId);
 
-                replicaMgr.stopReplica(replicationGroupId);
-
-                removeStorageFromGcFutures[p] = 
mvGc.removeStorage(replicationGroupId);
+                removeStorageFromGcFutures[p] = replicaMgr
+                        .stopReplica(replicationGroupId)
+                        .thenCompose((notUsed) -> 
mvGc.removeStorage(replicationGroupId));
             }
 
             tablesByIdVv.update(causalityToken, (previousVal, e) -> 
inBusyLock(busyLock, () -> {
@@ -2090,8 +2104,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
         InternalTable internalTable = tbl.internalTable();
 
-        ((InternalTableImpl) internalTable).updatePartitionTrackers(partId, 
safeTimeTracker, storageIndexTracker);
-
         LOG.info("Received update on pending assignments. Check if new raft 
group should be started"
                         + " [key={}, partition={}, table={}, 
localMemberAddress={}]",
                 new String(pendingAssignmentsEntry.key(), 
StandardCharsets.UTF_8), partId, tbl.name(), localMember.address());
@@ -2099,6 +2111,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         CompletableFuture<Void> localServicesStartFuture;
 
         if (shouldStartLocalServices) {
+            ((InternalTableImpl) 
internalTable).updatePartitionTrackers(partId, safeTimeTracker, 
storageIndexTracker);
+
             localServicesStartFuture = getOrCreatePartitionStorages(tbl, 
partId)
                     .thenAcceptAsync(partitionStorages -> {
                         MvPartitionStorage mvPartitionStorage = 
partitionStorages.getMvPartitionStorage();
@@ -2412,12 +2426,19 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                     try {
                         raftMgr.stopRaftNodes(tablePartitionId);
+                    } catch (NodeStoppingException e) {
+                        // No-op
+                    }
 
-                        replicaMgr.stopReplica(tablePartitionId);
+                    CompletableFuture<Boolean> stopReplicaFut;
+                    try {
+                        stopReplicaFut = 
replicaMgr.stopReplica(tablePartitionId);
                     } catch (NodeStoppingException e) {
-                        // No-op.
+                        stopReplicaFut = completedFuture(true);
                     }
 
+                    CompletableFuture<Boolean> finalStopReplicaFut = 
stopReplicaFut;
+
                     return tablesById(evt.revision())
                             // TODO: IGNITE-18703 Destroy raft log and meta
                             .thenCombine(mvGc.removeStorage(tablePartitionId), 
(tables, unused) -> {
@@ -2426,6 +2447,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                 closePartitionTrackers(internalTable, 
partitionId);
 
                                 return allOf(
+                                        finalStopReplicaFut,
                                         
internalTable.storage().destroyPartition(partitionId),
                                         runAsync(() -> 
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
                                 );
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 0cec45a2ae..1f4af40cf8 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -246,7 +246,7 @@ public class TableManagerTest extends IgniteAbstractTest {
 
     /** Before all test scenarios. */
     @BeforeEach
-    void before() {
+    void before() throws NodeStoppingException {
         
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
 
         TopologyService topologyService = mock(TopologyService.class);
@@ -280,6 +280,8 @@ public class TableManagerTest extends IgniteAbstractTest {
 
         when(distributionZoneManager.topologyVersionedDataNodes(anyInt(), 
anyLong())).thenReturn(completedFuture(emptySet()));
 
+        when(replicaMgr.stopReplica(any())).thenReturn(completedFuture(true));
+
         tblManagerFut = new CompletableFuture<>();
     }
 
@@ -387,6 +389,7 @@ public class TableManagerTest extends IgniteAbstractTest {
 
         verify(mvTableStorage).destroy();
         verify(txStateTableStorage).destroy();
+        verify(replicaMgr, times(PARTITIONS)).stopReplica(any());
 
         assertNull(tableManager.table(scmTbl.name()));
 


Reply via email to