This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 95d87679e1 IGNITE-19363 Split start of indexes and start of partition raft group nodes (#2177)
95d87679e1 is described below

commit 95d87679e12dc9d8d1ad25f33d25a3029a45613a
Author: Semyon Danilov <[email protected]>
AuthorDate: Fri Jun 16 14:10:22 2023 +0400

    IGNITE-19363 Split start of indexes and start of partition raft group nodes (#2177)
---
 .../apache/ignite/internal/index/IndexManager.java |  16 +-
 .../ignite/internal/index/IndexManagerTest.java    |  15 +-
 .../storage/snapshot/SnapshotExecutorImpl.java     |   8 +-
 .../ignite/internal/rebalance/ItRebalanceTest.java |   2 +
 .../apache/ignite/internal/table/IndexWrapper.java | 127 +++++
 .../apache/ignite/internal/table/TableImpl.java    |  92 +---
 .../table/distributed/BitSetPartitionSet.java      |  86 +++
 .../internal/table/distributed/PartitionSet.java   | 138 +++++
 .../internal/table/distributed/TableManager.java   | 592 ++++++++++++---------
 .../distributed/AbstractPartitionSetTest.java      | 117 ++++
 .../table/distributed/BitSetPartitionSetTest.java  |  26 +
 .../table/distributed/TableManagerTest.java        |  42 +-
 12 files changed, 908 insertions(+), 353 deletions(-)

diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 390432bb44..8cbe2eca88 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -66,6 +66,7 @@ import 
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
 import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
 import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -465,11 +466,16 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
         CompletableFuture<?> fireEventFuture =
                 fireEvent(IndexEvent.CREATE, new 
IndexEventParameters(causalityToken, tableId, indexId, eventIndexDescriptor));
 
-        CompletableFuture<TableImpl> tableFuture = 
tableManager.tableAsync(causalityToken, tableId);
+        TableImpl table = tableManager.getTable(tableId);
+
+        assert table != null : tableId;
 
         CompletableFuture<SchemaRegistry> schemaRegistryFuture = 
schemaManager.schemaRegistry(causalityToken, tableId);
 
-        CompletableFuture<?> createIndexFuture = 
tableFuture.thenAcceptBoth(schemaRegistryFuture, (table, schemaRegistry) -> {
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19712 Listen to 
assignment changes and start new index storages.
+        CompletableFuture<PartitionSet> tablePartitionFuture = 
tableManager.localPartitionSetAsync(causalityToken, tableId);
+
+        CompletableFuture<?> createIndexFuture = 
tablePartitionFuture.thenAcceptBoth(schemaRegistryFuture, (partitions, 
schemaRegistry) -> {
             TableRowToIndexKeyConverter tableRowConverter = new 
TableRowToIndexKeyConverter(
                     schemaRegistry,
                     eventIndexDescriptor.columns().toArray(STRING_EMPTY_ARRAY)
@@ -478,7 +484,8 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
             if (eventIndexDescriptor instanceof SortedIndexDescriptor) {
                 table.registerSortedIndex(
                         (StorageSortedIndexDescriptor) storageIndexDescriptor,
-                        tableRowConverter::convert
+                        tableRowConverter::convert,
+                        partitions
                 );
             } else {
                 boolean unique = indexDescriptor.unique();
@@ -486,7 +493,8 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
                 table.registerHashIndex(
                         (StorageHashIndexDescriptor) storageIndexDescriptor,
                         unique,
-                        tableRowConverter::convert
+                        tableRowConverter::convert,
+                        partitions
                 );
 
                 if (unique) {
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 4c75272ba3..8230656614 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -50,6 +51,7 @@ import 
org.apache.ignite.internal.schema.configuration.index.SortedIndexChange;
 import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -58,7 +60,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mockito;
 
 /**
  * Test class to verify {@link IndexManager}.
@@ -84,11 +85,21 @@ public class IndexManagerTest {
         when(tableManagerMock.tableAsync(anyLong(), anyInt())).thenAnswer(inv 
-> {
             InternalTable tbl = mock(InternalTable.class);
 
-            Mockito.doReturn(inv.getArgument(1)).when(tbl).tableId();
+            doReturn(inv.getArgument(1)).when(tbl).tableId();
 
             return completedFuture(new TableImpl(tbl, new HeapLockManager()));
         });
 
+        when(tableManagerMock.getTable(anyInt())).thenAnswer(inv -> {
+            InternalTable tbl = mock(InternalTable.class);
+
+            doReturn(inv.getArgument(0)).when(tbl).tableId();
+
+            return new TableImpl(tbl, new HeapLockManager());
+        });
+
+        when(tableManagerMock.localPartitionSetAsync(anyLong(), 
anyInt())).thenReturn(completedFuture(PartitionSet.EMPTY_SET));
+
         SchemaManager schManager = mock(SchemaManager.class);
 
         when(schManager.schemaRegistry(anyLong(), 
anyInt())).thenReturn(completedFuture(null));
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
index f005b3199a..30d966add3 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
@@ -250,13 +250,7 @@ public class SnapshotExecutorImpl implements 
SnapshotExecutor {
         final FirstSnapshotLoadDone done = new FirstSnapshotLoadDone(reader);
         Requires.requireTrue(this.fsmCaller.onSnapshotLoad(done));
         try {
-            // TODO: IGNITE-19363 We want to avoid deadlock for now, but this 
is an ad-hoc decision.
-            // We don't wait for the partition's snapshot load closure to 
finish here.
-            if (!node.getNodeId().getGroupId().contains("part")) {
-                done.waitForRun();
-            } else {
-                done.status = Status.OK();
-            }
+            done.waitForRun();
         }
         catch (final InterruptedException e) {
             LOG.warn("Wait for FirstSnapshotLoadDone run is interrupted.");
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index 5ae16bd940..963a800946 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.table.Tuple;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 
@@ -83,6 +84,7 @@ public class ItRebalanceTest extends IgniteIntegrationTest {
      *
      * @throws Exception If failed.
      */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19712")
     @Test
     void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
         cluster.startAndInit(4);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/IndexWrapper.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/IndexWrapper.java
new file mode 100644
index 0000000000..23d1d8ea5a
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/IndexWrapper.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.table.distributed.HashIndexLocker;
+import org.apache.ignite.internal.table.distributed.IndexLocker;
+import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.tx.LockManager;
+
+/** Class that creates index storage and locker decorators for given 
partition. */
+abstract class IndexWrapper {
+    final InternalTable tbl;
+    final LockManager lockManager;
+    final int indexId;
+    final Function<BinaryRow, BinaryTuple> indexRowResolver;
+
+    private IndexWrapper(InternalTable tbl, LockManager lockManager, int 
indexId, Function<BinaryRow, BinaryTuple> indexRowResolver) {
+        this.tbl = tbl;
+        this.lockManager = lockManager;
+        this.indexId = indexId;
+        this.indexRowResolver = indexRowResolver;
+    }
+
+    /**
+     * Creates schema aware index storage wrapper.
+     *
+     * @param partitionId Partition id.
+     */
+    abstract TableSchemaAwareIndexStorage getStorage(int partitionId);
+
+    /**
+     * Creates schema aware index locker.
+     *
+     * @param partitionId Partition id.
+     */
+    abstract IndexLocker getLocker(int partitionId);
+
+    /** {@link IndexWrapper} for sorted indexes. */
+    static class SortedIndexWrapper extends IndexWrapper {
+        SortedIndexWrapper(InternalTable tbl, LockManager lockManager, int 
indexId, Function<BinaryRow, BinaryTuple> indexRowResolver) {
+            super(tbl, lockManager, indexId, indexRowResolver);
+        }
+
+        @Override
+        TableSchemaAwareIndexStorage getStorage(int partitionId) {
+            IndexStorage index = tbl.storage().getIndex(partitionId, indexId);
+
+            assert index != null : tbl.name() + " part " + partitionId;
+
+            return new TableSchemaAwareIndexStorage(
+                    indexId,
+                    index,
+                    indexRowResolver
+            );
+        }
+
+        @Override
+        IndexLocker getLocker(int partitionId) {
+            IndexStorage index = tbl.storage().getIndex(partitionId, indexId);
+
+            assert index != null : tbl.name() + " part " + partitionId;
+
+            return new SortedIndexLocker(
+                    indexId,
+                    partitionId,
+                    lockManager,
+                    (SortedIndexStorage) index,
+                    indexRowResolver
+            );
+        }
+    }
+
+    /** {@link IndexWrapper} for hash indexes. */
+    static class HashIndexWrapper extends IndexWrapper {
+        private final boolean unique;
+
+        HashIndexWrapper(InternalTable tbl, LockManager lockManager, int 
indexId, Function<BinaryRow, BinaryTuple> indexRowResolver,
+                boolean unique) {
+            super(tbl, lockManager, indexId, indexRowResolver);
+            this.unique = unique;
+        }
+
+        @Override
+        TableSchemaAwareIndexStorage getStorage(int partitionId) {
+            IndexStorage index = tbl.storage().getIndex(partitionId, indexId);
+
+            assert index != null : tbl.name() + " part " + partitionId;
+
+            return new TableSchemaAwareIndexStorage(
+                    indexId,
+                    index,
+                    indexRowResolver
+            );
+        }
+
+        @Override
+        IndexLocker getLocker(int partitionId) {
+            return new HashIndexLocker(
+                    indexId,
+                    unique,
+                    lockManager,
+                    indexRowResolver
+            );
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index f89d9a0d25..09a6137ad5 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -39,9 +39,10 @@ import 
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
-import org.apache.ignite.internal.table.distributed.HashIndexLocker;
+import org.apache.ignite.internal.table.IndexWrapper.HashIndexWrapper;
+import org.apache.ignite.internal.table.IndexWrapper.SortedIndexWrapper;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
-import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.tx.LockManager;
@@ -71,8 +72,7 @@ public class TableImpl implements Table {
 
     private final Map<Integer, CompletableFuture<?>> indexesToWait = new 
ConcurrentHashMap<>();
 
-    private final Map<Integer, IndexStorageAdapterFactory> 
indexStorageAdapterFactories = new ConcurrentHashMap<>();
-    private final Map<Integer, IndexLockerFactory> indexLockerFactories = new 
ConcurrentHashMap<>();
+    private final Map<Integer, IndexWrapper> indexWrapperById = new 
ConcurrentHashMap<>();
 
     /**
      * Constructor.
@@ -229,12 +229,12 @@ public class TableImpl implements Table {
             public Map<Integer, TableSchemaAwareIndexStorage> get() {
                 awaitIndexes();
 
-                List<IndexStorageAdapterFactory> factories = new 
ArrayList<>(indexStorageAdapterFactories.values());
+                List<IndexWrapper> factories = new 
ArrayList<>(indexWrapperById.values());
 
                 Map<Integer, TableSchemaAwareIndexStorage> adapters = new 
HashMap<>();
 
-                for (IndexStorageAdapterFactory factory : factories) {
-                    TableSchemaAwareIndexStorage storage = 
factory.create(partId);
+                for (IndexWrapper factory : factories) {
+                    TableSchemaAwareIndexStorage storage = 
factory.getStorage(partId);
                     adapters.put(storage.id(), storage);
                 }
 
@@ -253,12 +253,12 @@ public class TableImpl implements Table {
         return () -> {
             awaitIndexes();
 
-            List<IndexLockerFactory> factories = new 
ArrayList<>(indexLockerFactories.values());
+            List<IndexWrapper> factories = new 
ArrayList<>(indexWrapperById.values());
 
             Map<Integer, IndexLocker> lockers = new 
HashMap<>(factories.size());
 
-            for (IndexLockerFactory factory : factories) {
-                IndexLocker locker = factory.create(partId);
+            for (IndexWrapper factory : factories) {
+                IndexLocker locker = factory.getLocker(partId);
                 lockers.put(locker.id(), locker);
             }
 
@@ -269,7 +269,7 @@ public class TableImpl implements Table {
     /**
      * The future completes when the primary key index is ready to use.
      *
-     * @return Future whcih complete when a primary index for the table is .
+     * @return Future which completes when the primary key index for the table is ready to use.
      */
     public CompletableFuture<Void> pkIndexesReadyFuture() {
         var fut = new CompletableFuture<Void>();
@@ -289,27 +289,17 @@ public class TableImpl implements Table {
     public void registerHashIndex(
             StorageHashIndexDescriptor indexDescriptor,
             boolean unique,
-            Function<BinaryRow, BinaryTuple> searchRowResolver
+            Function<BinaryRow, BinaryTuple> searchRowResolver,
+            PartitionSet partitions
     ) {
         int indexId = indexDescriptor.id();
 
-        indexLockerFactories.put(
-                indexId,
-                partitionId -> new HashIndexLocker(
-                        indexId,
-                        unique,
-                        lockManager,
-                        searchRowResolver
-                )
-        );
-        indexStorageAdapterFactories.put(
-                indexId,
-                partitionId -> new TableSchemaAwareIndexStorage(
-                        indexId,
-                        tbl.storage().getOrCreateHashIndex(partitionId, 
indexDescriptor),
-                        searchRowResolver
-                )
-        );
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19112 Create 
storages once.
+        partitions.stream().forEach(partitionId -> {
+            tbl.storage().getOrCreateHashIndex(partitionId, indexDescriptor);
+        });
+
+        indexWrapperById.put(indexId, new HashIndexWrapper(tbl, lockManager, 
indexId, searchRowResolver, unique));
 
         completeWaitIndex(indexId);
     }
@@ -322,28 +312,17 @@ public class TableImpl implements Table {
      */
     public void registerSortedIndex(
             StorageSortedIndexDescriptor indexDescriptor,
-            Function<BinaryRow, BinaryTuple> searchRowResolver
+            Function<BinaryRow, BinaryTuple> searchRowResolver,
+            PartitionSet partitions
     ) {
         int indexId = indexDescriptor.id();
 
-        indexLockerFactories.put(
-                indexId,
-                partitionId -> new SortedIndexLocker(
-                        indexId,
-                        partitionId,
-                        lockManager,
-                        tbl.storage().getOrCreateSortedIndex(partitionId, 
indexDescriptor),
-                        searchRowResolver
-                )
-        );
-        indexStorageAdapterFactories.put(
-                indexId,
-                partitionId -> new TableSchemaAwareIndexStorage(
-                        indexId,
-                        tbl.storage().getOrCreateSortedIndex(partitionId, 
indexDescriptor),
-                        searchRowResolver
-                )
-        );
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19112 Create 
storages once.
+        partitions.stream().forEach(partitionId -> {
+            tbl.storage().getOrCreateSortedIndex(partitionId, indexDescriptor);
+        });
+
+        indexWrapperById.put(indexId, new SortedIndexWrapper(tbl, lockManager, 
indexId, searchRowResolver));
 
         completeWaitIndex(indexId);
     }
@@ -354,8 +333,7 @@ public class TableImpl implements Table {
      * @param indexId An index id to unregister.
      */
     public void unregisterIndex(int indexId) {
-        indexLockerFactories.remove(indexId);
-        indexStorageAdapterFactories.remove(indexId);
+        indexWrapperById.remove(indexId);
 
         completeWaitIndex(indexId);
 
@@ -386,18 +364,6 @@ public class TableImpl implements Table {
         indexesToWait.values().forEach(future -> 
future.completeExceptionally(closeTableException));
     }
 
-    @FunctionalInterface
-    private interface IndexLockerFactory {
-        /** Creates the index decorator for given partition. */
-        IndexLocker create(int partitionId);
-    }
-
-    @FunctionalInterface
-    private interface IndexStorageAdapterFactory {
-        /** Creates the index decorator for given partition. */
-        TableSchemaAwareIndexStorage create(int partitionId);
-    }
-
     /**
      * Adds indexes to wait, if not already created, before inserting data 
into the table.
      *
@@ -411,7 +377,7 @@ public class TableImpl implements Table {
                     return awaitIndexFuture;
                 }
 
-                if (indexStorageAdapterFactories.containsKey(indexId) && 
indexLockerFactories.containsKey(indexId)) {
+                if (indexWrapperById.containsKey(indexId)) {
                     // Index is already registered and created.
                     return null;
                 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/BitSetPartitionSet.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/BitSetPartitionSet.java
new file mode 100644
index 0000000000..a3a0854d2c
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/BitSetPartitionSet.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.BitSet;
+import java.util.stream.IntStream;
+
+/**
+ * {@link BitSet} implementation of the {@link PartitionSet}.
+ */
+public class BitSetPartitionSet implements PartitionSet {
+    /** Backing BitSet. */
+    private final BitSet backingSet;
+
+    public BitSetPartitionSet() {
+        this.backingSet = new BitSet();
+    }
+
+    private BitSetPartitionSet(BitSet backingSet) {
+        this.backingSet = backingSet;
+    }
+
+    @Override
+    public void set(int partitionId) {
+        backingSet.set(partitionId);
+    }
+
+    @Override
+    public boolean get(int partitionId) {
+        return backingSet.get(partitionId);
+    }
+
+    @Override
+    public IntStream stream() {
+        return backingSet.stream();
+    }
+
+    @Override
+    public int size() {
+        return backingSet.cardinality();
+    }
+
+    @Override
+    public PartitionSet copy() {
+        return new BitSetPartitionSet((BitSet) backingSet.clone());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null) {
+            return false;
+        }
+
+        if (!(o instanceof BitSetPartitionSet)) {
+            return isEqual(o);
+        }
+
+        BitSetPartitionSet that = (BitSetPartitionSet) o;
+
+        return backingSet.equals(that.backingSet);
+    }
+
+    @Override
+    public int hashCode() {
+        return getHashCode();
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java
new file mode 100644
index 0000000000..7d8244bb03
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.PrimitiveIterator.OfInt;
+import java.util.stream.IntStream;
+
+/**
+ * Represents a collection of partition IDs.
+ */
+public interface PartitionSet {
+    PartitionSet EMPTY_SET = new PartitionSet() {
+        @Override
+        public void set(int partitionId) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean get(int partitionId) {
+            return false;
+        }
+
+        @Override
+        public IntStream stream() {
+            return IntStream.empty();
+        }
+
+        @Override
+        public PartitionSet copy() {
+            return this;
+        }
+
+        @Override
+        public int size() {
+            return 0;
+        }
+
+        @Override
+        public int hashCode() {
+            return getHashCode();
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return isEqual(obj);
+        }
+    };
+
+    /**
+     * Adds the partition to the partition set.
+     *
+     * @param partitionId Partition ID.
+     */
+    void set(int partitionId);
+
+    /**
+     * Returns {@code true} if partition with {@code partitionId} is present 
in this set, {@code false} otherwise.
+     *
+     * @param partitionId Partition ID.
+     */
+    boolean get(int partitionId);
+
+    /**
+     * Returns a stream of partition IDs in this set.
+     * The IDs are returned in order, from lowest to highest.
+     */
+    IntStream stream();
+
+    /**
+     * Returns count of partitions.
+     */
+    int size();
+
+    /**
+     * Returns a copy of this {@link PartitionSet}.
+     */
+    PartitionSet copy();
+
+    /**
+     * Returns {@code true} if this partition set is equal to the parameter, 
{@code false} otherwise.
+     *
+     * @param another Object to be compared for equality with this partition 
set.
+     */
+    default boolean isEqual(Object another) {
+        if (!(another instanceof PartitionSet)) {
+            return false;
+        }
+
+        PartitionSet anotherSet = (PartitionSet) another;
+
+        if (size() != anotherSet.size()) {
+            return false;
+        }
+
+        OfInt iterator1 = stream().iterator();
+        OfInt iterator2 = anotherSet.stream().iterator();
+
+        while (iterator1.hasNext() && iterator2.hasNext()) {
+            if (iterator1.nextInt() != iterator2.nextInt()) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Returns the hash code value for this partition set.
+     */
+    default int getHashCode() {
+        int h = 0;
+
+        OfInt iter = stream().iterator();
+
+        while (iter.hasNext()) {
+            int idx = iter.nextInt();
+
+            h += idx;
+        }
+
+        return h;
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 57eafeb922..c392d7a019 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -60,7 +60,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -275,6 +274,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
      */
     private final IncrementalVersionedValue<Map<Integer, TableImpl>> 
tablesByIdVv;
 
+    /** Versioned store for local partition set by table id. */
+    private final IncrementalVersionedValue<Map<Integer, PartitionSet>> 
localPartsByTableIdVv;
+
     /**
      * Versioned store for tracking RAFT groups initialization and starting 
completion.
      * Only explicitly updated in {@link #createTablePartitionsLocally(long, 
List, int, TableImpl)}.
@@ -287,6 +289,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
      */
     private final Map<Integer, TableImpl> pendingTables = new 
ConcurrentHashMap<>();
 
+    /** Started tables. */
+    private final Map<Integer, TableImpl> startedTables = new 
ConcurrentHashMap<>();
+
     /** Resolver that resolves a node consistent ID to cluster node. */
     private final Function<String, ClusterNode> clusterNodeResolver;
 
@@ -427,7 +432,10 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         placementDriver = new PlacementDriver(replicaSvc, clusterNodeResolver);
 
         tablesByIdVv = new IncrementalVersionedValue<>(registry, HashMap::new);
-        assignmentsUpdatedVv = new 
IncrementalVersionedValue<>(dependingOn(tablesByIdVv));
+
+        localPartsByTableIdVv = new IncrementalVersionedValue<>(registry, 
HashMap::new);
+
+        assignmentsUpdatedVv = new 
IncrementalVersionedValue<>(dependingOn(localPartsByTableIdVv));
 
         txStateStorageScheduledPool = 
Executors.newSingleThreadScheduledExecutor(
                 NamedThreadFactory.create(nodeName, 
"tx-state-storage-scheduled-pool", LOG));
@@ -691,8 +699,14 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         int partitions = newAssignments.size();
 
         CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
+
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19713 Process 
assignments and set partitions only for assigned partitions.
+        PartitionSet parts = new BitSetPartitionSet();
+
         for (int i = 0; i < futures.length; i++) {
             futures[i] = new CompletableFuture<>();
+
+            parts.set(i);
         }
 
         String localMemberName = localNode().name();
@@ -715,121 +729,110 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 TablePartitionId replicaGrpId = new TablePartitionId(tableId, 
partId);
 
-                placementDriver.updateAssignment(replicaGrpId, 
newConfiguration.peers().stream().map(Peer::consistentId).collect(toList()));
+                placementDriver.updateAssignment(replicaGrpId, 
newConfiguration.peers().stream().map(Peer::consistentId)
+                        .collect(toList()));
 
-                PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeTracker =
-                        new PendingComparableValuesTracker<>(new 
HybridTimestamp(1, 0));
-                PendingComparableValuesTracker<Long, Void> storageIndexTracker 
=
-                        new PendingComparableValuesTracker<>(0L);
+                var safeTimeTracker = new 
PendingComparableValuesTracker<HybridTimestamp, Void>(
+                        new HybridTimestamp(1, 0)
+                );
+                var storageIndexTracker = new 
PendingComparableValuesTracker<Long, Void>(0L);
 
                 ((InternalTableImpl) 
internalTbl).updatePartitionTrackers(partId, safeTimeTracker, 
storageIndexTracker);
 
-                CompletableFuture<PartitionStorages> partitionStoragesFut = 
getOrCreatePartitionStorages(table, partId);
+                PartitionStorages partitionStorages = 
getPartitionStorages(table, partId);
 
-                CompletableFuture<PartitionDataStorage> 
partitionDataStorageFut = partitionStoragesFut
-                        .thenApply(partitionStorages -> 
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
-                                internalTbl, partId));
-
-                CompletableFuture<PartitionUpdateHandlers> 
partitionUpdateHandlersFut = partitionDataStorageFut
-                        .thenApply(storage -> {
-                            PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
-                                    partId,
-                                    storage,
-                                    table,
-                                    getZoneById(zonesConfig, 
zoneId).dataStorage(),
-                                    safeTimeTracker
-                            );
+                PartitionDataStorage partitionDataStorage = 
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
+                        internalTbl, partId);
 
-                            mvGc.addStorage(replicaGrpId, 
partitionUpdateHandlers.gcUpdateHandler);
+                PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
+                        partId,
+                        partitionDataStorage,
+                        table,
+                        getZoneById(zonesConfig, zoneId).dataStorage(),
+                        safeTimeTracker
+                );
 
-                            return partitionUpdateHandlers;
-                        });
+                mvGc.addStorage(replicaGrpId, 
partitionUpdateHandlers.gcUpdateHandler);
 
                 CompletableFuture<Void> startGroupFut;
 
                 // start new nodes, only if it is table creation, other cases 
will be covered by rebalance logic
                 if (localMemberAssignment != null) {
-                    startGroupFut = 
partitionStoragesFut.thenComposeAsync(partitionStorages -> {
-                        CompletableFuture<Boolean> fut;
-
-                        // If Raft is running in in-memory mode or the PDS has 
been cleared, we need to remove the current node
-                        // from the Raft group in order to avoid the double 
vote problem.
-                        // <MUTED> See 
https://issues.apache.org/jira/browse/IGNITE-16668 for details.
-                        // TODO: 
https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData"
-                        if (internalTbl.storage().isVolatile()) {
-                            fut = queryDataNodesCount(tableId, partId, 
newConfiguration.peers()).thenApply(dataNodesCount -> {
-                                boolean fullPartitionRestart = dataNodesCount 
== 0;
-
-                                if (fullPartitionRestart) {
-                                    return true;
-                                }
+                    CompletableFuture<Boolean> shouldStartGroupFut;
+
+                    // If Raft is running in in-memory mode or the PDS has 
been cleared, we need to remove the current node
+                    // from the Raft group in order to avoid the double vote 
problem.
+                    // <MUTED> See 
https://issues.apache.org/jira/browse/IGNITE-16668 for details.
+                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData"
+                    if (internalTbl.storage().isVolatile()) {
+                        shouldStartGroupFut = queryDataNodesCount(tableId, 
partId, newConfiguration.peers()).thenApply(dataNodesCount -> {
+                            boolean fullPartitionRestart = dataNodesCount == 0;
+
+                            if (fullPartitionRestart) {
+                                return true;
+                            }
 
-                                boolean majorityAvailable = dataNodesCount >= 
(newConfiguration.peers().size() / 2) + 1;
+                            boolean majorityAvailable = dataNodesCount >= 
(newConfiguration.peers().size() / 2) + 1;
 
-                                if (majorityAvailable) {
-                                    
RebalanceUtil.startPeerRemoval(replicaGrpId, localMemberAssignment, 
metaStorageMgr);
+                            if (majorityAvailable) {
+                                RebalanceUtil.startPeerRemoval(replicaGrpId, 
localMemberAssignment, metaStorageMgr);
 
-                                    return false;
-                                } else {
-                                    // No majority and not a full partition 
restart - need to restart nodes
-                                    // with current partition.
-                                    String msg = "Unable to start partition " 
+ partId + ". Majority not available.";
+                                return false;
+                            } else {
+                                // No majority and not a full partition 
restart - need to restart nodes
+                                // with current partition.
+                                String msg = "Unable to start partition " + 
partId + ". Majority not available.";
 
-                                    throw new IgniteInternalException(msg);
-                                }
-                            });
-                        } else {
-                            fut = completedFuture(true);
+                                throw new IgniteInternalException(msg);
+                            }
+                        });
+                    } else {
+                        shouldStartGroupFut = completedFuture(true);
+                    }
+
+                    startGroupFut = 
shouldStartGroupFut.thenAcceptAsync(startGroup -> inBusyLock(busyLock, () -> {
+                        if (!startGroup) {
+                            return;
                         }
+                        TxStateStorage txStatePartitionStorage = 
partitionStorages.getTxStateStorage();
 
-                        return fut.thenCompose(startGroup -> 
inBusyLock(busyLock, () -> {
-                            if (!startGroup) {
-                                return completedFuture(null);
-                            }
+                        RaftGroupOptions groupOptions = 
groupOptionsForPartition(
+                                internalTbl.storage(),
+                                internalTbl.txStateStorage(),
+                                partitionKey(internalTbl, partId),
+                                partitionUpdateHandlers
+                        );
 
-                            return 
partitionDataStorageFut.thenAcceptBothAsync(partitionUpdateHandlersFut,
-                                    (partitionDataStorage, 
partitionUpdateHandlers) -> inBusyLock(busyLock, () -> {
-                                        TxStateStorage txStatePartitionStorage 
= partitionStorages.getTxStateStorage();
-
-                                        RaftGroupOptions groupOptions = 
groupOptionsForPartition(
-                                                internalTbl.storage(),
-                                                internalTbl.txStateStorage(),
-                                                partitionKey(internalTbl, 
partId),
-                                                partitionUpdateHandlers
-                                        );
-
-                                        Peer serverPeer = 
newConfiguration.peer(localMemberName);
-
-                                        var raftNodeId = new 
RaftNodeId(replicaGrpId, serverPeer);
-
-                                        try {
-                                            // TODO: use RaftManager 
interface, see https://issues.apache.org/jira/browse/IGNITE-18273
-                                            ((Loza) 
raftMgr).startRaftGroupNode(
-                                                    raftNodeId,
-                                                    newConfiguration,
-                                                    new PartitionListener(
-                                                            
partitionDataStorage,
-                                                            
partitionUpdateHandlers.storageUpdateHandler,
-                                                            
txStatePartitionStorage,
-                                                            safeTimeTracker,
-                                                            storageIndexTracker
-                                                    ),
-                                                    new 
RebalanceRaftGroupEventsListener(
-                                                            metaStorageMgr,
-                                                            replicaGrpId,
-                                                            busyLock,
-                                                            
createPartitionMover(internalTbl, partId),
-                                                            
this::calculateAssignments,
-                                                            rebalanceScheduler
-                                                    ),
-                                                    groupOptions
-                                            );
-                                        } catch (NodeStoppingException ex) {
-                                            throw new CompletionException(ex);
-                                        }
-                                    }), ioExecutor);
-                        }));
-                    }, ioExecutor);
+                        Peer serverPeer = 
newConfiguration.peer(localMemberName);
+
+                        var raftNodeId = new RaftNodeId(replicaGrpId, 
serverPeer);
+
+                        try {
+                            // TODO: use RaftManager interface, see 
https://issues.apache.org/jira/browse/IGNITE-18273
+                            ((Loza) raftMgr).startRaftGroupNode(
+                                    raftNodeId,
+                                    newConfiguration,
+                                    new PartitionListener(
+                                            partitionDataStorage,
+                                            
partitionUpdateHandlers.storageUpdateHandler,
+                                            txStatePartitionStorage,
+                                            safeTimeTracker,
+                                            storageIndexTracker
+                                    ),
+                                    new RebalanceRaftGroupEventsListener(
+                                            metaStorageMgr,
+                                            replicaGrpId,
+                                            busyLock,
+                                            createPartitionMover(internalTbl, 
partId),
+                                            this::calculateAssignments,
+                                            rebalanceScheduler
+                                    ),
+                                    groupOptions
+                            );
+                        } catch (NodeStoppingException ex) {
+                            throw new CompletionException(ex);
+                        }
+                    }), ioExecutor);
                 } else {
                     startGroupFut = completedFuture(null);
                 }
@@ -843,55 +846,54 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                 return failedFuture(ex);
                             }
                         }), ioExecutor)
-                        .thenComposeAsync(updatedRaftGroupService -> 
inBusyLock(busyLock, () -> {
+                        .thenAcceptAsync(updatedRaftGroupService -> 
inBusyLock(busyLock, () -> {
                             ((InternalTableImpl) 
internalTbl).updateInternalTableRaftGroupService(partId, 
updatedRaftGroupService);
 
                             if (localMemberAssignment == null) {
-                                return completedFuture(null);
+                                return;
                             }
 
-                            return 
partitionStoragesFut.thenAcceptBoth(partitionUpdateHandlersFut,
-                                    (partitionStorages, 
partitionUpdateHandlers) -> {
-                                        MvPartitionStorage partitionStorage = 
partitionStorages.getMvPartitionStorage();
-                                        TxStateStorage txStateStorage = 
partitionStorages.getTxStateStorage();
-
-                                        try {
-                                            replicaMgr.startReplica(
-                                                    replicaGrpId,
-                                                    allOf(
-                                                            ((Loza) 
raftMgr).raftNodeReadyFuture(replicaGrpId),
-                                                            
table.pkIndexesReadyFuture()
-                                                    ),
-                                                    new 
PartitionReplicaListener(
-                                                            partitionStorage,
-                                                            
updatedRaftGroupService,
-                                                            txManager,
-                                                            lockMgr,
-                                                            
scanRequestExecutor,
-                                                            partId,
-                                                            tableId,
-                                                            
table.indexesLockers(partId),
-                                                            new Lazy<>(() -> 
table.indexStorageAdapters(partId).get().get(table.pkId())),
-                                                            () -> 
table.indexStorageAdapters(partId).get(),
-                                                            clock,
-                                                            safeTimeTracker,
-                                                            txStateStorage,
-                                                            placementDriver,
-                                                            
partitionUpdateHandlers.storageUpdateHandler,
-                                                            new 
NonHistoricSchemas(schemaManager),
-                                                            
schemaManager.schemaRegistry(causalityToken, tableId),
-                                                            localNode(),
-                                                            
table.internalTable().storage(),
-                                                            indexBuilder,
-                                                            tablesCfg
-                                                    ),
-                                                    updatedRaftGroupService,
-                                                    storageIndexTracker
-                                            );
-                                        } catch (NodeStoppingException ex) {
-                                            throw new AssertionError("Loza was 
stopped before Table manager", ex);
-                                        }
-                                    });
+                            MvPartitionStorage partitionStorage = 
partitionStorages.getMvPartitionStorage();
+                            TxStateStorage txStateStorage = 
partitionStorages.getTxStateStorage();
+
+                            try {
+                                replicaMgr.startReplica(
+                                        replicaGrpId,
+                                        allOf(
+                                                ((Loza) 
raftMgr).raftNodeReadyFuture(replicaGrpId),
+                                                table.pkIndexesReadyFuture()
+                                        ),
+                                        new PartitionReplicaListener(
+                                                partitionStorage,
+                                                updatedRaftGroupService,
+                                                txManager,
+                                                lockMgr,
+                                                scanRequestExecutor,
+                                                partId,
+                                                tableId,
+                                                table.indexesLockers(partId),
+                                                new Lazy<>(
+                                                        () -> 
table.indexStorageAdapters(partId).get().get(table.pkId())
+                                                ),
+                                                () -> 
table.indexStorageAdapters(partId).get(),
+                                                clock,
+                                                safeTimeTracker,
+                                                txStateStorage,
+                                                placementDriver,
+                                                
partitionUpdateHandlers.storageUpdateHandler,
+                                                new 
NonHistoricSchemas(schemaManager),
+                                                
schemaManager.schemaRegistry(causalityToken, tableId),
+                                                localNode(),
+                                                
table.internalTable().storage(),
+                                                indexBuilder,
+                                                tablesCfg
+                                        ),
+                                        updatedRaftGroupService,
+                                        storageIndexTracker
+                                );
+                            } catch (NodeStoppingException ex) {
+                                throw new AssertionError("Loza was stopped 
before Table manager", ex);
+                            }
                         }), ioExecutor)
                         .whenComplete((res, ex) -> {
                             if (ex != null) {
@@ -905,17 +907,27 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             return allOf(futures);
         };
 
-        CompletableFuture<Void> updateAssignmentsFuture = 
tablesByIdVv.get(causalityToken).thenComposeAsync(
-                tablesById -> inBusyLock(busyLock, updateAssignmentsClosure),
-                ioExecutor
-        );
+        return localPartsByTableIdVv.update(causalityToken, (previous, 
throwable) -> inBusyLock(busyLock, () -> {
+            return getOrCreatePartitionStorages(table, parts).thenApply(u -> {
+                var newValue = new HashMap<>(previous);
 
-        return assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
-            if (e != null) {
-                return failedFuture(e);
-            }
+                newValue.put(tableId, parts);
 
-            return updateAssignmentsFuture;
+                return newValue;
+            });
+        })).thenCompose(unused -> {
+            CompletableFuture<Void> updateAssignmentsFuture = 
tablesByIdVv.get(causalityToken).thenComposeAsync(
+                    tablesById -> inBusyLock(busyLock, 
updateAssignmentsClosure),
+                    ioExecutor
+            );
+
+            return assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
+                if (e != null) {
+                    return failedFuture(e);
+                }
+
+                return updateAssignmentsFuture;
+            });
         });
     }
 
@@ -1195,6 +1207,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         createTablePartitionsLocally(causalityToken, assignments, 
zoneDescriptor.id(), table);
 
         pendingTables.put(tableId, table);
+        startedTables.put(tableId, table);
 
         tablesById(causalityToken).thenAccept(ignored -> inBusyLock(busyLock, 
() -> {
             pendingTables.remove(tableId);
@@ -1303,6 +1316,17 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                         .thenCompose((notUsed) -> 
mvGc.removeStorage(replicationGroupId));
             }
 
+            localPartsByTableIdVv.update(causalityToken, (previousVal, e) -> 
inBusyLock(busyLock, () -> {
+                if (e != null) {
+                    return failedFuture(e);
+                }
+
+                var newMap = new HashMap<>(previousVal);
+                newMap.remove(tableId);
+
+                return completedFuture(newMap);
+            }));
+
             tablesByIdVv.update(causalityToken, (previousVal, e) -> 
inBusyLock(busyLock, () -> {
                 if (e != null) {
                     return failedFuture(e);
@@ -1334,6 +1358,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                         .thenApply(v -> map);
             }));
 
+            startedTables.remove(tableId);
+
             fireEvent(TableEvent.DROP, new 
TableEventParameters(causalityToken, tableId))
                     .whenComplete((v, e) -> {
                         if (e != null) {
@@ -1737,15 +1763,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 .collect(toList());
     }
 
-    /**
-     * Collects a list of direct index ids.
-     *
-     * @return A list of direct index ids.
-     */
-    private List<UUID> directIndexIds() {
-        return directProxy(tablesCfg.indexes()).internalIds();
-    }
-
     /**
      * Gets direct id of table with {@code tblName}.
      *
@@ -1858,6 +1875,24 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         }
     }
 
+    /**
+     * Asynchronously gets the local partition set of a table as of the given causality token.
+     *
+     * @param causalityToken Causality token.
+     * @param tableId Table id.
+     * @return Future with the partition set mapped to {@code tableId}, or {@code null} if there is no such mapping.
+     * @throws IgniteException If the busy lock cannot be entered; the cause is a {@link NodeStoppingException}.
+     */
+    public CompletableFuture<PartitionSet> localPartitionSetAsync(long 
causalityToken, int tableId) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+
+        try {
+            // The lookup by table id runs when the future returned by localPartsByTableIdVv.get(causalityToken) completes.
+            return 
localPartsByTableIdVv.get(causalityToken).thenApply(partitionSetById -> 
partitionSetById.get(tableId));
+        } finally {
+            // Released before returning: the lock covers building the future chain, not necessarily its completion.
+            busyLock.leaveBusy();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override
     public TableImpl tableImpl(String name) {
@@ -2061,6 +2096,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                     try {
                         return handleChangePendingAssignmentEvent(
+                                evt.revision(),
                                 replicaGrpId,
                                 tables.get(tblId),
                                 pendingAssignmentsWatchEntry,
@@ -2074,6 +2110,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     }
 
     private CompletableFuture<Void> handleChangePendingAssignmentEvent(
+            long causalityToken,
             TablePartitionId replicaGrpId,
             TableImpl tbl,
             Entry pendingAssignmentsEntry,
@@ -2086,6 +2123,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
         CatalogTableDescriptor tableDescriptor = 
getTableDescriptor(tbl.tableId());
 
+        int tableId = tbl.tableId();
         int partId = replicaGrpId.partitionId();
 
         byte[] stableAssignmentsBytes = stableAssignmentsEntry.value();
@@ -2119,97 +2157,108 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         CompletableFuture<Void> localServicesStartFuture;
 
         if (shouldStartLocalServices) {
-            ((InternalTableImpl) 
internalTable).updatePartitionTrackers(partId, safeTimeTracker, 
storageIndexTracker);
+            localServicesStartFuture = 
localPartsByTableIdVv.get(causalityToken).thenComposeAsync(oldMap -> {
+                PartitionSet partitionSet = oldMap.get(tableId).copy();
 
-            localServicesStartFuture = getOrCreatePartitionStorages(tbl, 
partId)
-                    .thenAcceptAsync(partitionStorages -> {
-                        MvPartitionStorage mvPartitionStorage = 
partitionStorages.getMvPartitionStorage();
-                        TxStateStorage txStatePartitionStorage = 
partitionStorages.getTxStateStorage();
+                return getOrCreatePartitionStorages(tbl, 
partitionSet).thenApply(u -> {
+                    var newMap = new HashMap<>(oldMap);
 
-                        PartitionDataStorage partitionDataStorage = 
partitionDataStorage(mvPartitionStorage, internalTable, partId);
+                    newMap.put(tableId, partitionSet);
 
-                        PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
-                                partId,
-                                partitionDataStorage,
-                                tbl,
-                                getZoneById(zonesConfig, 
tableDescriptor.zoneId()).dataStorage(),
-                                safeTimeTracker
-                        );
+                    return newMap;
+                });
+            }).thenComposeAsync(unused -> {
+                PartitionStorages partitionStorages = 
getPartitionStorages(tbl, partId);
 
-                        RaftGroupOptions groupOptions = 
groupOptionsForPartition(
-                                internalTable.storage(),
-                                internalTable.txStateStorage(),
-                                partitionKey(internalTable, partId),
-                                partitionUpdateHandlers
-                        );
+                MvPartitionStorage mvPartitionStorage = 
partitionStorages.getMvPartitionStorage();
+                TxStateStorage txStatePartitionStorage = 
partitionStorages.getTxStateStorage();
 
-                        RaftGroupListener raftGrpLsnr = new PartitionListener(
-                                partitionDataStorage,
-                                partitionUpdateHandlers.storageUpdateHandler,
-                                txStatePartitionStorage,
-                                safeTimeTracker,
-                                storageIndexTracker
-                        );
+                PartitionDataStorage partitionDataStorage = 
partitionDataStorage(mvPartitionStorage, internalTable, partId);
 
-                        RaftGroupEventsListener raftGrpEvtsLsnr = new 
RebalanceRaftGroupEventsListener(
-                                metaStorageMgr,
-                                replicaGrpId,
-                                busyLock,
-                                createPartitionMover(internalTable, partId),
-                                this::calculateAssignments,
-                                rebalanceScheduler
-                        );
+                PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
+                        partId,
+                        partitionDataStorage,
+                        tbl,
+                        getZoneById(zonesConfig, 
tableDescriptor.zoneId()).dataStorage(),
+                        safeTimeTracker
+                );
 
-                        Peer serverPeer = 
pendingConfiguration.peer(localMember.name());
+                RaftGroupOptions groupOptions = groupOptionsForPartition(
+                        internalTable.storage(),
+                        internalTable.txStateStorage(),
+                        partitionKey(internalTable, partId),
+                        partitionUpdateHandlers
+                );
 
-                        var raftNodeId = new RaftNodeId(replicaGrpId, 
serverPeer);
+                RaftGroupListener raftGrpLsnr = new PartitionListener(
+                        partitionDataStorage,
+                        partitionUpdateHandlers.storageUpdateHandler,
+                        txStatePartitionStorage,
+                        safeTimeTracker,
+                        storageIndexTracker
+                );
 
-                        try {
-                            // TODO: use RaftManager interface, see 
https://issues.apache.org/jira/browse/IGNITE-18273
-                            ((Loza) raftMgr).startRaftGroupNode(
-                                    raftNodeId,
-                                    stableConfiguration,
-                                    raftGrpLsnr,
-                                    raftGrpEvtsLsnr,
-                                    groupOptions
-                            );
+                RaftGroupEventsListener raftGrpEvtsLsnr = new 
RebalanceRaftGroupEventsListener(
+                        metaStorageMgr,
+                        replicaGrpId,
+                        busyLock,
+                        createPartitionMover(internalTable, partId),
+                        this::calculateAssignments,
+                        rebalanceScheduler
+                );
 
-                            replicaMgr.startReplica(
-                                    replicaGrpId,
-                                    allOf(
-                                            ((Loza) 
raftMgr).raftNodeReadyFuture(replicaGrpId),
-                                            tbl.pkIndexesReadyFuture()
-                                    ),
-                                    new PartitionReplicaListener(
-                                            mvPartitionStorage,
-                                            
internalTable.partitionRaftGroupService(partId),
-                                            txManager,
-                                            lockMgr,
-                                            scanRequestExecutor,
-                                            partId,
-                                            tbl.tableId(),
-                                            tbl.indexesLockers(partId),
-                                            new Lazy<>(() -> 
tbl.indexStorageAdapters(partId).get().get(tbl.pkId())),
-                                            () -> 
tbl.indexStorageAdapters(partId).get(),
-                                            clock,
-                                            safeTimeTracker,
-                                            txStatePartitionStorage,
-                                            placementDriver,
-                                            
partitionUpdateHandlers.storageUpdateHandler,
-                                            new 
NonHistoricSchemas(schemaManager),
-                                            
completedFuture(schemaManager.schemaRegistry(tbl.tableId())),
-                                            localNode(),
-                                            internalTable.storage(),
-                                            indexBuilder,
-                                            tablesCfg
-                                    ),
-                                    (TopologyAwareRaftGroupService) 
internalTable.partitionRaftGroupService(partId),
-                                    storageIndexTracker
-                            );
-                        } catch (NodeStoppingException ignored) {
-                            // no-op
-                        }
-                    }, ioExecutor);
+                Peer serverPeer = 
pendingConfiguration.peer(localMember.name());
+
+                var raftNodeId = new RaftNodeId(replicaGrpId, serverPeer);
+
+                return runAsync(() -> inBusyLock(busyLock, () -> {
+                    try {
+                        // TODO: use RaftManager interface, see 
https://issues.apache.org/jira/browse/IGNITE-18273
+                        ((Loza) raftMgr).startRaftGroupNode(
+                                raftNodeId,
+                                stableConfiguration,
+                                raftGrpLsnr,
+                                raftGrpEvtsLsnr,
+                                groupOptions
+                        );
+
+                        replicaMgr.startReplica(
+                                replicaGrpId,
+                                allOf(
+                                        ((Loza) 
raftMgr).raftNodeReadyFuture(replicaGrpId),
+                                        tbl.pkIndexesReadyFuture()
+                                ),
+                                new PartitionReplicaListener(
+                                        mvPartitionStorage,
+                                        
internalTable.partitionRaftGroupService(partId),
+                                        txManager,
+                                        lockMgr,
+                                        scanRequestExecutor,
+                                        partId,
+                                        tableId,
+                                        tbl.indexesLockers(partId),
+                                        new Lazy<>(() -> 
tbl.indexStorageAdapters(partId).get().get(tbl.pkId())),
+                                        () -> 
tbl.indexStorageAdapters(partId).get(),
+                                        clock,
+                                        safeTimeTracker,
+                                        txStatePartitionStorage,
+                                        placementDriver,
+                                        
partitionUpdateHandlers.storageUpdateHandler,
+                                        new NonHistoricSchemas(schemaManager),
+                                        
completedFuture(schemaManager.schemaRegistry(tableId)),
+                                        localNode(),
+                                        internalTable.storage(),
+                                        indexBuilder,
+                                        tablesCfg
+                                ),
+                                (TopologyAwareRaftGroupService) 
internalTable.partitionRaftGroupService(partId),
+                                storageIndexTracker
+                        );
+                    } catch (NodeStoppingException ignored) {
+                        // No-op.
+                    }
+                }), ioExecutor);
+            });
         } else {
             localServicesStartFuture = completedFuture(null);
         }
@@ -2356,32 +2405,50 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     }
 
     /**
-     * Creates partition stores. If one of the storages has not completed the 
rebalance, then the storages are cleared.
+     * Gets the existing partition storages (MV and TX state) of a partition.
      *
      * @param table Table.
      * @param partitionId Partition ID.
-     * @return Future of creating or getting partition stores.
+     * @return PartitionStorages.
      */
-    // TODO: IGNITE-18939 Create storages only once, then only get them
-    private CompletableFuture<PartitionStorages> 
getOrCreatePartitionStorages(TableImpl table, int partitionId) {
+    private PartitionStorages getPartitionStorages(TableImpl table, int 
partitionId) {
         InternalTable internalTable = table.internalTable();
 
         MvPartitionStorage mvPartition = 
internalTable.storage().getMvPartition(partitionId);
 
-        return (mvPartition != null ? completedFuture(mvPartition) : 
internalTable.storage().createMvPartition(partitionId))
-                .thenComposeAsync(mvPartitionStorage -> {
-                    TxStateStorage txStateStorage = 
internalTable.txStateStorage().getOrCreateTxStateStorage(partitionId);
+        assert mvPartition != null;
 
-                    if (mvPartitionStorage.persistedIndex() == 
MvPartitionStorage.REBALANCE_IN_PROGRESS
-                            || txStateStorage.persistedIndex() == 
TxStateStorage.REBALANCE_IN_PROGRESS) {
-                        return allOf(
-                                
internalTable.storage().clearPartition(partitionId),
-                                txStateStorage.clear()
-                        ).thenApply(unused -> new 
PartitionStorages(mvPartitionStorage, txStateStorage));
-                    } else {
-                        return completedFuture(new 
PartitionStorages(mvPartitionStorage, txStateStorage));
-                    }
-                }, ioExecutor);
+        TxStateStorage txStateStorage = 
internalTable.txStateStorage().getTxStateStorage(partitionId);
+
+        assert txStateStorage != null;
+
+        return new PartitionStorages(mvPartition, txStateStorage);
+    }
+
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-19739 Create 
storages only once.
+    private CompletableFuture<Void> getOrCreatePartitionStorages(TableImpl 
table, PartitionSet partitions) {
+        InternalTable internalTable = table.internalTable();
+
+        CompletableFuture<?>[] storageFuts = 
partitions.stream().mapToObj(partitionId -> {
+            MvPartitionStorage mvPartition = 
internalTable.storage().getMvPartition(partitionId);
+
+            return (mvPartition != null ? completedFuture(mvPartition) : 
internalTable.storage().createMvPartition(partitionId))
+                    .thenComposeAsync(mvPartitionStorage -> {
+                        TxStateStorage txStateStorage = 
internalTable.txStateStorage().getOrCreateTxStateStorage(partitionId);
+
+                        if (mvPartitionStorage.persistedIndex() == 
MvPartitionStorage.REBALANCE_IN_PROGRESS
+                                || txStateStorage.persistedIndex() == 
TxStateStorage.REBALANCE_IN_PROGRESS) {
+                            return allOf(
+                                    
internalTable.storage().clearPartition(partitionId),
+                                    txStateStorage.clear()
+                            ).thenApply(unused -> new 
PartitionStorages(mvPartitionStorage, txStateStorage));
+                        } else {
+                            return completedFuture(new 
PartitionStorages(mvPartitionStorage, txStateStorage));
+                        }
+                    }, ioExecutor);
+        }).toArray(CompletableFuture[]::new);
+
+        return allOf(storageFuts);
     }
 
     /**
@@ -2529,6 +2596,15 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         return new PartitionUpdateHandlers(storageUpdateHandler, 
indexUpdateHandler, gcUpdateHandler);
     }
 
+    /**
+     * Returns a table instance if it exists, {@code null} otherwise.
+     *
+     * @param tableId Table id.
+     */
+    public @Nullable TableImpl getTable(int tableId) {
+        return startedTables.get(tableId);
+    }
+
     private @Nullable CatalogTableDescriptor getTableDescriptor(int id) {
         TableView tableView = findTableView(tablesCfg.value(), id);
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/AbstractPartitionSetTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/AbstractPartitionSetTest.java
new file mode 100644
index 0000000000..1c5c534d37
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/AbstractPartitionSetTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Base tests for {@link PartitionSet}. */
+abstract class AbstractPartitionSetTest {
+    private PartitionSet partitionSet;
+
+    @BeforeEach
+    void setUp() {
+        partitionSet = createSet();
+    }
+
+    @Test
+    public void testSetPartition() {
+        int partCount = 65_000;
+
+        // Set every even partition.
+        for (int i = 0; i < partCount; i++) {
+            if ((i % 2) == 0) {
+                partitionSet.set(i);
+            }
+        }
+
+        // Check that only partitions that were set are actually set.
+        for (int i = 0; i < partCount; i++) {
+            boolean isSet = partitionSet.get(i);
+
+            assertEquals((i % 2) == 0, isSet);
+        }
+    }
+
+    @Test
+    public void testStream() {
+        int partCount = 65_000;
+
+        int setPartitionsCount = 0;
+
+        // Set every even partition.
+        for (int i = 0; i < partCount; i++) {
+            if ((i % 2) == 0) {
+                partitionSet.set(i);
+                setPartitionsCount++;
+            }
+        }
+
+        var realCount = new AtomicInteger();
+
+        // Check that the stream yields only the partitions that were set.
+        partitionSet.stream().forEach(value -> {
+            assertTrue((value % 2) == 0);
+            realCount.incrementAndGet();
+        });
+
+        assertEquals(setPartitionsCount, realCount.get());
+    }
+
+    /**
+     * Tests that a copy is equal to, but is not the same instance as, the copied set.
+     */
+    @Test
+    public void testCopy() {
+        PartitionSet copy = partitionSet.copy();
+
+        assertEquals(partitionSet, copy);
+        assertEquals(partitionSet.hashCode(), copy.hashCode());
+        assertNotSame(partitionSet, copy);
+    }
+
+    /**
+     * Tests that empty sets of different classes are equal.
+     */
+    @Test
+    public void testEmptyEqual() {
+        assertEquals(partitionSet, PartitionSet.EMPTY_SET);
+        assertEquals(PartitionSet.EMPTY_SET, partitionSet);
+
+        assertEquals(partitionSet.hashCode(), 
PartitionSet.EMPTY_SET.hashCode());
+        assertEquals(PartitionSet.EMPTY_SET.hashCode(), 
partitionSet.hashCode());
+    }
+
+    @Test
+    public void testDifferentNotEqual() {
+        partitionSet.set(0);
+
+        PartitionSet anotherSet = createSet();
+        anotherSet.set(1);
+
+        assertNotEquals(partitionSet, anotherSet);
+    }
+
+    abstract PartitionSet createSet();
+}
\ No newline at end of file
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/BitSetPartitionSetTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/BitSetPartitionSetTest.java
new file mode 100644
index 0000000000..54888676ca
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/BitSetPartitionSetTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+/** Tests of the {@link BitSetPartitionSet} implementation of {@link 
PartitionSet}. */
+class BitSetPartitionSetTest extends AbstractPartitionSetTest {
+    @Override
+    PartitionSet createSet() {
+        return new BitSetPartitionSet();
+    }
+}
\ No newline at end of file
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index a281cb67b4..d9ac117a58 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManag
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -318,7 +319,6 @@ public class TableManagerTest extends IgniteAbstractTest {
         createDistributionZone();
 
         tblsCfg.tables().change(tablesChange -> {
-
             tablesChange.create(scmTbl.name(), tableChange -> {
                 (SchemaConfigurationConverter.convert(scmTbl, tableChange))
                         .changeZoneId(ZONE_ID);
@@ -654,19 +654,13 @@ public class TableManagerTest extends IgniteAbstractTest {
         when(rm.raftNodeReadyFuture(any())).thenReturn(completedFuture(1L));
         when(bm.nodes()).thenReturn(Set.of(node));
 
-        distributionZonesConfiguration.distributionZones().change(zones -> {
+        
assertThat(distributionZonesConfiguration.distributionZones().change(zones -> {
             zones.create(ZONE_NAME, ch -> {
                 ch.changeZoneId(ZONE_ID);
                 ch.changePartitions(1);
                 ch.changeReplicas(1);
             });
-        }).join();
-
-        mockMetastore();
-
-        TableManager tableManager = createTableManager(tblManagerFut);
-
-        tblManagerFut.complete(tableManager);
+        }), willSucceedFast());
 
         var txStateStorage = mock(TxStateStorage.class);
         var mvPartitionStorage = mock(MvPartitionStorage.class);
@@ -679,21 +673,19 @@ public class TableManagerTest extends IgniteAbstractTest {
             
when(mvPartitionStorage.persistedIndex()).thenReturn(MvPartitionStorage.REBALANCE_IN_PROGRESS);
         }
 
+        
doReturn(mock(PartitionTimestampCursor.class)).when(mvPartitionStorage).scan(any());
         when(txStateStorage.clear()).thenReturn(completedFuture(null));
 
-        // We need to mock storages inside a configuration listener because of 
how mocks are created in the Table Manager,
-        // see "createTableManager".
-        tblsCfg.tables().any().listen(ctx -> {
-            // For some reason, "when(something).thenReturn" does not work on 
spies, but this notation works.
-            
doReturn(txStateStorage).when(txStateTableStorage).getOrCreateTxStateStorage(anyInt());
-            
doReturn(txStateStorage).when(txStateTableStorage).getTxStateStorage(anyInt());
+        mockMetastore();
 
+        // For some reason, "when(something).thenReturn" does not work on 
spies, but this notation works.
+        TableManager tableManager = createTableManager(tblManagerFut, 
(mvTableStorage) -> {
             
doReturn(completedFuture(mvPartitionStorage)).when(mvTableStorage).createMvPartition(anyInt());
             
doReturn(mvPartitionStorage).when(mvTableStorage).getMvPartition(anyInt());
-            
doReturn(mock(PartitionTimestampCursor.class)).when(mvPartitionStorage).scan(any());
             
doReturn(completedFuture(null)).when(mvTableStorage).clearPartition(anyInt());
-
-            return completedFuture(null);
+        }, (txStateTableStorage) -> {
+            
doReturn(txStateStorage).when(txStateTableStorage).getOrCreateTxStateStorage(anyInt());
+            
doReturn(txStateStorage).when(txStateTableStorage).getTxStateStorage(anyInt());
         });
 
         TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC", 
PRECONFIGURED_TABLE_NAME).columns(
@@ -839,13 +831,21 @@ public class TableManagerTest extends IgniteAbstractTest {
         return tbl2;
     }
 
+    private TableManager createTableManager(CompletableFuture<TableManager> 
tblManagerFut) {
+        return createTableManager(tblManagerFut, unused -> {}, unused -> {});
+    }
+
     /**
      * Creates Table manager.
      *
      * @param tblManagerFut Future to wrap Table manager.
+     * @param tableStorageDecorator Table storage spy decorator.
+     * @param txStateTableStorageDecorator Tx state table storage spy 
decorator.
+     *
      * @return Table manager.
      */
-    private TableManager createTableManager(CompletableFuture<TableManager> 
tblManagerFut) {
+    private TableManager createTableManager(CompletableFuture<TableManager> 
tblManagerFut, Consumer<MvTableStorage> tableStorageDecorator,
+            Consumer<TxStateTableStorage> txStateTableStorageDecorator) {
         VaultManager vaultManager = mock(VaultManager.class);
 
         
when(vaultManager.get(any(ByteArray.class))).thenReturn(completedFuture(null));
@@ -881,6 +881,8 @@ public class TableManagerTest extends IgniteAbstractTest {
             protected MvTableStorage createTableStorage(CatalogTableDescriptor 
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
                 mvTableStorage = spy(super.createTableStorage(tableDescriptor, 
zoneDescriptor));
 
+                tableStorageDecorator.accept(mvTableStorage);
+
                 return mvTableStorage;
             }
 
@@ -891,6 +893,8 @@ public class TableManagerTest extends IgniteAbstractTest {
             ) {
                 txStateTableStorage = 
spy(super.createTxStateTableStorage(tableDescriptor, zoneDescriptor));
 
+                txStateTableStorageDecorator.accept(txStateTableStorage);
+
                 return txStateTableStorage;
             }
         };

Reply via email to