This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9663f83788 IGNITE-18160 Delete physical data after a Raft Group is
stopped (#1615)
9663f83788 is described below
commit 9663f837880bbb6664c030e6b60a220509897196
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Feb 2 17:59:23 2023 +0300
IGNITE-18160 Delete physical data after a Raft Group is stopped (#1615)
---
modules/runner/build.gradle | 2 +
.../storage/ItRebalanceDistributedTest.java | 351 ++++++++++++++++++++-
.../internal/storage/impl/TestStorageEngine.java | 4 +-
.../internal/table/distributed/TableManager.java | 109 ++++---
4 files changed, 400 insertions(+), 66 deletions(-)
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 99eacd3467..a25bd533fd 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -122,6 +122,8 @@ dependencies {
integrationTestImplementation(testFixtures(project(':ignite-network')))
integrationTestImplementation(testFixtures(project(':ignite-vault')))
integrationTestImplementation(testFixtures(project(':ignite-table')))
+ integrationTestImplementation(testFixtures(project(':ignite-storage-api')))
+
integrationTestImplementation(testFixtures(project(':ignite-transactions')))
integrationTestImplementation libs.jetbrains.annotations
integrationTestImplementation libs.rocksdb.jni
integrationTestImplementation libs.disruptor
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 6392a3fea2..dab0fc34ae 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -19,21 +19,47 @@ package org.apache.ignite.internal.configuration.storage;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static
org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+import static
org.apache.ignite.internal.utils.RebalanceUtil.extractPartitionNumber;
+import static org.apache.ignite.internal.utils.RebalanceUtil.extractTableId;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import
org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration;
@@ -51,9 +77,12 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import
org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
import org.apache.ignite.internal.raft.Loza;
@@ -68,6 +97,7 @@ import
org.apache.ignite.internal.rest.configuration.RestConfiguration;
import org.apache.ignite.internal.schema.SchemaManager;
import
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
import
org.apache.ignite.internal.schema.configuration.ExtendedTableConfigurationSchema;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import
org.apache.ignite.internal.schema.configuration.defaultvalue.ConstantValueDefaultConfigurationSchema;
import
org.apache.ignite.internal.schema.configuration.defaultvalue.FunctionCallDefaultConfigurationSchema;
@@ -80,16 +110,22 @@ import
org.apache.ignite.internal.schema.testutils.definition.ColumnType;
import org.apache.ignite.internal.schema.testutils.definition.TableDefinition;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModules;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.impl.TestDataStorageModule;
+import org.apache.ignite.internal.storage.impl.schema.TestDataStorageChange;
+import
org.apache.ignite.internal.storage.impl.schema.TestDataStorageConfigurationSchema;
import
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryDataStorageModule;
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryDataStorageConfigurationSchema;
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
import org.apache.ignite.internal.storage.rocksdb.RocksDbDataStorageModule;
import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.LockManager;
@@ -97,6 +133,8 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.ReverseIterator;
import org.apache.ignite.internal.vault.VaultManager;
@@ -106,7 +144,9 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
+import org.apache.ignite.table.Table;
import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -123,6 +163,8 @@ public class ItRebalanceDistributedTest {
/** Ignite logger. */
private static final IgniteLogger LOG =
Loggers.forClass(ItRebalanceDistributedTest.class);
+ private static final String TABLE_1_NAME = "TBL1";
+
public static final int BASE_PORT = 20_000;
public static final String HOST = "localhost";
@@ -133,6 +175,16 @@ public class ItRebalanceDistributedTest {
@InjectConfiguration
private static ClusterManagementConfiguration
clusterManagementConfiguration;
+ @Target(ElementType.METHOD)
+ @Retention(RetentionPolicy.RUNTIME)
+ private @interface UseTestTxStateStorage {
+ }
+
+ @Target(ElementType.METHOD)
+ @Retention(RetentionPolicy.RUNTIME)
+ private @interface UseRocksMetaStorage {
+ }
+
@WorkDirectory
private Path workDir;
@@ -164,7 +216,7 @@ public class ItRebalanceDistributedTest {
}
@AfterEach
- void after() throws Exception {
+ void after() {
for (Node node : nodes) {
node.stop();
}
@@ -391,6 +443,85 @@ public class ItRebalanceDistributedTest {
assertEquals(3, getPartitionClusterNodes(2, 0).size());
}
+ @Test
+ @UseTestTxStateStorage
+ void testDestroyPartitionStoragesOnEvictNode() {
+ createTableWithOnePartition(TABLE_1_NAME, 3, true);
+
+ Set<Assignment> assignmentsBeforeChangeReplicas =
getPartitionClusterNodes(0, 0);
+
+ nodes.forEach(node ->
prepareFinishHandleChangeStableAssignmentEventFuture(node, TABLE_1_NAME, 0));
+
+ changeTableReplicasForSinglePartition(TABLE_1_NAME, 2);
+
+ Set<Assignment> assignmentsAfterChangeReplicas =
getPartitionClusterNodes(0, 0);
+
+ Set<Assignment> evictedAssignments =
getEvictedAssignments(assignmentsBeforeChangeReplicas,
assignmentsAfterChangeReplicas);
+
+ assertThat(
+ String.format("before=%s, after=%s",
assignmentsBeforeChangeReplicas, assignmentsAfterChangeReplicas),
+ evictedAssignments,
+ hasSize(1)
+ );
+
+ assertThat(collectFinishHandleChangeStableAssignmentEventFuture(null,
TABLE_1_NAME, 0), willCompleteSuccessfully());
+
+ Node evictedNode =
findNodeByConsistentId(first(evictedAssignments).consistentId());
+
+ assertNotNull(evictedNode, evictedAssignments.toString());
+
+ checkInvokeDestroyedPartitionStorages(evictedNode, TABLE_1_NAME, 0);
+ }
+
+ @Test
+ @UseTestTxStateStorage
+ @UseRocksMetaStorage
+ void testDestroyPartitionStoragesOnRestartEvictedNode(TestInfo testInfo)
throws Exception {
+ createTableWithOnePartition(TABLE_1_NAME, 3, true);
+
+ Set<Assignment> assignmentsBeforeChangeReplicas =
getPartitionClusterNodes(0, 0);
+
+ nodes.forEach(node -> {
+ prepareFinishHandleChangeStableAssignmentEventFuture(node,
TABLE_1_NAME, 0);
+
+ throwExceptionOnInvokeDestroyPartitionStorages(node, TABLE_1_NAME,
0);
+ });
+
+ changeTableReplicasForSinglePartition(TABLE_1_NAME, 2);
+
+ Assignment evictedAssignment =
first(getEvictedAssignments(assignmentsBeforeChangeReplicas,
getPartitionClusterNodes(0, 0)));
+
+ Node evictedNode =
findNodeByConsistentId(evictedAssignment.consistentId());
+
+ // Every node except the evicted one must have handled the STABLE_ASSIGNMENTS_PREFIX event from the metastore successfully.
+ assertThat(
+ collectFinishHandleChangeStableAssignmentEventFuture(node ->
!node.equals(evictedNode), TABLE_1_NAME, 0),
+ willCompleteSuccessfully()
+ );
+
+ TablePartitionId tablePartitionId =
evictedNode.getTablePartitionId(TABLE_1_NAME, 0);
+
+
assertThat(evictedNode.finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId),
willFailFast(Exception.class));
+
+ // Restart evicted node.
+ int evictedNodeIndex =
findNodeIndexByConsistentId(evictedAssignment.consistentId());
+
+ evictedNode.stop();
+
+ Node newNode = new Node(testInfo, evictedNode.networkAddress);
+
+
newNode.finishHandleChangeStableAssignmentEventFutures.put(tablePartitionId,
new CompletableFuture<>());
+
+ newNode.start();
+
+ nodes.set(evictedNodeIndex, newNode);
+
+ // The restarted node must handle the stable assignments event again and destroy the partition storages this time.
+
assertThat(newNode.finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId),
willSucceedIn(1, TimeUnit.MINUTES));
+
+ checkInvokeDestroyedPartitionStorages(newNode, TABLE_1_NAME, 0);
+ }
+
private void waitPartitionAssignmentsSyncedToExpected(int partNum, int
replicasNum) {
while (!IntStream.range(0, nodes.size()).allMatch(n ->
getPartitionClusterNodes(n, partNum).size() == replicasNum)) {
LockSupport.parkNanos(100_000_000);
@@ -401,6 +532,10 @@ public class ItRebalanceDistributedTest {
return nodes.stream().filter(n ->
n.name.equals(consistentId)).findFirst().orElseThrow();
}
+ private int findNodeIndexByConsistentId(String consistentId) {
+ return IntStream.range(0, nodes.size()).filter(i ->
nodes.get(i).name.equals(consistentId)).findFirst().orElseThrow();
+ }
+
private Set<Assignment> getPartitionClusterNodes(int nodeNum, int partNum)
{
var table = ((ExtendedTableConfiguration)
nodes.get(nodeNum).clusterCfgMgr.configurationRegistry()
.getConfiguration(TablesConfiguration.KEY).tables().get("TBL1"));
@@ -451,10 +586,16 @@ public class ItRebalanceDistributedTest {
private List<IgniteComponent> nodeComponents;
+ private final Map<TablePartitionId, CompletableFuture<Void>>
finishHandleChangeStableAssignmentEventFutures
+ = new ConcurrentHashMap<>();
+
+ private final NetworkAddress networkAddress;
+
/**
* Constructor that simply creates a subset of components of this node.
*/
Node(TestInfo testInfo, NetworkAddress addr) {
+ networkAddress = addr;
name = testNodeName(testInfo, addr.port());
@@ -509,31 +650,39 @@ public class ItRebalanceDistributedTest {
clusterManagementConfiguration
);
+ String nodeName = clusterService.localConfiguration().getName();
+
metaStorageManager = new MetaStorageManagerImpl(
vaultManager,
clusterService,
cmgManager,
raftManager,
- new
SimpleInMemoryKeyValueStorage(clusterService.localConfiguration().getName())
+
testInfo.getTestMethod().get().isAnnotationPresent(UseRocksMetaStorage.class)
+ ? new RocksDbKeyValueStorage(nodeName,
resolveDir(dir, "metaStorage"))
+ : new SimpleInMemoryKeyValueStorage(nodeName)
);
cfgStorage = new
DistributedConfigurationStorage(metaStorageManager, vaultManager);
clusterCfgMgr = new ConfigurationManager(
- List.of(RocksDbStorageEngineConfiguration.KEY,
+ List.of(
+ RocksDbStorageEngineConfiguration.KEY,
VolatilePageMemoryStorageEngineConfiguration.KEY,
- TablesConfiguration.KEY),
+ TablesConfiguration.KEY
+ ),
Set.of(),
cfgStorage,
List.of(ExtendedTableConfigurationSchema.class),
- List.of(UnknownDataStorageConfigurationSchema.class,
+ List.of(
+ UnknownDataStorageConfigurationSchema.class,
VolatilePageMemoryDataStorageConfigurationSchema.class,
UnsafeMemoryAllocatorConfigurationSchema.class,
RocksDbDataStorageConfigurationSchema.class,
HashIndexConfigurationSchema.class,
ConstantValueDefaultConfigurationSchema.class,
FunctionCallDefaultConfigurationSchema.class,
- NullValueDefaultConfigurationSchema.class
+ NullValueDefaultConfigurationSchema.class,
+ TestDataStorageConfigurationSchema.class
)
);
@@ -545,7 +694,10 @@ public class ItRebalanceDistributedTest {
TablesConfiguration tablesCfg =
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY);
DataStorageModules dataStorageModules = new
DataStorageModules(List.of(
- new RocksDbDataStorageModule(), new
VolatilePageMemoryDataStorageModule()));
+ new RocksDbDataStorageModule(),
+ new VolatilePageMemoryDataStorageModule(),
+ new TestDataStorageModule()
+ ));
Path storagePath = dir.resolve("storage");
@@ -583,7 +735,33 @@ public class ItRebalanceDistributedTest {
view -> new LocalLogStorageFactory(),
new HybridClockImpl(),
new
OutgoingSnapshotsManager(clusterService.messagingService())
- );
+ ) {
+ @Override
+ protected TxStateTableStorage
createTxStateTableStorage(TableConfiguration tableCfg) {
+ return
testInfo.getTestMethod().get().isAnnotationPresent(UseTestTxStateStorage.class)
+ ? spy(new TestTxStateTableStorage())
+ : super.createTxStateTableStorage(tableCfg);
+ }
+
+ @Override
+ protected void handleChangeStableAssignmentEvent(WatchEvent
evt) {
+ TablePartitionId tablePartitionId =
getTablePartitionId(evt);
+
+ try {
+ super.handleChangeStableAssignmentEvent(evt);
+
+ if (tablePartitionId != null) {
+
finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId).complete(null);
+ }
+ } catch (Throwable t) {
+ if (tablePartitionId != null) {
+
finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId).completeExceptionally(t);
+ }
+
+ throw t;
+ }
+ }
+ };
}
/**
@@ -608,10 +786,13 @@ public class ItRebalanceDistributedTest {
nodeComponents.forEach(IgniteComponent::start);
- CompletableFuture.allOf(
-
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
-
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners()
- ).get();
+ assertThat(
+ CompletableFuture.allOf(
+
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
+
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners()
+ ),
+ willSucceedIn(1, TimeUnit.MINUTES)
+ );
// deploy watches to propagate data from the metastore into the
vault
metaStorageManager.deployWatches();
@@ -620,7 +801,7 @@ public class ItRebalanceDistributedTest {
/**
* Stops the created components.
*/
- void stop() throws Exception {
+ void stop() {
new ReverseIterator<>(nodeComponents).forEachRemaining(component
-> {
try {
component.beforeNodeStop();
@@ -642,20 +823,156 @@ public class ItRebalanceDistributedTest {
NetworkAddress address() {
return clusterService.topologyService().localMember().address();
}
+
+ @Nullable TablePartitionId getTablePartitionId(WatchEvent event) {
+ assertTrue(event.single(), event.toString());
+
+ Entry stableAssignmentsWatchEvent = event.entryEvent().newEntry();
+
+ if (stableAssignmentsWatchEvent.value() == null) {
+ return null;
+ }
+
+ int partitionId =
extractPartitionNumber(stableAssignmentsWatchEvent.key());
+ UUID tableId = extractTableId(stableAssignmentsWatchEvent.key(),
STABLE_ASSIGNMENTS_PREFIX);
+
+ return new TablePartitionId(tableId, partitionId);
+ }
+
+ TablePartitionId getTablePartitionId(String tableName, int
partitionId) {
+ InternalTable internalTable = getInternalTable(this, tableName);
+
+ return new TablePartitionId(internalTable.tableId(), partitionId);
+ }
}
/**
* Starts the Vault component.
*/
private static VaultManager createVault(Path workDir) {
- Path vaultPath = workDir.resolve(Paths.get("vault"));
+ return new VaultManager(new PersistentVaultService(resolveDir(workDir,
"vault")));
+ }
+
+ private static Path resolveDir(Path workDir, String dirName) {
+ Path newDirPath = workDir.resolve(dirName);
try {
- Files.createDirectories(vaultPath);
+ return Files.createDirectories(newDirPath);
} catch (IOException e) {
throw new IgniteInternalException(e);
}
+ }
+
+ private static TableDefinition createTableDefinition(String tableName) {
+ return SchemaBuilders.tableBuilder("PUBLIC", tableName).columns(
+ SchemaBuilders.column("key", ColumnType.INT64).build(),
+ SchemaBuilders.column("val",
ColumnType.INT32).asNullable(true).build()
+ ).withPrimaryKey("key").build();
+ }
+
+ private void createTableWithOnePartition(String tableName, int replicas,
boolean testDataStorage) {
+ assertThat(
+ nodes.get(0).tableManager.createTableAsync(
+ tableName,
+ tableChange -> {
+
SchemaConfigurationConverter.convert(createTableDefinition(tableName),
tableChange)
+ .changeReplicas(replicas)
+ .changePartitions(1);
+
+ if (testDataStorage) {
+
tableChange.changeDataStorage(dataStorageChange ->
dataStorageChange.convert(TestDataStorageChange.class));
+ }
+ }
+ ),
+ willCompleteSuccessfully()
+ );
+
+ assertEquals(replicas, getPartitionClusterNodes(0, 0).size());
+ assertEquals(replicas, getPartitionClusterNodes(1, 0).size());
+ assertEquals(replicas, getPartitionClusterNodes(2, 0).size());
+ }
+
+ private void changeTableReplicasForSinglePartition(String tableName, int
replicas) {
+ assertThat(
+ nodes.get(0).tableManager.alterTableAsync(tableName,
tableChange -> {
+ tableChange.changeReplicas(replicas);
+
+ return true;
+ }),
+ willCompleteSuccessfully()
+ );
+
+ waitPartitionAssignmentsSyncedToExpected(0, replicas);
+
+ assertEquals(replicas, getPartitionClusterNodes(0, 0).size());
+ assertEquals(replicas, getPartitionClusterNodes(1, 0).size());
+ assertEquals(replicas, getPartitionClusterNodes(2, 0).size());
+ }
+
+ private static Set<Assignment> getEvictedAssignments(Set<Assignment>
beforeChange, Set<Assignment> afterChange) {
+ Set<Assignment> result = new HashSet<>(beforeChange);
+
+ result.removeAll(afterChange);
+
+ return result;
+ }
+
+ private static @Nullable InternalTable getInternalTable(Node node, String
tableName) {
+ Table table = node.tableManager.table(tableName);
+
+ assertNotNull(table, tableName);
+
+ return ((TableImpl) table).internalTable();
+ }
+
+ private static void checkInvokeDestroyedPartitionStorages(Node node,
String tableName, int partitionId) {
+ InternalTable internalTable = getInternalTable(node, tableName);
+
+ verify(internalTable.storage(),
atLeast(1)).destroyPartition(partitionId);
+ verify(internalTable.txStateStorage(),
atLeast(1)).destroyTxStateStorage(partitionId);
+ }
+
+ private static void throwExceptionOnInvokeDestroyPartitionStorages(Node
node, String tableName, int partitionId) {
+ InternalTable internalTable = getInternalTable(node, tableName);
+
+ doAnswer(answer -> CompletableFuture.failedFuture(new
StorageException("From test")))
+ .when(internalTable.storage())
+ .destroyPartition(partitionId);
+
+ doAnswer(answer -> CompletableFuture.failedFuture(new
IgniteInternalException("From test")))
+ .when(internalTable.txStateStorage())
+ .destroyTxStateStorage(partitionId);
+ }
+
+ private void prepareFinishHandleChangeStableAssignmentEventFuture(Node
node, String tableName, int partitionId) {
+ TablePartitionId tablePartitionId = new
TablePartitionId(getInternalTable(node, tableName).tableId(), partitionId);
+
+
node.finishHandleChangeStableAssignmentEventFutures.put(tablePartitionId, new
CompletableFuture<>());
+ }
+
+ private CompletableFuture<?>
collectFinishHandleChangeStableAssignmentEventFuture(
+ @Nullable Predicate<Node> nodeFilter,
+ String tableName,
+ int partitionId
+ ) {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ for (Node node : nodes) {
+ if (nodeFilter != null && !nodeFilter.test(node)) {
+ continue;
+ }
+
+ TablePartitionId tablePartitionId = new
TablePartitionId(getInternalTable(node, tableName).tableId(), partitionId);
+
+ CompletableFuture<Void> future =
node.finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId);
+
+ assertNotNull(future, String.format("node=%s, table=%s,
partitionId=%s", node.name, tableName, partitionId));
+
+ futures.add(future);
+ }
+
+ assertThat(String.format("tableName=%s, partitionId=%s", tableName,
partitionId), futures, not(empty()));
- return new VaultManager(new PersistentVaultService(vaultPath));
+ return
CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new));
}
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
index 268ff8d496..8ec6af73d1 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.storage.impl;
+import static org.mockito.Mockito.spy;
+
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
@@ -55,6 +57,6 @@ public class TestStorageEngine implements StorageEngine {
assert dataStorageName.equals(ENGINE_NAME) : dataStorageName;
- return new TestMvTableStorage(tableCfg, tablesCfg);
+ return spy(new TestMvTableStorage(tableCfg, tablesCfg));
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index c6f3df376c..e3cd05a9d2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1220,6 +1220,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
assert table != null : IgniteStringFormatter.format("There is no
table with the name specified [name={}, id={}]",
name, tblId);
+ // TODO: IGNITE-18703 Destroy raft log and meta
+
CompletableFuture<Void> destroyTableStoragesFuture = allOf(
table.internalTable().storage().destroy(),
runAsync(() ->
table.internalTable().txStateStorage().destroy(), ioExecutor)
@@ -1957,54 +1959,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
new WatchListener() {
@Override
public void onUpdate(WatchEvent evt) {
- inBusyLock(busyLock, () -> {
- assert evt.single() : evt;
-
- Entry stableAssignmentsWatchEvent =
evt.entryEvent().newEntry();
-
- if (stableAssignmentsWatchEvent.value() == null) {
- return;
- }
-
- int partitionId =
extractPartitionNumber(stableAssignmentsWatchEvent.key());
- UUID tableId =
extractTableId(stableAssignmentsWatchEvent.key(), STABLE_ASSIGNMENTS_PREFIX);
-
- TablePartitionId tablePartitionId = new
TablePartitionId(tableId, partitionId);
-
- Set<Assignment> stableAssignments =
ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
-
- byte[] pendingAssignmentsFromMetaStorage =
metaStorageMgr.get(
- pendingPartAssignmentsKey(tablePartitionId),
- stableAssignmentsWatchEvent.revision()
- ).join().value();
-
- Set<Assignment> pendingAssignments =
pendingAssignmentsFromMetaStorage == null
- ? Set.of()
- :
ByteUtils.fromBytes(pendingAssignmentsFromMetaStorage);
-
- String localMemberName =
clusterService.topologyService().localMember().name();
-
- boolean shouldStopLocalServices =
Stream.concat(stableAssignments.stream(), pendingAssignments.stream())
- .noneMatch(assignment ->
assignment.consistentId().equals(localMemberName));
-
- if (shouldStopLocalServices) {
- try {
- raftMgr.stopRaftNodes(tablePartitionId);
-
- replicaMgr.stopReplica(tablePartitionId);
- } catch (NodeStoppingException e) {
- // no-op
- }
-
- InternalTable internalTable =
tablesByIdVv.latest().get(tableId).internalTable();
-
- // Should be done fairly quickly.
- allOf(
-
internalTable.storage().destroyPartition(partitionId),
- runAsync(() ->
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
- ).join();
- }
- });
+ handleChangeStableAssignmentEvent(evt);
}
@Override
@@ -2102,4 +2057,62 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}, ioExecutor)
.thenCompose(Function.identity());
}
+
+ /**
+ * Handles the {@link RebalanceUtil#STABLE_ASSIGNMENTS_PREFIX} update
event.
+ *
+ * @param evt Watch event for a key under the stable assignments prefix; it must contain exactly one entry update.
+ */
+ protected void handleChangeStableAssignmentEvent(WatchEvent evt) {
+ inBusyLock(busyLock, () -> {
+ assert evt.single() : evt;
+
+ Entry stableAssignmentsWatchEvent = evt.entryEvent().newEntry();
+
+ if (stableAssignmentsWatchEvent.value() == null) {
+ return;
+ }
+
+ int partitionId =
extractPartitionNumber(stableAssignmentsWatchEvent.key());
+ UUID tableId = extractTableId(stableAssignmentsWatchEvent.key(),
STABLE_ASSIGNMENTS_PREFIX);
+
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partitionId);
+
+ Set<Assignment> stableAssignments =
ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
+
+ byte[] pendingAssignmentsFromMetaStorage = metaStorageMgr.get(
+ pendingPartAssignmentsKey(tablePartitionId),
+ stableAssignmentsWatchEvent.revision()
+ ).join().value();
+
+ Set<Assignment> pendingAssignments =
pendingAssignmentsFromMetaStorage == null
+ ? Set.of()
+ : ByteUtils.fromBytes(pendingAssignmentsFromMetaStorage);
+
+ String localMemberName =
clusterService.topologyService().localMember().name();
+
+ boolean shouldStopLocalServices =
Stream.concat(stableAssignments.stream(), pendingAssignments.stream())
+ .noneMatch(assignment ->
assignment.consistentId().equals(localMemberName));
+
+ if (shouldStopLocalServices) {
+ try {
+ raftMgr.stopRaftNodes(tablePartitionId);
+
+ replicaMgr.stopReplica(tablePartitionId);
+ } catch (NodeStoppingException e) {
+ // no-op
+ }
+
+ InternalTable internalTable =
tablesByIdVv.latest().get(tableId).internalTable();
+
+ // TODO: IGNITE-18703 Destroy raft log and meta
+
+ // Waits for both destroy operations to finish; this should be done fairly quickly.
+ allOf(
+ internalTable.storage().destroyPartition(partitionId),
+ runAsync(() ->
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
+ ).join();
+ }
+ });
+ }
}