This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9663f83788 IGNITE-18160 Delete physical data after a Raft Group is 
stopped (#1615)
9663f83788 is described below

commit 9663f837880bbb6664c030e6b60a220509897196
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Feb 2 17:59:23 2023 +0300

    IGNITE-18160 Delete physical data after a Raft Group is stopped (#1615)
---
 modules/runner/build.gradle                        |   2 +
 .../storage/ItRebalanceDistributedTest.java        | 351 ++++++++++++++++++++-
 .../internal/storage/impl/TestStorageEngine.java   |   4 +-
 .../internal/table/distributed/TableManager.java   | 109 ++++---
 4 files changed, 400 insertions(+), 66 deletions(-)

diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 99eacd3467..a25bd533fd 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -122,6 +122,8 @@ dependencies {
     integrationTestImplementation(testFixtures(project(':ignite-network')))
     integrationTestImplementation(testFixtures(project(':ignite-vault')))
     integrationTestImplementation(testFixtures(project(':ignite-table')))
+    integrationTestImplementation(testFixtures(project(':ignite-storage-api')))
+    
integrationTestImplementation(testFixtures(project(':ignite-transactions')))
     integrationTestImplementation libs.jetbrains.annotations
     integrationTestImplementation libs.rocksdb.jni
     integrationTestImplementation libs.disruptor
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 6392a3fea2..dab0fc34ae 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -19,21 +19,47 @@ package org.apache.ignite.internal.configuration.storage;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.apache.ignite.internal.util.CollectionUtils.first;
+import static 
org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+import static 
org.apache.ignite.internal.utils.RebalanceUtil.extractPartitionNumber;
+import static org.apache.ignite.internal.utils.RebalanceUtil.extractTableId;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.LockSupport;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import 
org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration;
@@ -51,9 +77,12 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import 
org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
 import org.apache.ignite.internal.raft.Loza;
@@ -68,6 +97,7 @@ import 
org.apache.ignite.internal.rest.configuration.RestConfiguration;
 import org.apache.ignite.internal.schema.SchemaManager;
 import 
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
 import 
org.apache.ignite.internal.schema.configuration.ExtendedTableConfigurationSchema;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import 
org.apache.ignite.internal.schema.configuration.defaultvalue.ConstantValueDefaultConfigurationSchema;
 import 
org.apache.ignite.internal.schema.configuration.defaultvalue.FunctionCallDefaultConfigurationSchema;
@@ -80,16 +110,22 @@ import 
org.apache.ignite.internal.schema.testutils.definition.ColumnType;
 import org.apache.ignite.internal.schema.testutils.definition.TableDefinition;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.DataStorageModules;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.impl.TestDataStorageModule;
+import org.apache.ignite.internal.storage.impl.schema.TestDataStorageChange;
+import 
org.apache.ignite.internal.storage.impl.schema.TestDataStorageConfigurationSchema;
 import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryDataStorageModule;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryDataStorageConfigurationSchema;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbDataStorageModule;
 import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
 import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.LockManager;
@@ -97,6 +133,8 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.ReverseIterator;
 import org.apache.ignite.internal.vault.VaultManager;
@@ -106,7 +144,9 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.StaticNodeFinder;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests;
+import org.apache.ignite.table.Table;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -123,6 +163,8 @@ public class ItRebalanceDistributedTest {
     /** Ignite logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(ItRebalanceDistributedTest.class);
 
+    private static final String TABLE_1_NAME = "TBL1";
+
     public static final int BASE_PORT = 20_000;
 
     public static final String HOST = "localhost";
@@ -133,6 +175,16 @@ public class ItRebalanceDistributedTest {
     @InjectConfiguration
     private static ClusterManagementConfiguration 
clusterManagementConfiguration;
 
+    @Target(ElementType.METHOD)
+    @Retention(RetentionPolicy.RUNTIME)
+    private @interface UseTestTxStateStorage {
+    }
+
+    @Target(ElementType.METHOD)
+    @Retention(RetentionPolicy.RUNTIME)
+    private @interface UseRocksMetaStorage {
+    }
+
     @WorkDirectory
     private Path workDir;
 
@@ -164,7 +216,7 @@ public class ItRebalanceDistributedTest {
     }
 
     @AfterEach
-    void after() throws Exception {
+    void after() {
         for (Node node : nodes) {
             node.stop();
         }
@@ -391,6 +443,85 @@ public class ItRebalanceDistributedTest {
         assertEquals(3, getPartitionClusterNodes(2, 0).size());
     }
 
+    @Test
+    @UseTestTxStateStorage
+    void testDestroyPartitionStoragesOnEvictNode() {
+        createTableWithOnePartition(TABLE_1_NAME, 3, true);
+
+        Set<Assignment> assignmentsBeforeChangeReplicas = 
getPartitionClusterNodes(0, 0);
+
+        nodes.forEach(node -> 
prepareFinishHandleChangeStableAssignmentEventFuture(node, TABLE_1_NAME, 0));
+
+        changeTableReplicasForSinglePartition(TABLE_1_NAME, 2);
+
+        Set<Assignment> assignmentsAfterChangeReplicas = 
getPartitionClusterNodes(0, 0);
+
+        Set<Assignment> evictedAssignments = 
getEvictedAssignments(assignmentsBeforeChangeReplicas, 
assignmentsAfterChangeReplicas);
+
+        assertThat(
+                String.format("before=%s, after=%s", 
assignmentsBeforeChangeReplicas, assignmentsAfterChangeReplicas),
+                evictedAssignments,
+                hasSize(1)
+        );
+
+        assertThat(collectFinishHandleChangeStableAssignmentEventFuture(null, 
TABLE_1_NAME, 0), willCompleteSuccessfully());
+
+        Node evictedNode = 
findNodeByConsistentId(first(evictedAssignments).consistentId());
+
+        assertNotNull(evictedNode, evictedAssignments.toString());
+
+        checkInvokeDestroyedPartitionStorages(evictedNode, TABLE_1_NAME, 0);
+    }
+
+    @Test
+    @UseTestTxStateStorage
+    @UseRocksMetaStorage
+    void testDestroyPartitionStoragesOnRestartEvictedNode(TestInfo testInfo) 
throws Exception {
+        createTableWithOnePartition(TABLE_1_NAME, 3, true);
+
+        Set<Assignment> assignmentsBeforeChangeReplicas = 
getPartitionClusterNodes(0, 0);
+
+        nodes.forEach(node -> {
+            prepareFinishHandleChangeStableAssignmentEventFuture(node, 
TABLE_1_NAME, 0);
+
+            throwExceptionOnInvokeDestroyPartitionStorages(node, TABLE_1_NAME, 
0);
+        });
+
+        changeTableReplicasForSinglePartition(TABLE_1_NAME, 2);
+
+        Assignment evictedAssignment = 
first(getEvictedAssignments(assignmentsBeforeChangeReplicas, 
getPartitionClusterNodes(0, 0)));
+
+        Node evictedNode = 
findNodeByConsistentId(evictedAssignment.consistentId());
+
+        // Let's make sure that we handled the events 
(STABLE_ASSIGNMENTS_PREFIX) from the metastore correctly.
+        assertThat(
+                collectFinishHandleChangeStableAssignmentEventFuture(node -> 
!node.equals(evictedNode), TABLE_1_NAME, 0),
+                willCompleteSuccessfully()
+        );
+
+        TablePartitionId tablePartitionId = 
evictedNode.getTablePartitionId(TABLE_1_NAME, 0);
+
+        
assertThat(evictedNode.finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId),
 willFailFast(Exception.class));
+
+        // Restart evicted node.
+        int evictedNodeIndex = 
findNodeIndexByConsistentId(evictedAssignment.consistentId());
+
+        evictedNode.stop();
+
+        Node newNode = new Node(testInfo, evictedNode.networkAddress);
+
+        
newNode.finishHandleChangeStableAssignmentEventFutures.put(tablePartitionId, 
new CompletableFuture<>());
+
+        newNode.start();
+
+        nodes.set(evictedNodeIndex, newNode);
+
+        // Let's make sure that we will destroy the partition again.
+        
assertThat(newNode.finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId),
 willSucceedIn(1, TimeUnit.MINUTES));
+
+        checkInvokeDestroyedPartitionStorages(newNode, TABLE_1_NAME, 0);
+    }
+
     private void waitPartitionAssignmentsSyncedToExpected(int partNum, int 
replicasNum) {
         while (!IntStream.range(0, nodes.size()).allMatch(n -> 
getPartitionClusterNodes(n, partNum).size() == replicasNum)) {
             LockSupport.parkNanos(100_000_000);
@@ -401,6 +532,10 @@ public class ItRebalanceDistributedTest {
         return nodes.stream().filter(n -> 
n.name.equals(consistentId)).findFirst().orElseThrow();
     }
 
+    private int findNodeIndexByConsistentId(String consistentId) {
+        return IntStream.range(0, nodes.size()).filter(i -> 
nodes.get(i).name.equals(consistentId)).findFirst().orElseThrow();
+    }
+
     private Set<Assignment> getPartitionClusterNodes(int nodeNum, int partNum) 
{
         var table = ((ExtendedTableConfiguration) 
nodes.get(nodeNum).clusterCfgMgr.configurationRegistry()
                 
.getConfiguration(TablesConfiguration.KEY).tables().get("TBL1"));
@@ -451,10 +586,16 @@ public class ItRebalanceDistributedTest {
 
         private List<IgniteComponent> nodeComponents;
 
+        private final Map<TablePartitionId, CompletableFuture<Void>> 
finishHandleChangeStableAssignmentEventFutures
+                = new ConcurrentHashMap<>();
+
+        private final NetworkAddress networkAddress;
+
         /**
          * Constructor that simply creates a subset of components of this node.
          */
         Node(TestInfo testInfo, NetworkAddress addr) {
+            networkAddress = addr;
 
             name = testNodeName(testInfo, addr.port());
 
@@ -509,31 +650,39 @@ public class ItRebalanceDistributedTest {
                     clusterManagementConfiguration
             );
 
+            String nodeName = clusterService.localConfiguration().getName();
+
             metaStorageManager = new MetaStorageManagerImpl(
                     vaultManager,
                     clusterService,
                     cmgManager,
                     raftManager,
-                    new 
SimpleInMemoryKeyValueStorage(clusterService.localConfiguration().getName())
+                    
testInfo.getTestMethod().get().isAnnotationPresent(UseRocksMetaStorage.class)
+                            ? new RocksDbKeyValueStorage(nodeName, 
resolveDir(dir, "metaStorage"))
+                            : new SimpleInMemoryKeyValueStorage(nodeName)
             );
 
             cfgStorage = new 
DistributedConfigurationStorage(metaStorageManager, vaultManager);
 
             clusterCfgMgr = new ConfigurationManager(
-                    List.of(RocksDbStorageEngineConfiguration.KEY,
+                    List.of(
+                            RocksDbStorageEngineConfiguration.KEY,
                             VolatilePageMemoryStorageEngineConfiguration.KEY,
-                            TablesConfiguration.KEY),
+                            TablesConfiguration.KEY
+                    ),
                     Set.of(),
                     cfgStorage,
                     List.of(ExtendedTableConfigurationSchema.class),
-                    List.of(UnknownDataStorageConfigurationSchema.class,
+                    List.of(
+                            UnknownDataStorageConfigurationSchema.class,
                             
VolatilePageMemoryDataStorageConfigurationSchema.class,
                             UnsafeMemoryAllocatorConfigurationSchema.class,
                             RocksDbDataStorageConfigurationSchema.class,
                             HashIndexConfigurationSchema.class,
                             ConstantValueDefaultConfigurationSchema.class,
                             FunctionCallDefaultConfigurationSchema.class,
-                            NullValueDefaultConfigurationSchema.class
+                            NullValueDefaultConfigurationSchema.class,
+                            TestDataStorageConfigurationSchema.class
                     )
             );
 
@@ -545,7 +694,10 @@ public class ItRebalanceDistributedTest {
             TablesConfiguration tablesCfg = 
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY);
 
             DataStorageModules dataStorageModules = new 
DataStorageModules(List.of(
-                    new RocksDbDataStorageModule(), new 
VolatilePageMemoryDataStorageModule()));
+                    new RocksDbDataStorageModule(),
+                    new VolatilePageMemoryDataStorageModule(),
+                    new TestDataStorageModule()
+            ));
 
             Path storagePath = dir.resolve("storage");
 
@@ -583,7 +735,33 @@ public class ItRebalanceDistributedTest {
                     view -> new LocalLogStorageFactory(),
                     new HybridClockImpl(),
                     new 
OutgoingSnapshotsManager(clusterService.messagingService())
-            );
+            ) {
+                @Override
+                protected TxStateTableStorage 
createTxStateTableStorage(TableConfiguration tableCfg) {
+                    return 
testInfo.getTestMethod().get().isAnnotationPresent(UseTestTxStateStorage.class)
+                            ? spy(new TestTxStateTableStorage())
+                            : super.createTxStateTableStorage(tableCfg);
+                }
+
+                @Override
+                protected void handleChangeStableAssignmentEvent(WatchEvent 
evt) {
+                    TablePartitionId tablePartitionId = 
getTablePartitionId(evt);
+
+                    try {
+                        super.handleChangeStableAssignmentEvent(evt);
+
+                        if (tablePartitionId != null) {
+                            
finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId).complete(null);
+                        }
+                    } catch (Throwable t) {
+                        if (tablePartitionId != null) {
+                            
finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId).completeExceptionally(t);
+                        }
+
+                        throw t;
+                    }
+                }
+            };
         }
 
         /**
@@ -608,10 +786,13 @@ public class ItRebalanceDistributedTest {
 
             nodeComponents.forEach(IgniteComponent::start);
 
-            CompletableFuture.allOf(
-                    
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
-                    
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners()
-            ).get();
+            assertThat(
+                    CompletableFuture.allOf(
+                            
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
+                            
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners()
+                    ),
+                    willSucceedIn(1, TimeUnit.MINUTES)
+            );
 
             // deploy watches to propagate data from the metastore into the 
vault
             metaStorageManager.deployWatches();
@@ -620,7 +801,7 @@ public class ItRebalanceDistributedTest {
         /**
          * Stops the created components.
          */
-        void stop() throws Exception {
+        void stop() {
             new ReverseIterator<>(nodeComponents).forEachRemaining(component 
-> {
                 try {
                     component.beforeNodeStop();
@@ -642,20 +823,156 @@ public class ItRebalanceDistributedTest {
         NetworkAddress address() {
             return clusterService.topologyService().localMember().address();
         }
+
+        @Nullable TablePartitionId getTablePartitionId(WatchEvent event) {
+            assertTrue(event.single(), event.toString());
+
+            Entry stableAssignmentsWatchEvent = event.entryEvent().newEntry();
+
+            if (stableAssignmentsWatchEvent.value() == null) {
+                return null;
+            }
+
+            int partitionId = 
extractPartitionNumber(stableAssignmentsWatchEvent.key());
+            UUID tableId = extractTableId(stableAssignmentsWatchEvent.key(), 
STABLE_ASSIGNMENTS_PREFIX);
+
+            return new TablePartitionId(tableId, partitionId);
+        }
+
+        TablePartitionId getTablePartitionId(String tableName, int 
partitionId) {
+            InternalTable internalTable = getInternalTable(this, tableName);
+
+            return new TablePartitionId(internalTable.tableId(), partitionId);
+        }
     }
 
     /**
      * Starts the Vault component.
      */
     private static VaultManager createVault(Path workDir) {
-        Path vaultPath = workDir.resolve(Paths.get("vault"));
+        return new VaultManager(new PersistentVaultService(resolveDir(workDir, 
"vault")));
+    }
+
+    private static Path resolveDir(Path workDir, String dirName) {
+        Path newDirPath = workDir.resolve(dirName);
 
         try {
-            Files.createDirectories(vaultPath);
+            return Files.createDirectories(newDirPath);
         } catch (IOException e) {
             throw new IgniteInternalException(e);
         }
+    }
+
+    private static TableDefinition createTableDefinition(String tableName) {
+        return SchemaBuilders.tableBuilder("PUBLIC", tableName).columns(
+                SchemaBuilders.column("key", ColumnType.INT64).build(),
+                SchemaBuilders.column("val", 
ColumnType.INT32).asNullable(true).build()
+        ).withPrimaryKey("key").build();
+    }
+
+    private void createTableWithOnePartition(String tableName, int replicas, 
boolean testDataStorage) {
+        assertThat(
+                nodes.get(0).tableManager.createTableAsync(
+                        tableName,
+                        tableChange -> {
+                            
SchemaConfigurationConverter.convert(createTableDefinition(tableName), 
tableChange)
+                                    .changeReplicas(replicas)
+                                    .changePartitions(1);
+
+                            if (testDataStorage) {
+                                
tableChange.changeDataStorage(dataStorageChange -> 
dataStorageChange.convert(TestDataStorageChange.class));
+                            }
+                        }
+                ),
+                willCompleteSuccessfully()
+        );
+
+        assertEquals(replicas, getPartitionClusterNodes(0, 0).size());
+        assertEquals(replicas, getPartitionClusterNodes(1, 0).size());
+        assertEquals(replicas, getPartitionClusterNodes(2, 0).size());
+    }
+
+    private void changeTableReplicasForSinglePartition(String tableName, int 
replicas) {
+        assertThat(
+                nodes.get(0).tableManager.alterTableAsync(tableName, 
tableChange -> {
+                    tableChange.changeReplicas(replicas);
+
+                    return true;
+                }),
+                willCompleteSuccessfully()
+        );
+
+        waitPartitionAssignmentsSyncedToExpected(0, replicas);
+
+        assertEquals(replicas, getPartitionClusterNodes(0, 0).size());
+        assertEquals(replicas, getPartitionClusterNodes(1, 0).size());
+        assertEquals(replicas, getPartitionClusterNodes(2, 0).size());
+    }
+
+    private static Set<Assignment> getEvictedAssignments(Set<Assignment> 
beforeChange, Set<Assignment> afterChange) {
+        Set<Assignment> result = new HashSet<>(beforeChange);
+
+        result.removeAll(afterChange);
+
+        return result;
+    }
+
+    private static @Nullable InternalTable getInternalTable(Node node, String 
tableName) {
+        Table table = node.tableManager.table(tableName);
+
+        assertNotNull(table, tableName);
+
+        return ((TableImpl) table).internalTable();
+    }
+
+    private static void checkInvokeDestroyedPartitionStorages(Node node, 
String tableName, int partitionId) {
+        InternalTable internalTable = getInternalTable(node, tableName);
+
+        verify(internalTable.storage(), 
atLeast(1)).destroyPartition(partitionId);
+        verify(internalTable.txStateStorage(), 
atLeast(1)).destroyTxStateStorage(partitionId);
+    }
+
+    private static void throwExceptionOnInvokeDestroyPartitionStorages(Node 
node, String tableName, int partitionId) {
+        InternalTable internalTable = getInternalTable(node, tableName);
+
+        doAnswer(answer -> CompletableFuture.failedFuture(new 
StorageException("From test")))
+                .when(internalTable.storage())
+                .destroyPartition(partitionId);
+
+        doAnswer(answer -> CompletableFuture.failedFuture(new 
IgniteInternalException("From test")))
+                .when(internalTable.txStateStorage())
+                .destroyTxStateStorage(partitionId);
+    }
+
+    private void prepareFinishHandleChangeStableAssignmentEventFuture(Node 
node, String tableName, int partitionId) {
+        TablePartitionId tablePartitionId = new 
TablePartitionId(getInternalTable(node, tableName).tableId(), partitionId);
+
+        
node.finishHandleChangeStableAssignmentEventFutures.put(tablePartitionId, new 
CompletableFuture<>());
+    }
+
+    private CompletableFuture<?> 
collectFinishHandleChangeStableAssignmentEventFuture(
+            @Nullable Predicate<Node> nodeFilter,
+            String tableName,
+            int partitionId
+    ) {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        for (Node node : nodes) {
+            if (nodeFilter != null && !nodeFilter.test(node)) {
+                continue;
+            }
+
+            TablePartitionId tablePartitionId = new 
TablePartitionId(getInternalTable(node, tableName).tableId(), partitionId);
+
+            CompletableFuture<Void> future = 
node.finishHandleChangeStableAssignmentEventFutures.get(tablePartitionId);
+
+            assertNotNull(future, String.format("node=%s, table=%s, 
partitionId=%s", node.name, tableName, partitionId));
+
+            futures.add(future);
+        }
+
+        assertThat(String.format("tableName=%s, partitionId=%s", tableName, 
partitionId), futures, not(empty()));
 
-        return new VaultManager(new PersistentVaultService(vaultPath));
+        return 
CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new));
     }
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
index 268ff8d496..8ec6af73d1 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.storage.impl;
 
+import static org.mockito.Mockito.spy;
+
 import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
@@ -55,6 +57,6 @@ public class TestStorageEngine implements StorageEngine {
 
         assert dataStorageName.equals(ENGINE_NAME) : dataStorageName;
 
-        return new TestMvTableStorage(tableCfg, tablesCfg);
+        return spy(new TestMvTableStorage(tableCfg, tablesCfg));
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index c6f3df376c..e3cd05a9d2 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1220,6 +1220,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             assert table != null : IgniteStringFormatter.format("There is no 
table with the name specified [name={}, id={}]",
                     name, tblId);
 
+            // TODO: IGNITE-18703 Destroy raft log and meta
+
             CompletableFuture<Void> destroyTableStoragesFuture = allOf(
                     table.internalTable().storage().destroy(),
                     runAsync(() -> 
table.internalTable().txStateStorage().destroy(), ioExecutor)
@@ -1957,54 +1959,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 new WatchListener() {
             @Override
             public void onUpdate(WatchEvent evt) {
-                inBusyLock(busyLock, () -> {
-                    assert evt.single() : evt;
-
-                    Entry stableAssignmentsWatchEvent = 
evt.entryEvent().newEntry();
-
-                    if (stableAssignmentsWatchEvent.value() == null) {
-                        return;
-                    }
-
-                    int partitionId = 
extractPartitionNumber(stableAssignmentsWatchEvent.key());
-                    UUID tableId = 
extractTableId(stableAssignmentsWatchEvent.key(), STABLE_ASSIGNMENTS_PREFIX);
-
-                    TablePartitionId tablePartitionId = new 
TablePartitionId(tableId, partitionId);
-
-                    Set<Assignment> stableAssignments = 
ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
-
-                    byte[] pendingAssignmentsFromMetaStorage = 
metaStorageMgr.get(
-                            pendingPartAssignmentsKey(tablePartitionId),
-                            stableAssignmentsWatchEvent.revision()
-                    ).join().value();
-
-                    Set<Assignment> pendingAssignments = 
pendingAssignmentsFromMetaStorage == null
-                            ? Set.of()
-                            : 
ByteUtils.fromBytes(pendingAssignmentsFromMetaStorage);
-
-                    String localMemberName = 
clusterService.topologyService().localMember().name();
-
-                    boolean shouldStopLocalServices = 
Stream.concat(stableAssignments.stream(), pendingAssignments.stream())
-                            .noneMatch(assignment -> 
assignment.consistentId().equals(localMemberName));
-
-                    if (shouldStopLocalServices) {
-                        try {
-                            raftMgr.stopRaftNodes(tablePartitionId);
-
-                            replicaMgr.stopReplica(tablePartitionId);
-                        } catch (NodeStoppingException e) {
-                            // no-op
-                        }
-
-                        InternalTable internalTable = 
tablesByIdVv.latest().get(tableId).internalTable();
-
-                        // Should be done fairly quickly.
-                        allOf(
-                                
internalTable.storage().destroyPartition(partitionId),
-                                runAsync(() -> 
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
-                        ).join();
-                    }
-                });
+                handleChangeStableAssignmentEvent(evt);
             }
 
             @Override
@@ -2102,4 +2057,62 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 }, ioExecutor)
                 .thenCompose(Function.identity());
     }
+
+    /**
+     * Handles the {@link RebalanceUtil#STABLE_ASSIGNMENTS_PREFIX} update 
event.
+     *
+     * @param evt Event.
+     */
+    protected void handleChangeStableAssignmentEvent(WatchEvent evt) {
+        inBusyLock(busyLock, () -> {
+            assert evt.single() : evt;
+
+            Entry stableAssignmentsWatchEvent = evt.entryEvent().newEntry();
+
+            if (stableAssignmentsWatchEvent.value() == null) {
+                return;
+            }
+
+            int partitionId = 
extractPartitionNumber(stableAssignmentsWatchEvent.key());
+            UUID tableId = extractTableId(stableAssignmentsWatchEvent.key(), 
STABLE_ASSIGNMENTS_PREFIX);
+
+            TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partitionId);
+
+            Set<Assignment> stableAssignments = 
ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
+
+            byte[] pendingAssignmentsFromMetaStorage = metaStorageMgr.get(
+                    pendingPartAssignmentsKey(tablePartitionId),
+                    stableAssignmentsWatchEvent.revision()
+            ).join().value();
+
+            Set<Assignment> pendingAssignments = 
pendingAssignmentsFromMetaStorage == null
+                    ? Set.of()
+                    : ByteUtils.fromBytes(pendingAssignmentsFromMetaStorage);
+
+            String localMemberName = 
clusterService.topologyService().localMember().name();
+
+            boolean shouldStopLocalServices = 
Stream.concat(stableAssignments.stream(), pendingAssignments.stream())
+                    .noneMatch(assignment -> 
assignment.consistentId().equals(localMemberName));
+
+            if (shouldStopLocalServices) {
+                try {
+                    raftMgr.stopRaftNodes(tablePartitionId);
+
+                    replicaMgr.stopReplica(tablePartitionId);
+                } catch (NodeStoppingException e) {
+                    // no-op
+                }
+
+                InternalTable internalTable = 
tablesByIdVv.latest().get(tableId).internalTable();
+
+                // TODO: IGNITE-18703 Destroy raft log and meta
+
+                // Should be done fairly quickly.
+                allOf(
+                        internalTable.storage().destroyPartition(partitionId),
+                        runAsync(() -> 
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
+                ).join();
+            }
+        });
+    }
 }


Reply via email to