This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1431f13855 IGNITE-19177 Add assignments checks when building indexes 
(#2030)
1431f13855 is described below

commit 1431f1385563a9e0c4441c80b491995eb4a9a714
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon May 22 12:34:03 2023 +0300

    IGNITE-19177 Add assignments checks when building indexes (#2030)
---
 .../apache/ignite/internal/index/IndexBuilder.java | 379 ---------------------
 .../apache/ignite/internal/index/IndexManager.java |  32 +-
 .../ignite/internal/index/IndexManagerTest.java    |   4 +-
 .../apache/ignite/internal/replicator/Replica.java |   9 +
 .../ignite/internal/replicator/ReplicaManager.java |  15 +-
 .../replicator/listener/ReplicaListener.java       |  18 +
 .../runner/app/ItIgniteNodeRestartTest.java        |   2 +-
 .../sql/engine/ClusterPerClassIntegrationTest.java |  42 +--
 .../internal/sql/engine/ItBuildIndexTest.java      |  22 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   2 +-
 .../sql/engine/exec/MockedStructuresTest.java      |   2 +-
 .../internal/storage/index/IndexDescriptor.java    |  22 ++
 .../distributed/ItTxDistributedTestSingleNode.java |  27 +-
 .../table/distributed/StorageUpdateHandler.java    |   8 +
 .../internal/table/distributed/TableManager.java   |  31 +-
 .../table/distributed/index/IndexBuildTask.java    | 225 ++++++++++++
 .../table/distributed/index/IndexBuildTaskId.java  |  77 +++++
 .../table/distributed/index/IndexBuilder.java      | 186 ++++++++++
 .../raft/snapshot/PartitionAccessImpl.java         |   6 +
 .../replicator/PartitionReplicaListener.java       | 235 ++++++++++---
 .../PartitionReplicaListenerIndexLockingTest.java  |  17 +-
 .../replication/PartitionReplicaListenerTest.java  |  16 +-
 .../table/impl/DummyInternalTableImpl.java         |   7 +-
 23 files changed, 872 insertions(+), 512 deletions(-)

diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
deleted file mode 100644
index 3a20d6d627..0000000000
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.index;
-
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.stream.Collectors.toList;
-import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Supplier;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.index.IndexDescriptor;
-import org.apache.ignite.internal.storage.index.IndexStorage;
-import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
-import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.network.ClusterService;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Class for managing the index building process.
- */
-class IndexBuilder {
-    private static final IgniteLogger LOG = 
Loggers.forClass(IndexBuilder.class);
-
-    /** Batch size of row IDs to build the index. */
-    private static final int BUILD_INDEX_ROW_ID_BATCH_SIZE = 100;
-
-    /** Message factory to create messages - RAFT commands. */
-    private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new 
TableMessagesFactory();
-
-    /** Busy lock to stop synchronously. */
-    private final IgniteSpinBusyLock busyLock;
-
-    /** Cluster service. */
-    private final ClusterService clusterService;
-
-    /** Index building executor. */
-    private final ExecutorService buildIndexExecutor;
-
-    /** Tasks of building indexes by their ID. */
-    private final Map<BuildIndexTaskId, BuildIndexTask> buildIndexTaskById = 
new ConcurrentHashMap<>();
-
-    IndexBuilder(String nodeName, IgniteSpinBusyLock busyLock, ClusterService 
clusterService) {
-        this.busyLock = busyLock;
-        this.clusterService = clusterService;
-
-        int cpus = Runtime.getRuntime().availableProcessors();
-
-        buildIndexExecutor = new ThreadPoolExecutor(
-                cpus,
-                cpus,
-                30,
-                TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(),
-                NamedThreadFactory.create(nodeName, "build-index", LOG)
-        );
-    }
-
-    /**
-     * Stops the index builder.
-     */
-    void stop() {
-        shutdownAndAwaitTermination(buildIndexExecutor, 10, TimeUnit.SECONDS);
-    }
-
-    /**
-     * Initializes the build of the index.
-     */
-    void startIndexBuild(TableIndexView tableIndexView, TableImpl table, 
IndexDescriptor indexDescriptor) {
-        for (int partitionId = 0; partitionId < 
table.internalTable().partitions(); partitionId++) {
-            // TODO: IGNITE-19112 We only need to create the index store once
-            table.internalTable().storage().getOrCreateIndex(partitionId, 
indexDescriptor);
-
-            // TODO: IGNITE-19177 Add assignments check
-            buildIndexTaskById.compute(
-                    new BuildIndexTaskId(table.tableId(), tableIndexView.id(), 
partitionId),
-                    (buildIndexTaskId, previousBuildIndexTask) -> {
-                        assert previousBuildIndexTask == null : 
buildIndexTaskId;
-
-                        BuildIndexTask buildIndexTask = new 
BuildIndexTask(table, tableIndexView, buildIndexTaskId.getPartitionId(), null);
-
-                        buildIndexExecutor.submit(buildIndexTask);
-
-                        return buildIndexTask;
-                    });
-        }
-    }
-
-    /**
-     * Stops the build of the index.
-     */
-    void stopIndexBuild(TableIndexView tableIndexView, TableImpl table) {
-        for (int partitionId = 0; partitionId < 
table.internalTable().partitions(); partitionId++) {
-            BuildIndexTask buildIndexTask = buildIndexTaskById.remove(
-                    new BuildIndexTaskId(table.tableId(), tableIndexView.id(), 
partitionId)
-            );
-
-            if (buildIndexTask != null) {
-                buildIndexTask.stop();
-            }
-        }
-    }
-
-    /**
-     * Task of building a table index for a partition.
-     *
-     * <p>Only the leader of the raft group will manage the building of the 
index. Leader sends batches of row IDs via
-     * {@link BuildIndexCommand}, the next batch will only be send after the 
previous batch has been processed.
-     *
-     * <p>Index building itself occurs locally on each node of the raft group 
when processing {@link BuildIndexCommand}. This ensures that
-     * the index build process in the raft group is consistent and that the 
index build process is restored after restarting the raft group
-     * (not from the beginning).
-     */
-    private class BuildIndexTask implements Runnable {
-        private final TableImpl table;
-
-        private final TableIndexView tableIndexView;
-
-        private final int partitionId;
-
-        /**
-         * ID of the next row to build the index from the previous batch, 
{@code null} if it is the first row after the index was created
-         * (both on a live node and after a restore).
-         */
-        private final @Nullable RowId nextRowIdToBuildFromPreviousBatch;
-
-        /** Busy lock to stop task synchronously. */
-        private final IgniteSpinBusyLock taskBusyLock = new 
IgniteSpinBusyLock();
-
-        /** Prevents double stopping of the task. */
-        private final AtomicBoolean taskStopGuard = new AtomicBoolean();
-
-        private BuildIndexTask(
-                TableImpl table,
-                TableIndexView tableIndexView,
-                int partitionId,
-                @Nullable RowId nextRowIdToBuildFromPreviousBatch
-        ) {
-            this.table = table;
-            this.tableIndexView = tableIndexView;
-            this.partitionId = partitionId;
-            this.nextRowIdToBuildFromPreviousBatch = 
nextRowIdToBuildFromPreviousBatch;
-        }
-
-        @Override
-        public void run() {
-            completedFuture(null)
-                    .thenCompose(unused -> inBusyLock(() -> {
-                        // At the time of index creation, table and raft 
services of all partitions should be already started, so there
-                        // should be no errors.
-                        RaftGroupService raftGroupService = 
table.internalTable().partitionRaftGroupService(partitionId);
-
-                        return raftGroupService
-                                // Now we do not check the assignment of a 
node to a partition on purpose, so as not to fight races on the
-                                // rebalance, we just wait for the leader of 
the raft group of the partition, which has had a recovery,
-                                // rebalancing (if needed).
-                                .refreshAndGetLeaderWithTerm()
-                                .thenComposeAsync(leaderWithTerm -> 
inBusyLock(() -> {
-                                    // At this point, we have a stable 
topology, each node of which has already applied all local updates.
-                                    if 
(!localNodeConsistentId().equals(leaderWithTerm.leader().consistentId())) {
-                                        // TODO: IGNITE-19053 Must handle the 
change of leader
-                                        // TODO: IGNITE-19053 Add a test to 
change the leader even at the start of the task
-                                        removeTask();
-
-                                        return completedFuture(null);
-                                    }
-
-                                    RowId nextRowIdToBuild = 
nextRowIdToBuild();
-
-                                    if (nextRowIdToBuild == null) {
-                                        // Index has already been built.
-                                        removeTask();
-
-                                        return completedFuture(null);
-                                    }
-
-                                    List<RowId> batchRowIds = 
collectRowIdBatch(nextRowIdToBuild);
-
-                                    RowId nextRowId = 
getNextRowIdForNextBatch(batchRowIds);
-
-                                    boolean finish = batchRowIds.size() < 
BUILD_INDEX_ROW_ID_BATCH_SIZE || nextRowId == null;
-
-                                    // TODO: IGNITE-19053 Must handle the 
change of leader
-                                    return 
raftGroupService.run(createBuildIndexCommand(batchRowIds, finish))
-                                            .thenRun(() -> inBusyLock(() -> {
-                                                
scheduleNextBuildIndexTaskIfNeeded(nextRowId, finish);
-
-                                                return completedFuture(null);
-                                            }));
-                                }), buildIndexExecutor);
-                    })).whenComplete((unused, throwable) -> {
-                        if (throwable != null) {
-                            removeTask();
-
-                            LOG.error("Index build error: [{}]", throwable, 
createCommonTableIndexInfo());
-                        }
-                    });
-        }
-
-        private void scheduleNextBuildIndexTaskIfNeeded(@Nullable RowId 
nextRowId, boolean finish) {
-            buildIndexTaskById.compute(
-                    new BuildIndexTaskId(table.tableId(), tableIndexView.id(), 
partitionId),
-                    (buildIndexTaskId, previousBuildIndexTask) -> {
-                        if (previousBuildIndexTask == null || finish) {
-                            // Index build task is in the process of stopping 
or has completed.
-                            return null;
-                        }
-
-                        assert nextRowId != null : 
createCommonTableIndexInfo();
-
-                        BuildIndexTask buildIndexTask = new BuildIndexTask(
-                                table,
-                                tableIndexView,
-                                partitionId,
-                                nextRowId
-                        );
-
-                        buildIndexExecutor.submit(buildIndexTask);
-
-                        return buildIndexTask;
-                    });
-        }
-
-        private boolean isLocalNodeLeader(RaftGroupService raftGroupService) {
-            Peer leader = raftGroupService.leader();
-
-            assert leader != null : "tableId=" + table.tableId() + ", 
partitionId=" + partitionId;
-
-            return localNodeConsistentId().equals(leader.consistentId());
-        }
-
-        private List<RowId> createBatchRowIds(RowId lastBuiltRowId, int 
batchSize) {
-            MvPartitionStorage mvPartition = 
table.internalTable().storage().getMvPartition(partitionId);
-
-            assert mvPartition != null : createCommonTableIndexInfo();
-
-            List<RowId> batch = new ArrayList<>(batchSize);
-
-            for (int i = 0; i < batchSize && lastBuiltRowId != null; i++) {
-                lastBuiltRowId = mvPartition.closestRowId(lastBuiltRowId);
-
-                if (lastBuiltRowId == null) {
-                    break;
-                }
-
-                batch.add(lastBuiltRowId);
-
-                lastBuiltRowId = lastBuiltRowId.increment();
-            }
-
-            return batch;
-        }
-
-        private BuildIndexCommand createBuildIndexCommand(List<RowId> rowIds, 
boolean finish) {
-            return TABLE_MESSAGES_FACTORY.buildIndexCommand()
-                    
.tablePartitionId(TABLE_MESSAGES_FACTORY.tablePartitionIdMessage()
-                            .tableId(table.tableId())
-                            .partitionId(partitionId)
-                            .build()
-                    )
-                    .indexId(tableIndexView.id())
-                    .rowIds(rowIds.stream().map(RowId::uuid).collect(toList()))
-                    .finish(finish)
-                    .build();
-        }
-
-        private String createCommonTableIndexInfo() {
-            return "table=" + table.name() + ", tableId=" + table.tableId()
-                    + ", partitionId=" + partitionId
-                    + ", index=" + tableIndexView.name() + ", indexId=" + 
tableIndexView.id();
-        }
-
-        private String localNodeConsistentId() {
-            return clusterService.topologyService().localMember().name();
-        }
-
-        private @Nullable RowId getNextRowIdForNextBatch(List<RowId> batch) {
-            return batch.isEmpty() ? null : batch.get(batch.size() - 
1).increment();
-        }
-
-        private @Nullable RowId nextRowIdToBuild() {
-            if (nextRowIdToBuildFromPreviousBatch != null) {
-                return nextRowIdToBuildFromPreviousBatch;
-            }
-
-            IndexStorage index = 
table.internalTable().storage().getIndex(partitionId, tableIndexView.id());
-
-            assert index != null : createCommonTableIndexInfo();
-
-            return index.getNextRowIdToBuild();
-        }
-
-        private List<RowId> collectRowIdBatch(RowId nextRowIdToBuild) {
-            if (nextRowIdToBuildFromPreviousBatch == null) {
-                LOG.info("Start building the index: [{}]", 
createCommonTableIndexInfo());
-            }
-
-            return createBatchRowIds(nextRowIdToBuild, 
BUILD_INDEX_ROW_ID_BATCH_SIZE);
-        }
-
-        private boolean enterBusy() {
-            if (!taskBusyLock.enterBusy()) {
-                return false;
-            }
-
-            if (!busyLock.enterBusy()) {
-                taskBusyLock.leaveBusy();
-
-                return false;
-            }
-
-            return true;
-        }
-
-        private void leaveBusy() {
-            taskBusyLock.leaveBusy();
-            busyLock.leaveBusy();
-        }
-
-        private <V> CompletableFuture<V> 
inBusyLock(Supplier<CompletableFuture<V>> supplier) {
-            if (!enterBusy()) {
-                removeTask();
-
-                return completedFuture(null);
-            }
-
-            try {
-                return supplier.get();
-            } finally {
-                leaveBusy();
-            }
-        }
-
-        private void stop() {
-            if (!taskStopGuard.compareAndSet(false, true)) {
-                return;
-            }
-
-            taskBusyLock.block();
-        }
-
-        private void removeTask() {
-            buildIndexTaskById.remove(new BuildIndexTaskId(table.tableId(), 
tableIndexView.id(), partitionId));
-        }
-    }
-}
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index f6182ee465..f17c37faa2 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.index;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static 
org.apache.ignite.internal.storage.index.IndexDescriptor.createIndexDescriptor;
 import static org.apache.ignite.internal.util.ArrayUtils.STRING_EMPTY_ARRAY;
 
 import java.util.ArrayList;
@@ -70,7 +71,6 @@ import org.apache.ignite.lang.IndexAlreadyExistsException;
 import org.apache.ignite.lang.IndexNotFoundException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.lang.TableNotFoundException;
-import org.apache.ignite.network.ClusterService;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -96,29 +96,21 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
     /** Prevents double stopping of the component. */
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
-    /** Index builder. */
-    private final IndexBuilder indexBuilder;
-
     /**
      * Constructor.
      *
-     * @param nodeName Node name.
      * @param tablesCfg Tables and indexes configuration.
      * @param schemaManager Schema manager.
      * @param tableManager Table manager.
-     * @param clusterService Cluster service.
      */
     public IndexManager(
-            String nodeName,
             TablesConfiguration tablesCfg,
             SchemaManager schemaManager,
-            TableManager tableManager,
-            ClusterService clusterService
+            TableManager tableManager
     ) {
         this.tablesCfg = Objects.requireNonNull(tablesCfg, "tablesCfg");
         this.schemaManager = Objects.requireNonNull(schemaManager, 
"schemaManager");
         this.tableManager = tableManager;
-        this.indexBuilder = new IndexBuilder(nodeName, busyLock, 
clusterService);
     }
 
     /** {@inheritDoc} */
@@ -175,8 +167,6 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
 
         busyLock.block();
 
-        indexBuilder.stop();
-
         LOG.info("Index manager stopped");
     }
 
@@ -400,8 +390,6 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
             CompletableFuture<?> dropIndexFuture = 
tableManager.tableAsync(causalityToken, tableId)
                     .thenAccept(table -> {
                         if (table != null) { // in case of DROP TABLE the 
table will be removed first
-                            indexBuilder.stopIndexBuild(tableIndexView, table);
-
                             table.unregisterIndex(idxId);
                         }
                     });
@@ -454,7 +442,7 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
 
         IndexDescriptor descriptor = newDescriptor(tableIndexView);
 
-        org.apache.ignite.internal.storage.index.IndexDescriptor 
storageIndexDescriptor = createStorageIndexDescriptor(tablesView, indexId);
+        org.apache.ignite.internal.storage.index.IndexDescriptor 
storageIndexDescriptor = createIndexDescriptor(tablesView, indexId);
 
         CompletableFuture<?> fireEventFuture =
                 fireEvent(IndexEvent.CREATE, new 
IndexEventParameters(causalityToken, tableId, indexId, descriptor));
@@ -485,8 +473,6 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
                     table.pkId(indexId);
                 }
             }
-
-            indexBuilder.startIndexBuild(tableIndexView, table, 
storageIndexDescriptor);
         });
 
         return allOf(createIndexFuture, fireEventFuture);
@@ -641,16 +627,4 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
             return failedFuture(new IllegalStateException("Should not be 
called"));
         }
     }
-
-    private org.apache.ignite.internal.storage.index.IndexDescriptor 
createStorageIndexDescriptor(TablesView tablesView, UUID indexId) {
-        TableIndexView indexView = tablesView.indexes().get(indexId);
-
-        if (indexView instanceof HashIndexView) {
-            return new HashIndexDescriptor(indexId, tablesView);
-        } else if (indexView instanceof SortedIndexView) {
-            return new 
org.apache.ignite.internal.storage.index.SortedIndexDescriptor(indexId, 
tablesView);
-        }
-
-        throw new AssertionError("Unknown index type [type=" + (indexView != 
null ? indexView.getClass() : null) + ']');
-    }
 }
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index b01675ec75..06564a898c 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -54,7 +54,6 @@ import 
org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IndexNotFoundException;
-import org.apache.ignite.network.ClusterService;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -93,8 +92,7 @@ public class IndexManagerTest {
 
         when(schManager.schemaRegistry(anyLong(), 
any())).thenReturn(completedFuture(null));
 
-        indexManager = new IndexManager("test", tablesConfig, schManager, 
tableManagerMock,
-                mock(ClusterService.class));
+        indexManager = new IndexManager(tablesConfig, schManager, 
tableManagerMock);
         indexManager.start();
 
         assertThat(
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index a7f0edc913..0bd0cb34e7 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -145,6 +145,8 @@ public class Replica {
         if (!leaderFuture.isDone()) {
             leaderFuture.complete(leaderRef);
         }
+
+        listener.onBecomePrimary(clusterNode);
     }
 
     private CompletableFuture<ClusterNode> leaderFuture() {
@@ -267,4 +269,11 @@ public class Replica {
 
         return leased != null ? leased.consistentId() : localNode.name();
     }
+
+    /**
+     * Shutdowns the replica.
+     */
+    public void shutdown() {
+        listener.onShutdown();
+    }
 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index fafe11e154..9582756359 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -382,8 +382,21 @@ public class ReplicaManager implements IgniteComponent {
      * @param replicaGrpId Replication group id.
      * @return True if the replica is found and closed, false otherwise.
      */
+    // TODO: IGNITE-19494 We need to correctly stop the replica
     private boolean stopReplicaInternal(ReplicationGroupId replicaGrpId) {
-        return replicas.remove(replicaGrpId) != null;
+        CompletableFuture<Replica> removed = replicas.remove(replicaGrpId);
+
+        if (removed != null) {
+            removed.whenComplete((replica, throwable) -> {
+                if (throwable != null) {
+                    replica.shutdown();
+                }
+            });
+
+            return true;
+        }
+
+        return false;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
index c06515a943..f98b8cf900 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.replicator.listener;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.network.ClusterNode;
 
 /**
  * Replica listener.
  */
+@FunctionalInterface
 public interface ReplicaListener {
     /**
      * Invokes a replica listener to process request.
@@ -31,4 +33,20 @@ public interface ReplicaListener {
      * @return Listener response.
      */
     CompletableFuture<?> invoke(ReplicaRequest request);
+
+    /**
+     * Callback on becoming the primary replica.
+     *
+     * @param clusterNode Primary replica node.
+     */
+    default void onBecomePrimary(ClusterNode clusterNode) {
+        // No-op.
+    }
+
+    /**
+     * Callback on replica shutdown.
+     */
+    default void onShutdown() {
+        // No-op.
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index a21247c5ac..3d2561e0c3 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -406,7 +406,7 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
                 null
         );
 
-        var indexManager = new IndexManager(name, tablesConfiguration, 
schemaManager, tableManager, clusterSvc);
+        var indexManager = new IndexManager(tablesConfiguration, 
schemaManager, tableManager);
 
         SqlQueryProcessor qryEngine = new SqlQueryProcessor(
                 registry,
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index f0c2dbba6d..7f641fdd75 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine;
 
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
+import static 
org.apache.ignite.internal.storage.index.IndexDescriptor.createIndexDescriptor;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -30,13 +31,14 @@ import static 
org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
@@ -52,18 +54,12 @@ import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.configuration.TablesView;
-import org.apache.ignite.internal.schema.configuration.index.HashIndexView;
-import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
 import 
org.apache.ignite.internal.schema.configuration.index.TableIndexConfiguration;
-import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
 import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.sql.engine.util.QueryChecker;
 import org.apache.ignite.internal.sql.engine.util.TestQueryProcessor;
-import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
-import org.apache.ignite.internal.storage.index.IndexDescriptor;
 import org.apache.ignite.internal.storage.index.IndexStorage;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -487,9 +483,12 @@ public abstract class ClusterPerClassIntegrationTest 
extends IgniteIntegrationTe
      *
      * @param tableName Table name.
      * @param indexName Index name.
+     * @return Nodes on which the partition index was built.
      * @throws Exception If failed.
      */
-    public static void waitForIndexBuild(String tableName, String indexName) 
throws Exception {
+    protected static Map<Integer, List<Ignite>> waitForIndexBuild(String 
tableName, String indexName) throws Exception {
+        Map<Integer, List<Ignite>> partitionIdToNodes = new HashMap<>();
+
         // TODO: IGNITE-18733 We are waiting for the synchronization of schemes
         for (Ignite clusterNode : CLUSTER_NODES) {
             CompletableFuture<Table> tableFuture = 
clusterNode.tables().tableAsync(tableName);
@@ -515,14 +514,19 @@ public abstract class ClusterPerClassIntegrationTest 
extends IgniteIntegrationTe
                     continue;
                 }
 
-                IndexStorage index = internalTable.storage().getOrCreateIndex(
-                        partitionId,
-                        
createIndexDescription(getTablesConfiguration(clusterNode).value(), indexName)
-                );
+                TablesView tablesView = 
getTablesConfiguration(clusterNode).value();
+
+                UUID indexId = 
tablesView.indexes().get(indexName.toUpperCase()).id();
+
+                IndexStorage index = 
internalTable.storage().getOrCreateIndex(partitionId, 
createIndexDescriptor(tablesView, indexId));
 
                 assertTrue(waitForCondition(() -> index.getNextRowIdToBuild() 
== null, 10, TimeUnit.SECONDS.toMillis(10)));
+
+                partitionIdToNodes.computeIfAbsent(partitionId, p -> new 
ArrayList<>()).add(clusterNode);
             }
         }
+
+        return partitionIdToNodes;
     }
 
     /**
@@ -533,18 +537,4 @@ public abstract class ClusterPerClassIntegrationTest 
extends IgniteIntegrationTe
     public static TablesConfiguration getTablesConfiguration(Ignite node) {
         return ((IgniteImpl) 
node).clusterConfiguration().getConfiguration(TablesConfiguration.KEY);
     }
-
-    private static IndexDescriptor createIndexDescription(TablesView 
tablesView, String indexName) {
-        TableIndexView indexView = 
tablesView.indexes().get(indexName.toUpperCase());
-
-        assertNotNull(indexView, indexName);
-
-        if (indexView instanceof HashIndexView) {
-            return new HashIndexDescriptor(indexView.id(), tablesView);
-        } else if (indexView instanceof SortedIndexView) {
-            return new SortedIndexDescriptor(indexView.id(), tablesView);
-        }
-
-        return fail(indexView.getClass().getName());
-    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
index 84cdc9ef7d..e0f2e4ea32 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
@@ -19,9 +19,13 @@ package org.apache.ignite.internal.sql.engine;
 
 import static java.util.stream.Collectors.joining;
 import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -41,13 +45,16 @@ public class ItBuildIndexTest extends 
ClusterPerClassIntegrationTest {
     @AfterEach
     void tearDown() {
         sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+        sql("DROP ZONE IF EXISTS " + ZONE_NAME);
     }
 
     @ParameterizedTest(name = "replicas : {0}")
     @MethodSource("replicas")
     void testBuildIndexOnStableTopology(int replicas) throws Exception {
+        int partitions = 2;
+
         sql(IgniteStringFormatter.format("CREATE ZONE IF NOT EXISTS {} WITH 
REPLICAS={}, PARTITIONS={}",
-                ZONE_NAME, replicas, 2
+                ZONE_NAME, replicas, partitions
         ));
 
         sql(IgniteStringFormatter.format(
@@ -63,7 +70,18 @@ public class ItBuildIndexTest extends 
ClusterPerClassIntegrationTest {
         sql(IgniteStringFormatter.format("CREATE INDEX {} ON {} (i1)", 
INDEX_NAME, TABLE_NAME));
 
         // TODO: IGNITE-19150 We are waiting for schema synchronization to 
avoid races to create and destroy indexes
-        waitForIndexBuild(TABLE_NAME, INDEX_NAME);
+        Map<Integer, List<Ignite>> nodesWithBuiltIndexesByPartitionId = 
waitForIndexBuild(TABLE_NAME, INDEX_NAME);
+
+        // Check that the number of nodes with built indexes is equal to the 
number of replicas.
+        assertEquals(partitions, nodesWithBuiltIndexesByPartitionId.size());
+
+        for (Entry<Integer, List<Ignite>> entry : 
nodesWithBuiltIndexesByPartitionId.entrySet()) {
+            assertEquals(
+                    replicas,
+                    entry.getValue().size(),
+                    IgniteStringFormatter.format("p={}, nodes={}", 
entry.getKey(), entry.getValue())
+            );
+        }
 
         assertQuery(IgniteStringFormatter.format("SELECT * FROM {} WHERE i1 > 
0", TABLE_NAME))
                 .matches(containsIndexScan("PUBLIC", TABLE_NAME.toUpperCase(), 
INDEX_NAME.toUpperCase()))
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index ad1acdee1a..cabe96008c 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -512,7 +512,7 @@ public class IgniteImpl implements Ignite {
                 distributionZoneManager
         );
 
-        indexManager = new IndexManager(name, tablesConfiguration, 
schemaManager, distributedTblMgr, clusterSvc);
+        indexManager = new IndexManager(tablesConfiguration, schemaManager, 
distributedTblMgr);
 
         qryEngine = new SqlQueryProcessor(
                 registry,
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 69335f7410..ff5ae6937b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -290,7 +290,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
 
         tblManager = mockManagers();
 
-        idxManager = new IndexManager(NODE_NAME, tblsCfg, schemaManager, 
tblManager, cs);
+        idxManager = new IndexManager(tblsCfg, schemaManager, tblManager);
 
         idxManager.start();
 
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexDescriptor.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexDescriptor.java
index 5dd2fb9465..d3a0743f34 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexDescriptor.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexDescriptor.java
@@ -20,6 +20,10 @@ package org.apache.ignite.internal.storage.index;
 import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.configuration.TablesView;
+import org.apache.ignite.internal.schema.configuration.index.HashIndexView;
+import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
+import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
 
 /**
  * Index descriptor.
@@ -54,4 +58,22 @@ public interface IndexDescriptor {
      * Returns index column descriptions.
      */
     List<? extends ColumnDescriptor> columns();
+
+    /**
+     * Creates an index description based on the configuration.
+     *
+     * @param tablesView Tables configuration.
+     * @param indexId Index ID.
+     */
+    static IndexDescriptor createIndexDescriptor(TablesView tablesView, UUID 
indexId) {
+        TableIndexView indexView = tablesView.indexes().get(indexId);
+
+        if (indexView instanceof HashIndexView) {
+            return new HashIndexDescriptor(indexId, tablesView);
+        } else if (indexView instanceof SortedIndexView) {
+            return new SortedIndexDescriptor(indexId, tablesView);
+        } else {
+            throw new AssertionError("Unknown type: " + indexView);
+        }
+    }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index b1b0ac602f..3f2e74c0ff 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -53,6 +53,7 @@ import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -76,10 +77,12 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TxAbstractTest;
@@ -89,6 +92,7 @@ import 
org.apache.ignite.internal.table.distributed.LowWatermark;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
@@ -140,6 +144,12 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
     @InjectConfiguration
     private static DataStorageConfiguration dsCfg;
 
+    @InjectConfiguration("mock.tables.foo {}")
+    private static TablesConfiguration tablesConfig;
+
+    @InjectConfiguration
+    private static DistributionZoneConfiguration distributionZoneConfig;
+
     private static final IgniteLogger LOG = 
Loggers.forClass(ItTxDistributedTestSingleNode.class);
 
     public static final int NODE_PORT_BASE = 20_000;
@@ -423,7 +433,8 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
             TablePartitionId grpId = grpIds.get(p);
 
             for (String assignment : partAssignments) {
-                var testMpPartStorage = new TestMvPartitionStorage(0);
+                var mvTableStorage = new 
TestMvTableStorage(tablesConfig.tables().get("foo"), tablesConfig, 
distributionZoneConfig);
+                var mvPartStorage = new TestMvPartitionStorage(0);
                 var txStateStorage = txStateStorages.get(assignment);
                 var placementDriver = new 
PlacementDriver(replicaServices.get(assignment), consistentIdToNode);
 
@@ -451,7 +462,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                         new 
PendingComparableValuesTracker<>(clocks.get(assignment).now());
                 PendingComparableValuesTracker<Long, Void> storageIndexTracker 
= new PendingComparableValuesTracker<>(0L);
 
-                PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(testMpPartStorage);
+                PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartStorage);
 
                 StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(
                         partId,
@@ -487,9 +498,9 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                                 DummySchemaManagerImpl schemaManager = new 
DummySchemaManagerImpl(schemaDescriptor);
                                 replicaManagers.get(assignment).startReplica(
                                         new TablePartitionId(tblId, partId),
-                                        
CompletableFuture.completedFuture(null),
+                                        completedFuture(null),
                                         new PartitionReplicaListener(
-                                                testMpPartStorage,
+                                                mvPartStorage,
                                                 raftSvc,
                                                 txManagers.get(assignment),
                                                 
txManagers.get(assignment).lockManager(),
@@ -498,15 +509,17 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                                                 tblId,
                                                 () -> Map.of(pkLocker.id(), 
pkLocker),
                                                 pkStorage,
-                                                () -> Map.of(),
+                                                Map::of,
                                                 clocks.get(assignment),
                                                 safeTime,
                                                 txStateStorage,
                                                 placementDriver,
                                                 storageUpdateHandler,
                                                 new 
DummySchemas(schemaManager),
-                                                peer -> 
assignment.equals(peer.consistentId()),
-                                                completedFuture(schemaManager)
+                                                completedFuture(schemaManager),
+                                                
consistentIdToNode.apply(assignment),
+                                                mvTableStorage,
+                                                mock(IndexBuilder.class)
                                         ),
                                         raftSvc,
                                         storageIndexTracker
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index fef8bbc481..f76d961aa4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -417,4 +417,12 @@ public class StorageUpdateHandler {
     public PendingComparableValuesTracker<HybridTimestamp, Void> 
getSafeTimeTracker() {
         return safeTimeTracker;
     }
+
+    /**
+     * Waits for indexes to be created.
+     */
+    // TODO: IGNITE-19513 Fix it, we should have already waited for the 
indexes to be created
+    public void waitIndexes() {
+        indexes.get();
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index f9d97ed766..d35b8da1eb 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -132,6 +132,7 @@ import 
org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
 import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
 import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
@@ -322,6 +323,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
     private final LowWatermark lowWatermark;
 
+    private final IndexBuilder indexBuilder;
+
     /**
      * Creates a new table manager.
      *
@@ -432,6 +435,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         mvGc = new MvGc(nodeName, tablesCfg);
 
         lowWatermark = new LowWatermark(nodeName, tablesCfg.lowWatermark(), 
clock, txManager, vaultManager, mvGc);
+
+        indexBuilder = new IndexBuilder(nodeName, cpus);
     }
 
     @Override
@@ -635,7 +640,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             futures[i] = new CompletableFuture<>();
         }
 
-        String localMemberName = 
clusterService.topologyService().localMember().name();
+        String localMemberName = localNode().name();
 
         // Create new raft nodes according to new assignments.
         return tablesByIdVv.update(causalityToken, (tablesById, e) -> 
inBusyLock(busyLock, () -> {
@@ -829,8 +834,10 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                                             placementDriver,
                                                             
storageUpdateHandler,
                                                             new 
NonHistoricSchemas(schemaManager),
-                                                            this::isLocalPeer,
-                                                            
schemaManager.schemaRegistry(causalityToken, tblId)
+                                                            
schemaManager.schemaRegistry(causalityToken, tblId),
+                                                            localNode(),
+                                                            
table.internalTable().storage(),
+                                                            indexBuilder
                                                     ),
                                                     updatedRaftGroupService,
                                                     storageIndexTracker
@@ -854,7 +861,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     }
 
     private boolean isLocalPeer(Peer peer) {
-        return 
peer.consistentId().equals(clusterService.topologyService().localMember().name());
+        return peer.consistentId().equals(localNode().name());
     }
 
     private PartitionDataStorage partitionDataStorage(MvPartitionStorage 
partitionStorage, InternalTable internalTbl, int partId) {
@@ -950,7 +957,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         cleanUpTablesResources(tablesToStop);
 
         try {
-            IgniteUtils.closeAllManually(lowWatermark, mvGc);
+            IgniteUtils.closeAllManually(lowWatermark, mvGc, indexBuilder);
         } catch (Throwable t) {
             LOG.error("Failed to close internal components", t);
         }
@@ -2000,7 +2007,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 
stableConfiguration.peers().stream().map(Peer::consistentId).collect(toList())
         );
 
-        ClusterNode localMember = 
clusterService.topologyService().localMember();
+        ClusterNode localMember = localNode();
 
         // Start a new Raft node and Replica if this node has appeared in the 
new assignments.
         boolean shouldStartLocalServices = pendingAssignments.stream()
@@ -2102,8 +2109,10 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                             placementDriver,
                                             storageUpdateHandler,
                                             new 
NonHistoricSchemas(schemaManager),
-                                            this::isLocalPeer,
-                                            
completedFuture(schemaManager.schemaRegistry(tblId))
+                                            
completedFuture(schemaManager.schemaRegistry(tblId)),
+                                            localNode(),
+                                            internalTable.storage(),
+                                            indexBuilder
                                     ),
                                     (TopologyAwareRaftGroupService) 
internalTable.partitionRaftGroupService(partId),
                                     storageIndexTracker
@@ -2316,7 +2325,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                             ? Set.of()
                             : 
ByteUtils.fromBytes(pendingAssignmentsFromMetaStorage);
 
-                    String localMemberName = 
clusterService.topologyService().localMember().name();
+                    String localMemberName = localNode().name();
 
                     boolean shouldStopLocalServices = 
Stream.concat(stableAssignments.stream(), pendingAssignments.stream())
                             .noneMatch(assignment -> 
assignment.consistentId().equals(localMemberName));
@@ -2396,4 +2405,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             tracker.close();
         }
     }
+
+    private ClusterNode localNode() {
+        return clusterService.topologyService().localMember();
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java
new file mode 100644
index 0000000000..4385aace8d
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.index;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.stream.Collectors.toList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteStringFormatter;
+
+/**
+ * Task of building a table index.
+ */
+class IndexBuildTask {
+    private static final IgniteLogger LOG = 
Loggers.forClass(IndexBuildTask.class);
+
+    private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new 
TableMessagesFactory();
+
+    private final IndexBuildTaskId taskId;
+
+    private final IndexStorage indexStorage;
+
+    private final MvPartitionStorage partitionStorage;
+
+    private final RaftGroupService raftClient;
+
+    private final ExecutorService executor;
+
+    private final IgniteSpinBusyLock busyLock;
+
+    private final int batchSize;
+
+    private final IgniteSpinBusyLock taskBusyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean taskStopGuard = new AtomicBoolean();
+
+    private final CompletableFuture<Void> taskFuture = new 
CompletableFuture<>();
+
+    IndexBuildTask(
+            IndexBuildTaskId taskId,
+            IndexStorage indexStorage,
+            MvPartitionStorage partitionStorage,
+            RaftGroupService raftClient,
+            ExecutorService executor,
+            IgniteSpinBusyLock busyLock,
+            int batchSize
+    ) {
+        this.taskId = taskId;
+        this.indexStorage = indexStorage;
+        this.partitionStorage = partitionStorage;
+        this.raftClient = raftClient;
+        this.executor = executor;
+        this.busyLock = busyLock;
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Starts building the index.
+     */
+    void start() {
+        if (!enterBusy()) {
+            taskFuture.complete(null);
+
+            return;
+        }
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Start building the index: [{}]", 
createCommonIndexInfo());
+        }
+
+        try {
+            supplyAsync(this::handleNextBatch, executor)
+                    .thenCompose(Function.identity())
+                    .whenComplete((unused, throwable) -> {
+                        if (throwable != null) {
+                            LOG.error("Index build error: [{}]", throwable, 
createCommonIndexInfo());
+
+                            taskFuture.completeExceptionally(throwable);
+                        } else {
+                            taskFuture.complete(null);
+                        }
+                    });
+        } catch (Throwable t) {
+            taskFuture.completeExceptionally(t);
+
+            throw t;
+        } finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Stops index building.
+     */
+    void stop() {
+        if (!taskStopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        taskBusyLock.block();
+    }
+
+    /**
+     * Returns the index build future.
+     */
+    CompletableFuture<Void> getTaskFuture() {
+        return taskFuture;
+    }
+
+    private CompletableFuture<Void> handleNextBatch() {
+        if (!enterBusy()) {
+            return completedFuture(null);
+        }
+
+        try {
+            List<RowId> batchRowIds = createBatchRowIds();
+
+            return raftClient.run(createBuildIndexCommand(batchRowIds))
+                    .thenComposeAsync(unused -> {
+                        if (indexStorage.getNextRowIdToBuild() == null) {
+                            // Index has been built.
+                            return completedFuture(null);
+                        }
+
+                        return handleNextBatch();
+                    }, executor);
+        } catch (Throwable t) {
+            return failedFuture(t);
+        } finally {
+            leaveBusy();
+        }
+    }
+
+    private List<RowId> createBatchRowIds() {
+        RowId nextRowIdToBuild = indexStorage.getNextRowIdToBuild();
+
+        List<RowId> batch = new ArrayList<>(batchSize);
+
+        for (int i = 0; i < batchSize && nextRowIdToBuild != null; i++) {
+            nextRowIdToBuild = partitionStorage.closestRowId(nextRowIdToBuild);
+
+            if (nextRowIdToBuild == null) {
+                break;
+            }
+
+            batch.add(nextRowIdToBuild);
+
+            nextRowIdToBuild = nextRowIdToBuild.increment();
+        }
+
+        return batch;
+    }
+
+    private BuildIndexCommand createBuildIndexCommand(List<RowId> rowIds) {
+        boolean finish = rowIds.size() < batchSize;
+
+        return TABLE_MESSAGES_FACTORY.buildIndexCommand()
+                
.tablePartitionId(TABLE_MESSAGES_FACTORY.tablePartitionIdMessage()
+                        .tableId(taskId.getTableId())
+                        .partitionId(taskId.getPartitionId())
+                        .build()
+                )
+                .indexId(taskId.getIndexId())
+                .rowIds(rowIds.stream().map(RowId::uuid).collect(toList()))
+                .finish(finish)
+                .build();
+    }
+
+    private boolean enterBusy() {
+        if (!busyLock.enterBusy()) {
+            return false;
+        }
+
+        if (!taskBusyLock.enterBusy()) {
+            busyLock.leaveBusy();
+
+            return false;
+        }
+
+        return true;
+    }
+
+    private void leaveBusy() {
+        taskBusyLock.leaveBusy();
+        busyLock.leaveBusy();
+    }
+
+    private String createCommonIndexInfo() {
+        return IgniteStringFormatter.format(
+                "tableId={}, partitionId={}, indexId={}",
+                taskId.getTableId(), taskId.getPartitionId(), 
taskId.getIndexId()
+        );
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTaskId.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTaskId.java
new file mode 100644
index 0000000000..ca55ed314c
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTaskId.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.index;
+
+import java.util.UUID;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * {@link IndexBuildTask} ID.
+ */
+class IndexBuildTaskId {
+    private final UUID tableId;
+
+    private final int partitionId;
+
+    private final UUID indexId;
+
+    IndexBuildTaskId(UUID tableId, int partitionId, UUID indexId) {
+        this.tableId = tableId;
+        this.partitionId = partitionId;
+        this.indexId = indexId;
+    }
+
+    public UUID getTableId() {
+        return tableId;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    public UUID getIndexId() {
+        return indexId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        IndexBuildTaskId that = (IndexBuildTaskId) o;
+
+        return partitionId == that.partitionId && tableId.equals(that.tableId) 
&& indexId.equals(that.indexId);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = tableId.hashCode();
+        result = 31 * result + partitionId;
+        result = 31 * result + indexId.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(IndexBuildTaskId.class, this);
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
new file mode 100644
index 0000000000..be19759424
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.index;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Class for managing the building of table indexes.
+ */
+public class IndexBuilder implements ManuallyCloseable {
+    private static final IgniteLogger LOG = 
Loggers.forClass(IndexBuilder.class);
+
+    private static final int BATCH_SIZE = 100;
+
+    private final ExecutorService executor;
+
+    private final Map<IndexBuildTaskId, IndexBuildTask> indexBuildTaskById = 
new ConcurrentHashMap<>();
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param threadCount Number of threads to build indexes.
+     */
+    public IndexBuilder(String nodeName, int threadCount) {
+        executor = new ThreadPoolExecutor(
+                threadCount,
+                threadCount,
+                30,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                NamedThreadFactory.create(nodeName, "build-index", LOG)
+        );
+    }
+
+    /**
+     * Starts building the index if it is not already built or is not yet in 
progress.
+     *
+     * <p>Index is built in batches using {@link BuildIndexCommand} (via 
raft), batches are sent sequentially.
+     *
+     * <p>It is expected that the index building is triggered by the leader of 
the raft group.
+     *
+     * @param tableId Table ID.
+     * @param partitionId Partition ID.
+     * @param indexId Index ID.
+     * @param indexStorage Index storage to build.
+     * @param partitionStorage Multi-versioned partition storage.
+     * @param raftClient Raft client.
+     */
+    // TODO: IGNITE-19498 Perhaps we need to start building the index only once
+    public void startBuildIndex(
+            UUID tableId,
+            int partitionId,
+            UUID indexId,
+            IndexStorage indexStorage,
+            MvPartitionStorage partitionStorage,
+            RaftGroupService raftClient
+    ) {
+        inBusyLock(() -> {
+            if (indexStorage.getNextRowIdToBuild() == null) {
+                return;
+            }
+
+            IndexBuildTaskId taskId = new IndexBuildTaskId(tableId, 
partitionId, indexId);
+
+            IndexBuildTask newTask = new IndexBuildTask(taskId, indexStorage, 
partitionStorage, raftClient, executor, busyLock, BATCH_SIZE);
+
+            IndexBuildTask previousTask = 
indexBuildTaskById.putIfAbsent(taskId, newTask);
+
+            if (previousTask != null) {
+                // Index building is already in progress.
+                return;
+            }
+
+            newTask.start();
+
+            newTask.getTaskFuture().whenComplete((unused, throwable) -> 
indexBuildTaskById.remove(taskId));
+        });
+    }
+
+    /**
+     * Stops index building if it is in progress.
+     *
+     * @param tableId Table ID.
+     * @param partitionId Partition ID.
+     * @param indexId Index ID.
+     */
+    public void stopBuildIndex(UUID tableId, int partitionId, UUID indexId) {
+        inBusyLock(() -> {
+            IndexBuildTask removed = indexBuildTaskById.remove(new 
IndexBuildTaskId(tableId, partitionId, indexId));
+
+            if (removed != null) {
+                removed.stop();
+            }
+        });
+    }
+
+    /**
+     * Stops building all indexes (for a table partition) if they are in 
progress.
+     *
+     * @param tableId Table ID.
+     * @param partitionId Partition ID.
+     */
+    public void stopBuildIndexes(UUID tableId, int partitionId) {
+        for (Iterator<Entry<IndexBuildTaskId, IndexBuildTask>> it = 
indexBuildTaskById.entrySet().iterator(); it.hasNext(); ) {
+            if (!busyLock.enterBusy()) {
+                return;
+            }
+
+            try {
+                Entry<IndexBuildTaskId, IndexBuildTask> entry = it.next();
+
+                IndexBuildTaskId taskId = entry.getKey();
+
+                if (tableId.equals(taskId.getTableId()) && partitionId == 
taskId.getPartitionId()) {
+                    it.remove();
+
+                    entry.getValue().stop();
+                }
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        IgniteUtils.shutdownAndAwaitTermination(executor, 10, 
TimeUnit.SECONDS);
+    }
+
+    private void inBusyLock(Runnable runnable) {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try {
+            runnable.run();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index b170f5efe0..7ff5846779 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -192,6 +192,12 @@ public class PartitionAccessImpl implements 
PartitionAccess {
 
     @Override
     public CompletableFuture<Void> startRebalance() {
+        // Avoids a race between creating indexes and starting a rebalance.
+        // If an index appears after the rebalance has started, then at the 
end of the rebalance it will have a status of RUNNABLE instead
+        // of REBALANCE which will lead to errors.
+        // TODO: IGNITE-19513 Fix it, we should have already waited for the 
indexes to be created
+        storageUpdateHandler.waitIndexes();
+
         TxStateStorage txStateStorage = getTxStateStorage(partitionId());
 
         return mvGc.removeStorage(toTablePartitionId(partitionKey))
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 725f19008a..eac16412b8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.storage.index.IndexDescriptor.createIndexDescriptor;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.internal.util.IgniteUtils.filter;
 import static org.apache.ignite.internal.util.IgniteUtils.findAny;
@@ -46,12 +47,15 @@ import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
-import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import 
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
+import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -73,11 +77,15 @@ import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.configuration.TablesView;
+import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
+import org.apache.ignite.internal.storage.index.IndexDescriptor;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
 import org.apache.ignite.internal.storage.index.IndexStorage;
@@ -93,6 +101,7 @@ import 
org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import 
org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
@@ -120,12 +129,14 @@ import 
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.CursorUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ErrorGroups.Replicator;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
 
@@ -143,18 +154,12 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /** Replication group id. */
     private final TablePartitionId replicationGroupId;
 
-    /** Partition id. */
-    private final int partId;
-
     /** Primary key index. */
     private final Lazy<TableSchemaAwareIndexStorage> pkIndexStorage;
 
     /** Secondary indices. */
     private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> 
secondaryIndexStorages;
 
-    /** Table id. */
-    private final UUID tableId;
-
     /** Versioned partition storage. */
     private final MvPartitionStorage mvDataStorage;
 
@@ -199,17 +204,30 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     private final Supplier<Map<UUID, IndexLocker>> indexesLockers;
 
-    /**
-     * Function for checking that the given peer is local.
-     */
-    private final Function<Peer, Boolean> isLocalPeerChecker;
-
     private final ConcurrentMap<UUID, TxCleanupReadyFutureList> 
txCleanupReadyFutures = new ConcurrentHashMap<>();
 
     private final CompletableFuture<SchemaRegistry> schemaFut;
 
     private final SchemaCompatValidator schemaCompatValidator;
 
+    /** Instance of the local node. */
+    private final ClusterNode localNode;
+
+    /** Table storage. */
+    private final MvTableStorage mvTableStorage;
+
+    /** Index builder. */
+    private final IndexBuilder indexBuilder;
+
+    /** Listener for configuration indexes, {@code null} if the replica is not 
the leader. */
+    private final 
AtomicReference<ConfigurationNamedListListener<TableIndexView>> 
indexesConfigurationListener = new AtomicReference<>();
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
     /**
      * The constructor.
      *
@@ -227,8 +245,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param txStateStorage Transaction state storage.
      * @param placementDriver Placement driver.
      * @param storageUpdateHandler Handler that processes updates writing them 
to storage.
-     * @param isLocalPeerChecker Function for checking that the given peer is 
local.
      * @param schemaFut Table schema.
+     * @param localNode Instance of the local node.
+     * @param mvTableStorage Table storage.
+     * @param indexBuilder Index builder.
      */
     public PartitionReplicaListener(
             MvPartitionStorage mvDataStorage,
@@ -247,16 +267,16 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             PlacementDriver placementDriver,
             StorageUpdateHandler storageUpdateHandler,
             Schemas schemas,
-            Function<Peer, Boolean> isLocalPeerChecker,
-            CompletableFuture<SchemaRegistry> schemaFut
+            CompletableFuture<SchemaRegistry> schemaFut,
+            ClusterNode localNode,
+            MvTableStorage mvTableStorage,
+            IndexBuilder indexBuilder
     ) {
         this.mvDataStorage = mvDataStorage;
         this.raftClient = raftClient;
         this.txManager = txManager;
         this.lockManager = lockManager;
         this.scanRequestExecutor = scanRequestExecutor;
-        this.partId = partId;
-        this.tableId = tableId;
         this.indexesLockers = indexesLockers;
         this.pkIndexStorage = pkIndexStorage;
         this.secondaryIndexStorages = secondaryIndexStorages;
@@ -264,9 +284,11 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         this.safeTime = safeTime;
         this.txStateStorage = txStateStorage;
         this.placementDriver = placementDriver;
-        this.isLocalPeerChecker = isLocalPeerChecker;
         this.storageUpdateHandler = storageUpdateHandler;
         this.schemaFut = schemaFut;
+        this.localNode = localNode;
+        this.mvTableStorage = mvTableStorage;
+        this.indexBuilder = indexBuilder;
 
         this.replicationGroupId = new TablePartitionId(tableId, partId);
 
@@ -333,7 +355,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 .thenCompose(replicaAndTerm -> {
                     Peer leader = replicaAndTerm.leader();
 
-                    if (isLocalPeerChecker.apply(leader)) {
+                    if (isLocalPeer(leader)) {
                         CompletableFuture<TxMeta> txStateFut = 
getTxStateConcurrently(request);
 
                         return txStateFut.thenApply(txMeta -> new 
LeaderOrTxState(null, txMeta));
@@ -652,7 +674,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
 
-        return lockManager.acquire(txId, new LockKey(tableId), 
LockMode.S).thenCompose(tblLock -> {
+        return lockManager.acquire(txId, new LockKey(tableId()), 
LockMode.S).thenCompose(tblLock -> {
             var batchRows = new ArrayList<BinaryRow>(batchCount);
 
             @SuppressWarnings("resource") PartitionTimestampCursor cursor = 
(PartitionTimestampCursor) cursors.computeIfAbsent(cursorId,
@@ -727,7 +749,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         BinaryTuple exactKey = request.exactKey();
 
         return lockManager.acquire(txId, new LockKey(indexId), 
LockMode.IS).thenCompose(idxLock -> { // Index IS lock
-            return lockManager.acquire(txId, new LockKey(tableId), 
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
+            return lockManager.acquire(txId, new LockKey(tableId()), 
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
                 return lockManager.acquire(txId, new LockKey(indexId, 
exactKey.byteBuffer()), LockMode.S)
                         .thenCompose(indRowLock -> { // Hash index bucket S 
lock
                             Cursor<RowId> cursor = (Cursor<RowId>) 
cursors.computeIfAbsent(cursorId, id -> indexStorage.get(exactKey));
@@ -767,7 +789,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         int flags = request.flags();
 
         return lockManager.acquire(txId, new LockKey(indexId), 
LockMode.IS).thenCompose(idxLock -> { // Index IS lock
-            return lockManager.acquire(txId, new LockKey(tableId), 
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
+            return lockManager.acquire(txId, new LockKey(tableId()), 
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
                 var comparator = new 
BinaryTupleComparator(indexStorage.indexDescriptor());
 
                 Predicate<IndexRow> isUpperBoundAchieved = indexRow -> {
@@ -916,7 +938,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         return completedFuture(null); // End of range reached. 
Exit loop.
                     }
 
-                    return lockManager.acquire(txId, new LockKey(tableId, 
currentRow.rowId()), LockMode.S)
+                    return lockManager.acquire(txId, new LockKey(tableId(), 
currentRow.rowId()), LockMode.S)
                             .thenComposeAsync(rowLock -> { // Table row S lock
                                 ReadResult readResult = 
mvDataStorage.read(currentRow.rowId(), HybridTimestamp.MAX_VALUE);
                                 return 
resolveAndCheckReadCompatibility(readResult, txId)
@@ -968,7 +990,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         RowId rowId = indexCursor.next();
 
-        return lockManager.acquire(txId, new LockKey(tableId, rowId), 
LockMode.S)
+        return lockManager.acquire(txId, new LockKey(tableId(), rowId), 
LockMode.S)
                 .thenComposeAsync(rowLock -> { // Table row S lock
                     ReadResult readResult = mvDataStorage.read(rowId, 
HybridTimestamp.MAX_VALUE);
                     return resolveAndCheckReadCompatibility(readResult, txId)
@@ -1390,7 +1412,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             }
         } catch (Exception e) {
             throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
-                    format("Unable to close cursor [tableId={}]", tableId), e);
+                    format("Unable to close cursor [tableId={}]", tableId()), 
e);
         }
     }
 
@@ -1572,7 +1594,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             BinaryTuple keyTuple = pkTupleFuts[futNum].join();
                             ByteBuffer keyToCheck = keyTuple.byteBuffer();
                             if (uniqueKeys.add(keyToCheck)) {
-                                rowsToInsert.put(new RowId(partId), row);
+                                rowsToInsert.put(new RowId(partId(), 
UUID.randomUUID()), row);
                             } else {
                                 result.add(row);
                             }
@@ -1620,7 +1642,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     rowIdFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, 
row) -> {
                         boolean insert = rowId == null;
 
-                        RowId rowId0 = insert ? new RowId(partId) : rowId;
+                        RowId rowId0 = insert ? new RowId(partId(), 
UUID.randomUUID()) : rowId;
 
                         return insert
                                 ? takeLocksForInsert(searchRow, rowId0, txId)
@@ -1789,7 +1811,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         return completedFuture(false);
                     }
 
-                    RowId rowId0 = new RowId(partId);
+                    RowId rowId0 = new RowId(partId(), UUID.randomUUID());
 
                     return takeLocksForInsert(searchRow, rowId0, txId)
                             .thenCompose(rowIdLock -> applyUpdateCommand(
@@ -1807,7 +1829,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                     boolean insert = rowId == null;
 
-                    RowId rowId0 = insert ? new RowId(partId) : rowId;
+                    RowId rowId0 = insert ? new RowId(partId(), 
UUID.randomUUID()) : rowId;
 
                     CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> 
lockFut = insert
                             ? takeLocksForInsert(searchRow, rowId0, txId)
@@ -1829,7 +1851,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                     boolean insert = rowId == null;
 
-                    RowId rowId0 = insert ? new RowId(partId) : rowId;
+                    RowId rowId0 = insert ? new RowId(partId(), 
UUID.randomUUID()) : rowId;
 
                     CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> 
lockFut = insert
                             ? takeLocksForInsert(searchRow, rowId0, txId)
@@ -1897,8 +1919,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Future completes with tuple {@link RowId} and collection of 
{@link Lock}.
      */
     private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> 
takeLocksForUpdate(BinaryRow binaryRow, RowId rowId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
-                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId, rowId), LockMode.X))
+        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
+                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.X))
                 .thenCompose(ignored -> takePutLockOnIndexes(binaryRow, rowId, 
txId))
                 .thenApply(shortTermLocks -> new IgniteBiTuple<>(rowId, 
shortTermLocks));
     }
@@ -1911,7 +1933,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Future completes with tuple {@link RowId} and collection of 
{@link Lock}.
      */
     private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> 
takeLocksForInsert(BinaryRow binaryRow, RowId rowId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) // 
IX lock on table
+        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX) 
// IX lock on table
                 .thenCompose(ignored -> takePutLockOnIndexes(binaryRow, rowId, 
txId))
                 .thenApply(shortTermLocks -> new IgniteBiTuple<>(rowId, 
shortTermLocks));
     }
@@ -1969,11 +1991,11 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Future completes with {@link RowId} or {@code null} if there is 
no value for remove.
      */
     private CompletableFuture<RowId> takeLocksForDeleteExact(BinaryRow 
expectedRow, RowId rowId, BinaryRow actualRow, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) // 
IX lock on table
-                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId, rowId), LockMode.S)) // S lock on RowId
+        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX) 
// IX lock on table
+                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.S)) // S lock on RowId
                 .thenCompose(ignored -> {
                     if (equalValues(actualRow, expectedRow)) {
-                        return lockManager.acquire(txId, new LockKey(tableId, 
rowId), LockMode.X) // X lock on RowId
+                        return lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.X) // X lock on RowId
                                 .thenCompose(ignored0 -> 
takeRemoveLockOnIndexes(actualRow, rowId, txId))
                                 .thenApply(exclusiveRowLock -> rowId);
                     }
@@ -1989,8 +2011,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Future completes with {@link RowId} or {@code null} if there is 
no value for the key.
      */
     private CompletableFuture<RowId> takeLocksForDelete(BinaryRow binaryRow, 
RowId rowId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) // 
IX lock on table
-                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId, rowId), LockMode.X)) // X lock on RowId
+        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX) 
// IX lock on table
+                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.X)) // X lock on RowId
                 .thenCompose(ignored -> takeRemoveLockOnIndexes(binaryRow, 
rowId, txId))
                 .thenApply(ignored -> rowId);
     }
@@ -2002,8 +2024,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Future completes with {@link RowId} or {@code null} if there is 
no value for the key.
      */
     private CompletableFuture<RowId> takeLocksForGet(RowId rowId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(tableId), LockMode.IS) // 
IS lock on table
-                .thenCompose(tblLock -> lockManager.acquire(txId, new 
LockKey(tableId, rowId), LockMode.S)) // S lock on RowId
+        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IS) 
// IS lock on table
+                .thenCompose(tblLock -> lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.S)) // S lock on RowId
                 .thenApply(ignored -> rowId);
     }
 
@@ -2059,11 +2081,11 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      */
     private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> 
takeLocksForReplace(BinaryRow expectedRow, BinaryRow oldRow,
             BinaryRow newRow, RowId rowId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
-                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId, rowId), LockMode.S))
+        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
+                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.S))
                 .thenCompose(ignored -> {
                     if (oldRow != null && equalValues(oldRow, expectedRow)) {
-                        return lockManager.acquire(txId, new LockKey(tableId, 
rowId), LockMode.X) // X lock on RowId
+                        return lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.X) // X lock on RowId
                                 .thenCompose(ignored1 -> 
takePutLockOnIndexes(newRow, rowId, txId))
                                 .thenApply(shortTermLocks -> new 
IgniteBiTuple<>(rowId, shortTermLocks));
                     }
@@ -2110,7 +2132,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                             }
                     );
         } else if (request instanceof ReadOnlyReplicaRequest || request 
instanceof ReplicaSafeTimeSyncRequest) {
-            return 
raftClient.refreshAndGetLeaderWithTerm().thenApply(replicaAndTerm -> 
isLocalPeerChecker.apply(replicaAndTerm.leader()));
+            return 
raftClient.refreshAndGetLeaderWithTerm().thenApply(replicaAndTerm -> 
isLocalPeer(replicaAndTerm.leader()));
         } else {
             return completedFuture(null);
         }
@@ -2123,7 +2145,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return completedFuture(row);
         }
 
-        return schemaCompatValidator.validateBackwards(row.schemaVersion(), 
tableId, txId)
+        return schemaCompatValidator.validateBackwards(row.schemaVersion(), 
tableId(), txId)
                 .thenCompose(validationResult -> {
                     if (validationResult.isSuccessful()) {
                         return completedFuture(row);
@@ -2363,4 +2385,127 @@ public class PartitionReplicaListener implements 
ReplicaListener {
          */
         TxState state;
     }
+
+    @Override
+    // TODO: IGNITE-19053 Must handle the change of leader
+    // TODO: IGNITE-19053 Add a test to change the leader even at the start of 
the task
+    public void onBecomePrimary(ClusterNode clusterNode) {
+        inBusyLock(() -> {
+            if (!clusterNode.equals(localNode)) {
+                // We are not the primary replica.
+                return;
+            }
+
+            registerIndexesListener();
+
+            // Let's try to build an index for the previously created indexes 
for the table.
+            TablesView tablesView = 
mvTableStorage.tablesConfiguration().value();
+
+            for (UUID indexId : collectIndexIds(tablesView)) {
+                startBuildIndex(createIndexDescriptor(tablesView, indexId));
+            }
+        });
+    }
+
+    @Override
+    public void onShutdown() {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        ConfigurationNamedListListener<TableIndexView> listener = 
indexesConfigurationListener.getAndSet(null);
+
+        if (listener != null) {
+            
mvTableStorage.tablesConfiguration().indexes().stopListenElements(listener);
+        }
+
+        indexBuilder.stopBuildIndexes(tableId(), partId());
+    }
+
+    private void registerIndexesListener() {
+        // TODO: IGNITE-19498 Might need to listen to something else
+        ConfigurationNamedListListener<TableIndexView> listener = new 
ConfigurationNamedListListener<>() {
+            @Override
+            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<TableIndexView> ctx) {
+                inBusyLock(() -> {
+                    TableIndexView tableIndexView = ctx.newValue();
+
+                    if (tableId().equals(tableIndexView.tableId())) {
+                        
startBuildIndex(createIndexDescriptor(ctx.newValue(TablesView.class), 
tableIndexView.id()));
+                    }
+                });
+
+                return completedFuture(null);
+            }
+
+            @Override
+            public CompletableFuture<?> 
onRename(ConfigurationNotificationEvent<TableIndexView> ctx) {
+                return failedFuture(new UnsupportedOperationException());
+            }
+
+            @Override
+            public CompletableFuture<?> 
onDelete(ConfigurationNotificationEvent<TableIndexView> ctx) {
+                inBusyLock(() -> {
+                    TableIndexView tableIndexView = ctx.oldValue();
+
+                    if (tableId().equals(tableIndexView.tableId())) {
+                        indexBuilder.stopBuildIndex(tableId(), partId(), 
tableIndexView.id());
+                    }
+                });
+
+                return completedFuture(null);
+            }
+
+            @Override
+            public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<TableIndexView> ctx) {
+                return failedFuture(new UnsupportedOperationException());
+            }
+        };
+
+        boolean casResult = indexesConfigurationListener.compareAndSet(null, 
listener);
+
+        assert casResult : replicationGroupId;
+
+        
mvTableStorage.tablesConfiguration().indexes().listenElements(listener);
+    }
+
+    private void startBuildIndex(IndexDescriptor indexDescriptor) {
+        // TODO: IGNITE-19112 We only need to create the index storage once
+        IndexStorage indexStorage = mvTableStorage.getOrCreateIndex(partId(), 
indexDescriptor);
+
+        indexBuilder.startBuildIndex(tableId(), partId(), 
indexDescriptor.id(), indexStorage, mvDataStorage, raftClient);
+    }
+
+    private List<UUID> collectIndexIds(TablesView tablesView) {
+        return tablesView.indexes().stream()
+                .filter(tableIndexView -> 
replicationGroupId.tableId().equals(tableIndexView.tableId()))
+                .map(TableIndexView::id)
+                .collect(toList());
+    }
+
+    private int partId() {
+        return replicationGroupId.partitionId();
+    }
+
+    private UUID tableId() {
+        return replicationGroupId.tableId();
+    }
+
+    private boolean isLocalPeer(Peer peer) {
+        return peer.consistentId().equals(localNode.name());
+    }
+
+    private void inBusyLock(Runnable runnable) {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try {
+            runnable.run();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index fd9c72973c..0164926784 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -36,6 +36,7 @@ import java.util.function.Function;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -49,12 +50,14 @@ import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
 import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
 import org.apache.ignite.internal.schema.marshaller.MarshallerException;
 import 
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
 import 
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
@@ -67,6 +70,7 @@ import 
org.apache.ignite.internal.table.distributed.SortedIndexLocker;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
@@ -85,6 +89,7 @@ import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.Pair;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
 import org.hamcrest.CustomMatcher;
 import org.hamcrest.Matcher;
 import org.junit.jupiter.api.BeforeAll;
@@ -116,7 +121,11 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
     private static Function<BinaryRow, BinaryTuple> row2SortKeyConverter;
 
     @BeforeAll
-    public static void beforeAll(@InjectConfiguration DataStorageConfiguration 
dsCfg) {
+    public static void beforeAll(
+            @InjectConfiguration DataStorageConfiguration dsCfg,
+            @InjectConfiguration("mock.tables.foo {}") TablesConfiguration 
tablesConfig,
+            @InjectConfiguration DistributionZoneConfiguration 
distributionZoneConfig
+    ) {
         RaftGroupService mockRaftClient = mock(RaftGroupService.class);
 
         when(mockRaftClient.refreshAndGetLeaderWithTerm())
@@ -201,8 +210,10 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                         mock(LowWatermark.class)
                 ),
                 new DummySchemas(schemaManager),
-                peer -> true,
-                CompletableFuture.completedFuture(schemaManager)
+                CompletableFuture.completedFuture(schemaManager),
+                mock(ClusterNode.class),
+                new TestMvTableStorage(tablesConfig.tables().get("foo"), 
tablesConfig, distributionZoneConfig),
+                mock(IndexBuilder.class)
         );
 
         kvMarshaller = new 
ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class, 
Integer.class);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index d89cefd370..aa527094d5 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -68,6 +68,7 @@ import 
org.apache.ignite.internal.catalog.commands.DefaultValue;
 import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -83,6 +84,7 @@ import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
 import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
 import org.apache.ignite.internal.schema.marshaller.MarshallerException;
@@ -91,6 +93,7 @@ import 
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshal
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
 import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
 import 
org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
@@ -110,6 +113,7 @@ import 
org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import org.apache.ignite.internal.table.distributed.command.PartitionCommand;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException;
@@ -289,7 +293,11 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private static final AtomicInteger nextMonotonicInt = new AtomicInteger(1);
 
     @BeforeEach
-    public void beforeTest(@InjectConfiguration DataStorageConfiguration 
dsCfg) {
+    public void beforeTest(
+            @InjectConfiguration DataStorageConfiguration dsCfg,
+            @InjectConfiguration("mock.tables.foo {}") TablesConfiguration 
tablesConfig,
+            @InjectConfiguration DistributionZoneConfiguration 
distributionZoneConfig
+    ) {
         
lenient().when(mockRaftClient.refreshAndGetLeaderWithTerm()).thenAnswer(invocationOnMock
 -> {
             if (!localLeader) {
                 return completedFuture(new LeaderWithTerm(new 
Peer(anotherNode.name()), 1L));
@@ -397,8 +405,10 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         mock(LowWatermark.class)
                 ),
                 schemas,
-                peer -> localNode.name().equals(peer.consistentId()),
-                completedFuture(schemaManager)
+                completedFuture(schemaManager),
+                localNode,
+                new TestMvTableStorage(tablesConfig.tables().get("foo"), 
tablesConfig, distributionZoneConfig),
+                mock(IndexBuilder.class)
         );
 
         sortedIndexBinarySchema = 
BinaryTupleSchema.createSchema(schemaDescriptor, new int[]{2 /* intVal column 
*/});
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index a147752931..a7f1cc8a82 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -67,6 +67,7 @@ import 
org.apache.ignite.internal.table.distributed.LowWatermark;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
@@ -305,8 +306,10 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 placementDriver,
                 storageUpdateHandler,
                 new DummySchemas(schemaManager),
-                peer -> true,
-                completedFuture(schemaManager)
+                completedFuture(schemaManager),
+                mock(ClusterNode.class),
+                mock(MvTableStorage.class),
+                mock(IndexBuilder.class)
         );
 
         partitionListener = new PartitionListener(

Reply via email to