This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7bcea31c9e IGNITE-18692 ItRebalanceTest fix (#2157)
7bcea31c9e is described below
commit 7bcea31c9eb6350120584c1ca060131504927d04
Author: Kirill Gusakov <[email protected]>
AuthorDate: Tue Jun 13 15:18:57 2023 +0300
IGNITE-18692 ItRebalanceTest fix (#2157)
---
.../distributionzones/rebalance/RebalanceUtil.java | 42 ++++++
.../ItPlacementDriverReplicaSideTest.java | 2 +-
.../raft/client/TopologyAwareRaftGroupService.java | 9 +-
.../apache/ignite/internal/replicator/Replica.java | 3 +-
.../ignite/internal/replicator/ReplicaManager.java | 21 ++-
.../java/org/apache/ignite/internal/Cluster.java | 4 +
.../storage/ItRebalanceDistributedTest.java | 3 +-
.../ignite/internal/rebalance/ItRebalanceTest.java | 161 ++++++++-------------
.../sql/engine/exec/MockedStructuresTest.java | 8 +-
.../distributed/ItTxDistributedTestSingleNode.java | 2 +-
.../ignite/distributed/ReplicaUnavailableTest.java | 2 +-
.../internal/table/distributed/TableManager.java | 48 ++++--
.../table/distributed/TableManagerTest.java | 5 +-
13 files changed, 183 insertions(+), 127 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 1fa1235894..0b1bd58d5a 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -39,6 +39,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -46,6 +47,8 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.jetbrains.annotations.NotNull;
@@ -397,6 +400,22 @@ public class RebalanceUtil {
.thenApply(e -> (e.value() == null) ? null :
ByteUtils.fromBytes(e.value()));
}
+ /**
+ * Returns partition assignments from vault.
+ *
+ * @param vaultManager Vault manager.
+ * @param tableId Table id.
+ * @param partitionNumber Partition number.
+ * @return Partition assignments from vault or {@code null} if the vault
has no entry for this partition.
+ */
+ public static Set<Assignment> partitionAssignments(
+ VaultManager vaultManager, int tableId, int partitionNumber) {
+ VaultEntry entry =
+ vaultManager.get(stablePartAssignmentsKey(new
TablePartitionId(tableId, partitionNumber))).join();
+
+ return (entry == null) ? null : ByteUtils.fromBytes(entry.value());
+ }
+
/**
* Returns table assignments for all table partitions from meta storage.
*
@@ -431,4 +450,27 @@ public class RebalanceUtil {
return Arrays.asList(result);
});
}
+
+ /**
+ * Returns table assignments for all table partitions from vault.
+ *
+ * @param vaultManager Vault manager.
+ * @param tableId Table id.
+ * @param numberOfPartitions Number of partitions.
+ * @return Table assignments, one set per partition, ordered by partition number.
+ */
+ public static List<Set<Assignment>> tableAssignments(
+ VaultManager vaultManager,
+ int tableId,
+ int numberOfPartitions
+ ) {
+ return IntStream.range(0, numberOfPartitions)
+ .mapToObj(i ->
+ (Set<Assignment>) ByteUtils.fromBytes(
+ vaultManager.get(
+ stablePartAssignmentsKey(new
TablePartitionId(tableId, i))
+ ).join().value())
+ )
+ .collect(Collectors.toList());
+ }
}
diff --git
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 1d79971912..03ffee72b6 100644
---
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -394,7 +394,7 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
assertNotNull(raftManager);
assertNotNull(replicaManager);
- replicaManager.stopReplica(testGrpId);
+ replicaManager.stopReplica(testGrpId).join();
raftManager.stopRaftNodes(testGrpId);
}
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index 2da893f7f4..6831025ca1 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.raft.client;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -314,7 +315,7 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
serverEventHandler.resetLeader();
var peers = peers();
- var futs = new CompletableFuture[peers.size()];
+ List<CompletableFuture<Boolean>> futs = new ArrayList<>();
for (int i = 0; i < peers.size(); i++) {
Peer peer = peers.get(i);
@@ -322,14 +323,14 @@ public class TopologyAwareRaftGroupService implements
RaftGroupService {
ClusterNode node =
clusterService.topologyService().getByConsistentId(peer.consistentId());
if (node != null) {
- futs[i] = sendSubscribeMessage(node,
factory.subscriptionLeaderChangeRequest()
+ futs.add(sendSubscribeMessage(node,
factory.subscriptionLeaderChangeRequest()
.groupId(groupId())
.subscribe(false)
- .build());
+ .build()));
}
}
- return CompletableFuture.allOf(futs);
+ return CompletableFuture.allOf(futs.toArray(new
CompletableFuture[futs.size()]));
}
@Override
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index 0bd0cb34e7..5aaca91e2a 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -273,7 +273,8 @@ public class Replica {
/**
* Shutdowns the replica.
*/
- public void shutdown() {
+ public CompletableFuture<Void> shutdown() {
listener.onShutdown();
+ return raftClient.unsubscribeLeader();
}
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 17463105ee..61e86f7ee6 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -376,7 +376,7 @@ public class ReplicaManager implements IgniteComponent {
* @return True if the replica is found and closed, false otherwise.
* @throws NodeStoppingException If the node is stopping.
*/
- public boolean stopReplica(ReplicationGroupId replicaGrpId) throws
NodeStoppingException {
+ public CompletableFuture<Boolean> stopReplica(ReplicationGroupId
replicaGrpId) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}
@@ -394,7 +394,7 @@ public class ReplicaManager implements IgniteComponent {
* @param replicaGrpId Replication group id.
* @return True if the replica is found and closed, false otherwise.
*/
- private boolean stopReplicaInternal(ReplicationGroupId replicaGrpId) {
+ private CompletableFuture<Boolean> stopReplicaInternal(ReplicationGroupId
replicaGrpId) {
CompletableFuture<Replica> removed = replicas.remove(replicaGrpId);
if (removed != null) {
@@ -406,13 +406,24 @@ public class ReplicaManager implements IgniteComponent {
}
if (!removed.isCompletedExceptionally()) {
- removed.join().shutdown();
+ return removed
+ .join()
+ .shutdown()
+ .handle((notUsed, throwable) -> {
+ if (throwable == null) {
+ return true;
+ } else {
+ LOG.error("Failed to stop replica
[replicaGrpId={}]", replicaGrpId, throwable);
+
+ return false;
+ }
+ });
}
- return true;
+ return completedFuture(true);
}
- return false;
+ return completedFuture(false);
}
/** {@inheritDoc} */
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
index d7204c3c3f..b37a6c681c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
@@ -483,6 +483,8 @@ public class Cluster {
);
});
+ knockedOutNodesIndices.add(nodeIndex);
+
LOG.info("Knocked out node " + nodeIndex + " with an artificial
network partition");
}
@@ -509,6 +511,8 @@ public class Cluster {
}
});
+ knockedOutNodesIndices.remove(nodeIndex);
+
LOG.info("Reanimated node " + nodeIndex + " by removing an artificial
network partition");
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 1eb85a25c5..e1edf580af 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -763,6 +763,7 @@ public class ItRebalanceDistributedTest {
name
);
+
tableManager = new TableManager(
name,
registry,
@@ -770,7 +771,7 @@ public class ItRebalanceDistributedTest {
zonesCfg,
clusterService,
raftManager,
- Mockito.mock(ReplicaManager.class),
+ replicaManager,
Mockito.mock(LockManager.class),
replicaSvc,
baselineMgr,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index f67cba5c13..5ae16bd940 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -21,39 +21,37 @@ import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
import java.nio.file.Path;
-import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.Cluster;
import org.apache.ignite.internal.IgniteIntegrationTest;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.replicator.TablePartitionId;
-import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
import org.apache.ignite.internal.schema.BinaryRowEx;
-import
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -68,6 +66,8 @@ public class ItRebalanceTest extends IgniteIntegrationTest {
private Cluster cluster;
+ private final HybridClock clock = new HybridClockImpl();
+
@BeforeEach
void createCluster(TestInfo testInfo) {
cluster = new Cluster(testInfo, workDir);
@@ -84,122 +84,87 @@ public class ItRebalanceTest extends IgniteIntegrationTest
{
* @throws Exception If failed.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18692")
void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
cluster.startAndInit(4);
//Creates table with 1 partition and 3 replicas.
createTestTable();
- assertTrue(waitAssignments(List.of(
- Set.of(0, 1, 2),
- Set.of(0, 1, 2),
- Set.of(0, 1, 2),
- Set.of(0, 1, 2)
- )));
-
TableImpl table = (TableImpl) cluster.node(0).tables().table("TEST");
+ waitForStableAssignmentsInMetastore(Set.of(
+ nodeName(0),
+ nodeName(1),
+ nodeName(2)
+ ), table.tableId());
+
BinaryRowEx row = new
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id",
1).set("value", "value1"));
BinaryRowEx key = new
TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 1));
- assertNull(table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(0).node()).get());
- assertNull(table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(1).node()).get());
- assertNull(table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(2).node()).get());
+ assertThat(table.internalTable().get(key, clock.now(),
cluster.node(0).node()), willBe(nullValue()));
+ assertThat(table.internalTable().get(key, clock.now(),
cluster.node(1).node()), willBe(nullValue()));
+ assertThat(table.internalTable().get(key, clock.now(),
cluster.node(2).node()), willBe(nullValue()));
table.internalTable().insert(row, null).get();
- assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(0).node()).get());
- assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(1).node()).get());
- assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(2).node()).get());
+ assertThat(table.internalTable().get(key, clock.now(),
cluster.node(0).node()), willBe(notNullValue()));
+ assertThat(table.internalTable().get(key, clock.now(),
cluster.node(1).node()), willBe(notNullValue()));
+ assertThat(table.internalTable().get(key, clock.now(),
cluster.node(2).node()), willBe(notNullValue()));
- try {
- table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(3).node()).get();
+ assertThat(
+ table.internalTable().get(key, clock.now(),
cluster.node(3).node()),
+ willThrow(ReplicationException.class, 10, TimeUnit.SECONDS)
+ );
- fail();
- } catch (Exception e) {
- assertInstanceOf(ExecutionException.class, e);
+ cluster.stopNode(2);
- assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
- }
+ waitForStableAssignmentsInMetastore(Set.of(
+ nodeName(0),
+ nodeName(1),
+ nodeName(3)
+ ), table.tableId());
- cluster.simulateNetworkPartitionOf(2);
+ assertThat(table.internalTable().get(key, clock.now(),
cluster.node(0).node()), willBe(notNullValue()));
+ assertThat(table.internalTable().get(key, clock.now(),
cluster.node(1).node()), willBe(notNullValue()));
+ assertThat(table.internalTable().get(key, clock.now(),
cluster.node(3).node()), willBe(notNullValue()));
- assertTrue(waitAssignments(List.of(
- Set.of(0, 1, 3),
- Set.of(0, 1, 3),
- Set.of(0, 1, 2),
- Set.of(0, 1, 3)
- )));
+ cluster.startNode(2);
- assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(0).node()).get());
- assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(1).node()).get());
- assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(3).node()).get());
+ waitForStableAssignmentsInMetastore(Set.of(
+ nodeName(0),
+ nodeName(1),
+ nodeName(2)
+ ), table.tableId());
- cluster.removeNetworkPartitionOf(2);
+ assertThat(table.internalTable().get(key, clock.now(),
cluster.node(0).node()), willBe(notNullValue()));
+ assertThat(table.internalTable().get(key, clock.now(),
cluster.node(1).node()), willBe(notNullValue()));
+ assertThat(table.internalTable().get(key, clock.now(),
cluster.node(2).node()), willBe(notNullValue()));
- assertTrue(waitAssignments(List.of(
- Set.of(0, 1, 2),
- Set.of(0, 1, 2),
- Set.of(0, 1, 2),
- Set.of(0, 1, 2)
- )));
+ assertThat(
+ table.internalTable().get(key, clock.now(),
cluster.node(3).node()),
+ willThrow(ReplicationException.class, 10, TimeUnit.SECONDS)
+ );
+ }
- assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(0).node()).get());
- assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(1).node()).get());
- assertNotNull(table.internalTable().get(key, new
HybridClockImpl().now(), cluster.node(2).node()).get());
+ private void waitForStableAssignmentsInMetastore(Set<String>
expectedNodes, int table) throws InterruptedException {
+ Set<String>[] lastAssignmentsHolderForLog = new Set[1];
- try {
- table.internalTable().get(key, new HybridClockImpl().now(),
cluster.node(3).node()).get();
+ assertTrue(waitForCondition(() -> {
+ Set<String> assignments =
+ partitionAssignments(
+ cluster.aliveNode().metaStorageManager(), table, 0
+ ).join().stream()
+ .map(Assignment::consistentId)
+ .collect(Collectors.toSet());
- fail();
- } catch (Exception e) {
- assertInstanceOf(ExecutionException.class, e);
+ lastAssignmentsHolderForLog[0] = assignments;
- assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
- }
+ return assignments.equals(expectedNodes);
+ }, 30000), "Expected nodes: " + expectedNodes + ", actual nodes: " +
lastAssignmentsHolderForLog[0]);
}
- /**
- * Wait assignments on nodes.
- *
- * @param nodes Expected assignments.
- * @return {@code true} if the expected and actual assignments are the
same.
- * @throws InterruptedException If interrupted.
- */
- private boolean waitAssignments(List<Set<Integer>> nodes) throws
InterruptedException {
- return waitForCondition(() -> {
- for (int i = 0; i < nodes.size(); i++) {
- Set<Integer> expectedAssignments = nodes.get(i);
-
- ExtendedTableConfiguration table =
- (ExtendedTableConfiguration) cluster.node(i)
-
.clusterConfiguration().getConfiguration(TablesConfiguration.KEY).tables().get("TEST");
-
- Set<Assignment> assignments =
- partitionAssignments(
- cluster.node(i).metaStorageManager(),
table.id().value(), 0).join();
-
- Set<String> assignmentIds;
-
- if (assignments != null) {
- assignmentIds = assignments
- .stream().map(assignment ->
assignment.consistentId()).collect(Collectors.toSet());
- } else {
- assignmentIds = Collections.emptySet();
- }
-
- LOG.info("Assignments for node " + i + ": " + assignmentIds);
-
- if (!(expectedAssignments.size() == assignmentIds.size())
- || !expectedAssignments.stream().allMatch(node ->
assignmentIds.contains(cluster.node(node).name()))) {
- return false;
- }
- }
-
- return true;
- },
- 5000);
+ private String nodeName(int nodeIndex) {
+ return cluster.node(nodeIndex).name();
}
private void createTestTable() throws InterruptedException {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 003c865133..b7bf7deae3 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -156,6 +156,10 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
@Mock
MetaStorageManager msm;
+ /** Replica manager. */
+ @Mock
+ ReplicaManager replicaManager;
+
@Mock
HybridClock clock;
@@ -555,6 +559,8 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
return ret;
});
+
when(replicaManager.stopReplica(any())).thenReturn(completedFuture(true));
+
return createTableManager();
}
@@ -566,7 +572,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
dstZnsCfg,
cs,
rm,
- mock(ReplicaManager.class),
+ replicaManager,
null,
null,
bm,
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 3a70443951..987667023e 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -641,7 +641,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
ReplicaManager replicaMgr = replicaManagers.get(entry.getKey());
for (ReplicationGroupId grp : replicaMgr.startedGroups()) {
- replicaMgr.stopReplica(grp);
+ replicaMgr.stopReplica(grp).join();
}
for (RaftNodeId nodeId : rs.localNodes()) {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index ccda9d0124..006744e698 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -196,7 +196,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
try {
log.info("Replica msg " +
message.getClass().getSimpleName());
- replicaManager.stopReplica(tablePartitionId);
+ replicaManager.stopReplica(tablePartitionId).join();
} catch (NodeStoppingException e) {
throw new RuntimeException(e);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index cbfef02800..12d6be365b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -27,6 +27,8 @@ import static
java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignments;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static
org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -249,6 +251,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
/** Meta storage manager. */
private final MetaStorageManager metaStorageMgr;
+ /** Vault manager. */
+ private final VaultManager vaultManager;
+
/** Data storage manager. */
private final DataStorageManager dataStorageMgr;
@@ -403,6 +408,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
this.dataStorageMgr = dataStorageMgr;
this.storagePath = storagePath;
this.metaStorageMgr = metaStorageMgr;
+ this.vaultManager = vaultManager;
this.schemaManager = schemaManager;
this.volatileLogStorageFactoryCreator =
volatileLogStorageFactoryCreator;
this.clock = clock;
@@ -565,11 +571,19 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
DistributionZoneView zone =
getZoneById(distributionZonesConfiguration,
(ctx.newValue()).zoneId()).value();
- List<Set<Assignment>> assignments =
AffinityUtils.calculateAssignments(
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-19425 use data nodes from
DistributionZoneManager instead.
-
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
- zone.partitions(),
- zone.replicas());
+ List<Set<Assignment>> assignments;
+
+ // Check if the table already has assignments in the vault.
+ // If it does, this is a recovery process and we should use
the vault assignments instead of calculating new ones.
+ if (partitionAssignments(vaultManager, ctx.newValue().id(), 0) !=
null) {
+ assignments = tableAssignments(vaultManager,
ctx.newValue().id(), zone.partitions());
+ } else {
+ assignments = AffinityUtils.calculateAssignments(
+ // TODO:
https://issues.apache.org/jira/browse/IGNITE-19425 use data nodes from
DistributionZoneManager instead.
+
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
+ zone.partitions(),
+ zone.replicas());
+ }
assert !assignments.isEmpty() : "Couldn't create the table with
empty assignments.";
@@ -1055,7 +1069,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
stopping.add(() -> {
try {
- replicaMgr.stopReplica(replicationGroupId);
+ replicaMgr.stopReplica(replicationGroupId).join();
} catch (Throwable t) {
handleExceptionOnCleanUpTablesResources(t, throwable,
nodeStoppingEx);
}
@@ -1277,9 +1291,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
raftMgr.stopRaftNodes(replicationGroupId);
- replicaMgr.stopReplica(replicationGroupId);
-
- removeStorageFromGcFutures[p] =
mvGc.removeStorage(replicationGroupId);
+ removeStorageFromGcFutures[p] = replicaMgr
+ .stopReplica(replicationGroupId)
+ .thenCompose((notUsed) ->
mvGc.removeStorage(replicationGroupId));
}
tablesByIdVv.update(causalityToken, (previousVal, e) ->
inBusyLock(busyLock, () -> {
@@ -2090,8 +2104,6 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
InternalTable internalTable = tbl.internalTable();
- ((InternalTableImpl) internalTable).updatePartitionTrackers(partId,
safeTimeTracker, storageIndexTracker);
-
LOG.info("Received update on pending assignments. Check if new raft
group should be started"
+ " [key={}, partition={}, table={},
localMemberAddress={}]",
new String(pendingAssignmentsEntry.key(),
StandardCharsets.UTF_8), partId, tbl.name(), localMember.address());
@@ -2099,6 +2111,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
CompletableFuture<Void> localServicesStartFuture;
if (shouldStartLocalServices) {
+ ((InternalTableImpl)
internalTable).updatePartitionTrackers(partId, safeTimeTracker,
storageIndexTracker);
+
localServicesStartFuture = getOrCreatePartitionStorages(tbl,
partId)
.thenAcceptAsync(partitionStorages -> {
MvPartitionStorage mvPartitionStorage =
partitionStorages.getMvPartitionStorage();
@@ -2412,12 +2426,19 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
try {
raftMgr.stopRaftNodes(tablePartitionId);
+ } catch (NodeStoppingException e) {
+ // No-op
+ }
- replicaMgr.stopReplica(tablePartitionId);
+ CompletableFuture<Boolean> stopReplicaFut;
+ try {
+ stopReplicaFut =
replicaMgr.stopReplica(tablePartitionId);
} catch (NodeStoppingException e) {
- // No-op.
+ stopReplicaFut = completedFuture(true);
}
+ CompletableFuture<Boolean> finalStopReplicaFut =
stopReplicaFut;
+
return tablesById(evt.revision())
// TODO: IGNITE-18703 Destroy raft log and meta
.thenCombine(mvGc.removeStorage(tablePartitionId),
(tables, unused) -> {
@@ -2426,6 +2447,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
closePartitionTrackers(internalTable,
partitionId);
return allOf(
+ finalStopReplicaFut,
internalTable.storage().destroyPartition(partitionId),
runAsync(() ->
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 0cec45a2ae..1f4af40cf8 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -246,7 +246,7 @@ public class TableManagerTest extends IgniteAbstractTest {
/** Before all test scenarios. */
@BeforeEach
- void before() {
+ void before() throws NodeStoppingException {
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
TopologyService topologyService = mock(TopologyService.class);
@@ -280,6 +280,8 @@ public class TableManagerTest extends IgniteAbstractTest {
when(distributionZoneManager.topologyVersionedDataNodes(anyInt(),
anyLong())).thenReturn(completedFuture(emptySet()));
+ when(replicaMgr.stopReplica(any())).thenReturn(completedFuture(true));
+
tblManagerFut = new CompletableFuture<>();
}
@@ -387,6 +389,7 @@ public class TableManagerTest extends IgniteAbstractTest {
verify(mvTableStorage).destroy();
verify(txStateTableStorage).destroy();
+ verify(replicaMgr, times(PARTITIONS)).stopReplica(any());
assertNull(tableManager.table(scmTbl.name()));