This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d7970190a4 IGNITE-20467 Use replica message instead of directly using 
raft client when building indexes (#2630)
d7970190a4 is described below

commit d7970190a48b999d0adddfd5906d691b050576d2
Author: Kirill Tkalenko <tkalkir...@yandex.ru>
AuthorDate: Tue Sep 26 15:22:28 2023 +0300

    IGNITE-20467 Use replica message instead of directly using raft client when 
building indexes (#2630)
---
 .../internal/lang/IgniteInternalException.java     |  4 +-
 .../apache/ignite/internal/util/IgniteUtils.java   | 20 +++++++++
 .../exception/PrimaryReplicaMissException.java     | 42 ++++++++++---------
 .../internal/sql/engine/ItBuildIndexTest.java      | 16 ++++---
 .../ignite/distributed/ItTablePersistenceTest.java | 12 ++++--
 .../internal/table/distributed/TableManager.java   |  2 +-
 .../table/distributed/TableMessageGroup.java       |  6 +++
 .../distributed/command/BuildIndexCommand.java     | 21 ++--------
 .../table/distributed/index/IndexBuildTask.java    | 46 +++++++++-----------
 .../table/distributed/index/IndexBuilder.java      | 49 ++++++++++++----------
 .../distributed/raft/PartitionDataStorage.java     |  8 +++-
 .../table/distributed/raft/PartitionListener.java  |  2 +-
 .../SnapshotAwarePartitionDataStorage.java         | 10 +++++
 .../request/BuildIndexReplicaRequest.java}         | 29 ++++---------
 .../replicator/PartitionReplicaListener.java       | 27 +++++++++++-
 .../internal/table/distributed/IndexBaseTest.java  |  9 ++--
 .../distributed/StorageUpdateHandlerTest.java      | 10 +++--
 .../gc/AbstractGcUpdateHandlerTest.java            |  4 +-
 .../PersistentPageMemoryGcUpdateHandlerTest.java   |  2 +-
 .../distributed/gc/RocksDbGcUpdateHandlerTest.java |  2 +-
 .../distributed/gc/TestGcUpdateHandlerTest.java    |  2 +-
 .../gc/VolatilePageMemoryGcUpdateHandlerTest.java  |  2 +-
 .../raft/PartitionCommandListenerTest.java         | 33 ++++-----------
 .../PartitionReplicaListenerIndexLockingTest.java  |  2 +-
 .../replication/PartitionReplicaListenerTest.java  |  2 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |  2 +-
 .../distributed/TestPartitionDataStorage.java      | 23 +++++++++-
 .../table/impl/DummyInternalTableImpl.java         |  4 +-
 28 files changed, 223 insertions(+), 168 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteInternalException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteInternalException.java
index e7a0c18089..816edd19a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteInternalException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteInternalException.java
@@ -207,11 +207,11 @@ public class IgniteInternalException extends 
RuntimeException implements Traceab
      *
      * @param code Full error code.
      * @param messagePattern Error message pattern.
-     * @param cause Non-null throwable cause.
+     * @param cause Throwable cause.
      * @param params Error message params.
      * @see IgniteStringFormatter#format(String, Object...)
      */
-    public IgniteInternalException(int code, String messagePattern, Throwable 
cause, Object... params) {
+    public IgniteInternalException(int code, String messagePattern, @Nullable 
Throwable cause, Object... params) {
         this(code, IgniteStringFormatter.format(messagePattern, params), 
cause);
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 321428356f..cf83a563b4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -777,6 +777,7 @@ public class IgniteUtils {
      * @param fn Function to run.
      * @param <T> Type of returned value from {@code fn}.
      * @return Result of the provided function.
+     * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if {@link IgniteSpinBusyLock#enterBusy()} failed.
      */
     public static <T> T inBusyLock(IgniteSpinBusyLock busyLock, Supplier<T> 
fn) {
         if (!busyLock.enterBusy()) {
@@ -794,6 +795,7 @@ public class IgniteUtils {
      *
      * @param busyLock Component's busy lock.
      * @param fn Runnable to run.
+     * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if {@link IgniteSpinBusyLock#enterBusy()} failed.
      */
     public static void inBusyLock(IgniteSpinBusyLock busyLock, Runnable fn) {
         if (!busyLock.enterBusy()) {
@@ -829,6 +831,24 @@ public class IgniteUtils {
         }
     }
 
+    /**
+     * Method that runs the provided {@code fn} in {@code busyLock} if {@link 
IgniteSpinBusyLock#enterBusy()} succeeds. Otherwise it just
+     * silently returns.
+     *
+     * @param busyLock Component's busy lock.
+     * @param fn Runnable to run.
+     */
+    public static void inBusyLockSafe(IgniteSpinBusyLock busyLock, Runnable 
fn) {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+        try {
+            fn.run();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     /**
      * Collects all the fields of given class which are defined as a public 
static within the specified class.
      *
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
index b63eac2425..25cf994173 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
@@ -17,22 +17,24 @@
 
 package org.apache.ignite.internal.replicator.exception;
 
-import java.util.UUID;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
+
 import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.internal.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.ErrorGroups.Replicator;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Unchecked exception that is thrown when a replica is not the current 
primary replica.
  */
 public class PrimaryReplicaMissException extends IgniteInternalException {
+    private static final long serialVersionUID = 8755220779942651494L;
+
     /**
      * The constructor.
      *
      * @param expectedPrimaryReplicaTerm Expected term from.
      * @param currentPrimaryReplicaTerm Current raft term.
      */
-    public PrimaryReplicaMissException(Long expectedPrimaryReplicaTerm, long 
currentPrimaryReplicaTerm) {
+    public PrimaryReplicaMissException(long expectedPrimaryReplicaTerm, long 
currentPrimaryReplicaTerm) {
         this(expectedPrimaryReplicaTerm, currentPrimaryReplicaTerm, null);
     }
 
@@ -43,25 +45,27 @@ public class PrimaryReplicaMissException extends 
IgniteInternalException {
      * @param currentPrimaryReplicaTerm Current raft term.
      * @param cause Cause exception.
      */
-    public PrimaryReplicaMissException(Long expectedPrimaryReplicaTerm, long 
currentPrimaryReplicaTerm, Throwable cause) {
-        super(Replicator.REPLICA_MISS_ERR,
-                IgniteStringFormatter.format(
-                        "The primary replica has changed because the term has 
been changed "
-                                + "[expectedPrimaryReplicaTerm={}, 
currentPrimaryReplicaTerm={}]",
-                        expectedPrimaryReplicaTerm, currentPrimaryReplicaTerm
-                ),
-                cause);
+    public PrimaryReplicaMissException(long expectedPrimaryReplicaTerm, long 
currentPrimaryReplicaTerm, @Nullable Throwable cause) {
+        super(
+                REPLICA_MISS_ERR,
+                "The primary replica has changed because the term has been 
changed "
+                        + "[expectedPrimaryReplicaTerm={}, 
currentPrimaryReplicaTerm={}]",
+                cause,
+                expectedPrimaryReplicaTerm, currentPrimaryReplicaTerm
+        );
     }
 
     /**
-     * The constructor is used for creating an exception instance that is 
thrown from a remote server.
+     * The constructor.
      *
-     * @param traceId Trace id.
-     * @param code Error code.
-     * @param message Error message.
-     * @param cause Cause exception.
+     * @param expectedLeaseholder Expected leaseholder.
+     * @param currentLeaseholder Current leaseholder.
      */
-    public PrimaryReplicaMissException(UUID traceId, int code, String message, 
Throwable cause) {
-        super(traceId, code, message, cause);
+    public PrimaryReplicaMissException(String expectedLeaseholder, String 
currentLeaseholder) {
+        super(
+                REPLICA_MISS_ERR,
+                "The primary replica has changed [expectedLeaseholder={}, 
currentLeaseholder={}]",
+                expectedLeaseholder, currentLeaseholder
+        );
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
index 4dfb279310..60aa23c679 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
@@ -56,16 +56,13 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
-/**
- * Integration test of index building.
- */
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-20096")
+/** Integration test of index building. */
 public class ItBuildIndexTest extends ClusterPerClassIntegrationTest {
-    private static final String ZONE_NAME = "zone_table";
+    private static final String ZONE_NAME = "ZONE_TABLE";
 
-    private static final String TABLE_NAME = "test_table";
+    private static final String TABLE_NAME = "TEST_TABLE";
 
-    private static final String INDEX_NAME = "test_index";
+    private static final String INDEX_NAME = "TEST_INDEX";
 
     @AfterEach
     void tearDown() {
@@ -89,7 +86,7 @@ public class ItBuildIndexTest extends 
ClusterPerClassIntegrationTest {
         checkIndexBuild(partitions, replicas, INDEX_NAME);
 
         assertQuery(IgniteStringFormatter.format("SELECT * FROM {} WHERE i1 > 
0", TABLE_NAME))
-                .matches(containsIndexScan("PUBLIC", TABLE_NAME.toUpperCase(), 
INDEX_NAME.toUpperCase()))
+                .matches(containsIndexScan("PUBLIC", TABLE_NAME, INDEX_NAME))
                 .returns(1, 1)
                 .returns(2, 2)
                 .returns(3, 3)
@@ -99,6 +96,7 @@ public class ItBuildIndexTest extends 
ClusterPerClassIntegrationTest {
     }
 
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20330")
     void testChangePrimaryReplicaOnMiddleBuildIndex() throws Exception {
         prepareBuildIndexToChangePrimaryReplica();
 
@@ -174,7 +172,7 @@ public class ItBuildIndexTest extends 
ClusterPerClassIntegrationTest {
 
         sql(IgniteStringFormatter.format(
                 "CREATE TABLE {} (i0 INTEGER PRIMARY KEY, i1 INTEGER) WITH 
PRIMARY_ZONE='{}'",
-                TABLE_NAME, ZONE_NAME.toUpperCase()
+                TABLE_NAME, ZONE_NAME
         ));
 
         sql(IgniteStringFormatter.format(
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index d7752fd878..9c25269b54 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -410,18 +410,22 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
                     RocksDbStorageEngine storageEngine = new 
RocksDbStorageEngine("test", engineConfig, path);
                     storageEngine.start();
 
+                    int tableId = 1;
+
                     MvTableStorage mvTableStorage = 
storageEngine.createMvTable(
-                            new StorageTableDescriptor(1, 1, 
DEFAULT_DATA_REGION_NAME),
+                            new StorageTableDescriptor(tableId, 1, 
DEFAULT_DATA_REGION_NAME),
                             new 
StorageIndexDescriptorSupplier(mock(CatalogService.class))
                     );
                     mvTableStorage.start();
 
                     mvTableStorages.put(index, mvTableStorage);
 
-                    MvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(mvTableStorage, 0);
+                    int partitionId = 0;
+
+                    MvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(mvTableStorage, partitionId);
                     mvPartitionStorages.put(index, mvPartitionStorage);
 
-                    PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartitionStorage);
+                    PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(tableId, partitionId, mvPartitionStorage);
 
                     PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTime = new PendingComparableValuesTracker<>(
                             new HybridTimestamp(1, 0)
@@ -432,7 +436,7 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
                     );
 
                     StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(
-                            0,
+                            partitionId,
                             partitionDataStorage,
                             gcConfig,
                             mock(LowWatermark.class),
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4d41718012..6830a03fab 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -488,7 +488,7 @@ public class TableManager extends 
AbstractEventProducer<TableEvent, TableEventPa
 
         lowWatermark = new LowWatermark(nodeName, gcConfig.lowWatermark(), 
clock, txManager, vaultManager, mvGc);
 
-        indexBuilder = new IndexBuilder(nodeName, cpus);
+        indexBuilder = new IndexBuilder(nodeName, cpus, replicaSvc);
 
         raftCommandsMarshaller = new 
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 2eff68eb00..78604d0c17 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -36,6 +36,7 @@ import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest;
@@ -161,8 +162,13 @@ public interface TableMessageGroup {
      */
     short RW_MULTI_ROW_PK_REPLICA_REQUEST = 20;
 
+    /** Message type for {@link BuildIndexReplicaRequest}. */
+    short BUILD_INDEX_REPLICA_REQUEST = 21;
+
     /**
      * Message types for Table module RAFT commands.
+     *
+     * <p>NOTE: Commands must be immutable because they will be stored in the 
replication log.</p>
      */
     interface Commands {
         /** Message type for {@link FinishTxCommand}. */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java
index 8ebd616916..8e17d1d827 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java
@@ -23,28 +23,15 @@ import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.network.annotations.Transferable;
 
-/**
- * State machine command to build a table index.
- */
+/** State machine command to build a table index. */
 @Transferable(TableMessageGroup.Commands.BUILD_INDEX)
 public interface BuildIndexCommand extends WriteCommand {
-    /**
-     * Returns ID of table partition.
-     */
-    TablePartitionIdMessage tablePartitionId();
-
-    /**
-     * Returns index ID.
-     */
+    /** Returns index ID. */
     int indexId();
 
-    /**
-     * Returns row IDs for which to build indexes.
-     */
+    /** Returns row IDs for which to build indexes. */
     List<UUID> rowIds();
 
-    /**
-     * Returns {@code true} if this batch is the last one.
-     */
+    /** Returns {@code true} if this batch is the last one. */
     boolean finish();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java
index 71098c2b31..a8a1c8fb8c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java
@@ -31,17 +31,17 @@ import java.util.function.Function;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
-import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
 
-/**
- * Task of building a table index.
- */
+/** Task of building a table index. */
 class IndexBuildTask {
     private static final IgniteLogger LOG = 
Loggers.forClass(IndexBuildTask.class);
 
@@ -53,7 +53,7 @@ class IndexBuildTask {
 
     private final MvPartitionStorage partitionStorage;
 
-    private final RaftGroupService raftClient;
+    private final ReplicaService replicaService;
 
     private final ExecutorService executor;
 
@@ -61,6 +61,8 @@ class IndexBuildTask {
 
     private final int batchSize;
 
+    private final ClusterNode node;
+
     private final IgniteSpinBusyLock taskBusyLock = new IgniteSpinBusyLock();
 
     private final AtomicBoolean taskStopGuard = new AtomicBoolean();
@@ -71,23 +73,23 @@ class IndexBuildTask {
             IndexBuildTaskId taskId,
             IndexStorage indexStorage,
             MvPartitionStorage partitionStorage,
-            RaftGroupService raftClient,
+            ReplicaService replicaService,
             ExecutorService executor,
             IgniteSpinBusyLock busyLock,
-            int batchSize
+            int batchSize,
+            ClusterNode node
     ) {
         this.taskId = taskId;
         this.indexStorage = indexStorage;
         this.partitionStorage = partitionStorage;
-        this.raftClient = raftClient;
+        this.replicaService = replicaService;
         this.executor = executor;
         this.busyLock = busyLock;
         this.batchSize = batchSize;
+        this.node = node;
     }
 
-    /**
-     * Starts building the index.
-     */
+    /** Starts building the index. */
     void start() {
         if (!enterBusy()) {
             taskFuture.complete(null);
@@ -120,9 +122,7 @@ class IndexBuildTask {
         }
     }
 
-    /**
-     * Stops index building.
-     */
+    /** Stops index building. */
     void stop() {
         if (!taskStopGuard.compareAndSet(false, true)) {
             return;
@@ -131,9 +131,7 @@ class IndexBuildTask {
         taskBusyLock.block();
     }
 
-    /**
-     * Returns the index build future.
-     */
+    /** Returns the index build future. */
     CompletableFuture<Void> getTaskFuture() {
         return taskFuture;
     }
@@ -146,7 +144,7 @@ class IndexBuildTask {
         try {
             List<RowId> batchRowIds = createBatchRowIds();
 
-            return raftClient.run(createBuildIndexCommand(batchRowIds))
+            return replicaService.invoke(node, 
createBuildIndexReplicaRequest(batchRowIds))
                     .thenComposeAsync(unused -> {
                         if (indexStorage.getNextRowIdToBuild() == null) {
                             // Index has been built.
@@ -182,15 +180,11 @@ class IndexBuildTask {
         return batch;
     }
 
-    private BuildIndexCommand createBuildIndexCommand(List<RowId> rowIds) {
+    private BuildIndexReplicaRequest 
createBuildIndexReplicaRequest(List<RowId> rowIds) {
         boolean finish = rowIds.size() < batchSize;
 
-        return TABLE_MESSAGES_FACTORY.buildIndexCommand()
-                
.tablePartitionId(TABLE_MESSAGES_FACTORY.tablePartitionIdMessage()
-                        .tableId(taskId.getTableId())
-                        .partitionId(taskId.getPartitionId())
-                        .build()
-                )
+        return TABLE_MESSAGES_FACTORY.buildIndexReplicaRequest()
+                .groupId(new TablePartitionId(taskId.getTableId(), 
taskId.getPartitionId()))
                 .indexId(taskId.getIndexId())
                 .rowIds(rowIds.stream().map(RowId::uuid).collect(toList()))
                 .finish(finish)
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
index 150b7edf08..8e23f1a956 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.index;
 
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
+
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -29,13 +31,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
+import 
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
 
 /**
  * Class for managing the building of table indexes.
@@ -47,6 +51,8 @@ public class IndexBuilder implements ManuallyCloseable {
 
     private final ExecutorService executor;
 
+    private final ReplicaService replicaService;
+
     private final Map<IndexBuildTaskId, IndexBuildTask> indexBuildTaskById = 
new ConcurrentHashMap<>();
 
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -58,8 +64,11 @@ public class IndexBuilder implements ManuallyCloseable {
      *
      * @param nodeName Node name.
      * @param threadCount Number of threads to build indexes.
+     * @param replicaService Replica service.
      */
-    public IndexBuilder(String nodeName, int threadCount) {
+    public IndexBuilder(String nodeName, int threadCount, ReplicaService 
replicaService) {
+        this.replicaService = replicaService;
+
         executor = new ThreadPoolExecutor(
                 threadCount,
                 threadCount,
@@ -73,16 +82,17 @@ public class IndexBuilder implements ManuallyCloseable {
     /**
      * Starts building the index if it is not already built or is not yet in 
progress.
      *
-     * <p>Index is built in batches using {@link BuildIndexCommand} (via 
raft), batches are sent sequentially.
+     * <p>Index is built in batches using {@link BuildIndexReplicaRequest}, 
which are then transformed into {@link BuildIndexCommand} on the
+     * replica, batches are sent sequentially.</p>
      *
-     * <p>It is expected that the index building is triggered by the leader of 
the raft group.
+     * <p>It is expected that the index building is triggered by the primary 
replica.</p>
      *
      * @param tableId Table ID.
      * @param partitionId Partition ID.
      * @param indexId Index ID.
      * @param indexStorage Index storage to build.
      * @param partitionStorage Multi-versioned partition storage.
-     * @param raftClient Raft client.
+     * @param node Node to which requests to build the index will be sent.
      */
     // TODO: IGNITE-19498 Perhaps we need to start building the index only once
     public void startBuildIndex(
@@ -91,16 +101,25 @@ public class IndexBuilder implements ManuallyCloseable {
             int indexId,
             IndexStorage indexStorage,
             MvPartitionStorage partitionStorage,
-            RaftGroupService raftClient
+            ClusterNode node
     ) {
-        inBusyLock(() -> {
+        inBusyLockSafe(busyLock, () -> {
             if (indexStorage.getNextRowIdToBuild() == null) {
                 return;
             }
 
             IndexBuildTaskId taskId = new IndexBuildTaskId(tableId, 
partitionId, indexId);
 
-            IndexBuildTask newTask = new IndexBuildTask(taskId, indexStorage, 
partitionStorage, raftClient, executor, busyLock, BATCH_SIZE);
+            IndexBuildTask newTask = new IndexBuildTask(
+                    taskId,
+                    indexStorage,
+                    partitionStorage,
+                    replicaService,
+                    executor,
+                    busyLock,
+                    BATCH_SIZE,
+                    node
+            );
 
             IndexBuildTask previousTask = 
indexBuildTaskById.putIfAbsent(taskId, newTask);
 
@@ -123,7 +142,7 @@ public class IndexBuilder implements ManuallyCloseable {
      * @param indexId Index ID.
      */
     public void stopBuildIndex(int tableId, int partitionId, int indexId) {
-        inBusyLock(() -> {
+        inBusyLockSafe(busyLock, () -> {
             IndexBuildTask removed = indexBuildTaskById.remove(new 
IndexBuildTaskId(tableId, partitionId, indexId));
 
             if (removed != null) {
@@ -170,16 +189,4 @@ public class IndexBuilder implements ManuallyCloseable {
 
         IgniteUtils.shutdownAndAwaitTermination(executor, 10, 
TimeUnit.SECONDS);
     }
-
-    private void inBusyLock(Runnable runnable) {
-        if (!busyLock.enterBusy()) {
-            return;
-        }
-
-        try {
-            runnable.run();
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index 945e414282..9b8bdfc4cb 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -35,7 +35,7 @@ import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
- * Provides access to MV (multi-version) data of a partition.
+ * Provides access to MV (multi-version) data of a table partition.
  *
  * <p>Methods writing to MV storage ({@link #addWrite(RowId, BinaryRow, UUID, 
int, int)}, {@link #abortWrite(RowId)}
  * and {@link #commitWrite(RowId, HybridTimestamp)}) and TX data storage MUST 
be invoked under a lock acquired using
@@ -47,6 +47,12 @@ import org.jetbrains.annotations.TestOnly;
  * @see MvPartitionStorage
  */
 public interface PartitionDataStorage extends ManuallyCloseable {
+    /** Returns table ID. */
+    int tableId();
+
+    /** Returns partition ID. */
+    int partitionId();
+
     /**
      * Executes {@link WriteClosure} atomically, meaning that partial result 
of an incomplete closure will never be written to the
      * physical device, thus guaranteeing data consistency after restart. 
Simply runs the closure in case of a volatile storage.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 78ca782a92..13eba978e8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -489,7 +489,7 @@ public class PartitionListener implements RaftGroupListener 
{
         if (cmd.finish()) {
             LOG.info(
                     "Finish building the index: [tableId={}, partitionId={}, 
indexId={}]",
-                    cmd.tablePartitionId().tableId(), 
cmd.tablePartitionId().partitionId(), cmd.indexId()
+                    storage.tableId(), storage.partitionId(), cmd.indexId()
             );
         }
     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index f1dbcc4404..f46d0a01c0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -61,6 +61,16 @@ public class SnapshotAwarePartitionDataStorage implements PartitionDataStorage {
         this.partitionKey = partitionKey;
     }
 
+    @Override
+    public int tableId() {
+        return partitionKey.tableId();
+    }
+
+    @Override
+    public int partitionId() {
+        return partitionKey.partitionId();
+    }
+
     @Override
     public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
         return partitionStorage.runConsistently(closure);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java
similarity index 64%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java
index 8ebd616916..83e895f592 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java
@@ -15,36 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.command;
+package org.apache.ignite.internal.table.distributed.replication.request;
 
 import java.util.List;
 import java.util.UUID;
-import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.network.annotations.Transferable;
 
-/**
- * State machine command to build a table index.
- */
-@Transferable(TableMessageGroup.Commands.BUILD_INDEX)
-public interface BuildIndexCommand extends WriteCommand {
-    /**
-     * Returns ID of table partition.
-     */
-    TablePartitionIdMessage tablePartitionId();
-
-    /**
-     * Returns index ID.
-     */
+/** Replica request to build a table index. */
+@Transferable(TableMessageGroup.BUILD_INDEX_REPLICA_REQUEST)
+public interface BuildIndexReplicaRequest extends ReplicaRequest {
+    /** Returns index ID. */
     int indexId();
 
-    /**
-     * Returns row IDs for which to build indexes.
-     */
+    /** Returns row IDs for which to build indexes. */
     List<UUID> rowIds();
 
-    /**
-     * Returns {@code true} if this batch is the last one.
-     */
+    /** Returns {@code true} if this batch is the last one. */
     boolean finish();
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index daf7223079..744518e1d1 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -108,6 +108,7 @@ import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommandBuilder;
 import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
@@ -117,6 +118,7 @@ import org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder
 import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
+import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.CommittableTxRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest;
@@ -406,6 +408,8 @@ public class PartitionReplicaListener implements ReplicaListener {
             return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, isPrimary);
         } else if (request instanceof ReplicaSafeTimeSyncRequest) {
             return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request, isPrimary);
+        } else if (request instanceof BuildIndexReplicaRequest) {
+            return raftClient.run(toBuildIndexCommand((BuildIndexReplicaRequest) request));
         } else {
             throw new UnsupportedReplicaRequestException(request.getClass());
         }
@@ -2366,7 +2370,7 @@ public class PartitionReplicaListener implements ReplicaListener {
      * Ensure that the primary replica was not changed.
      *
      * @param request Replica request.
-     * @return Future. The result is not null only for {@link ReadOnlyReplicaRequest}. If {@code true}, then replica is primary.
+     * @return Future. The result is not {@code null} only for {@link ReadOnlyReplicaRequest}. If {@code true}, then replica is primary.
      */
     private CompletableFuture<Boolean> ensureReplicaIsPrimary(ReplicaRequest request) {
         Long expectedTerm;
@@ -2410,6 +2414,17 @@ public class PartitionReplicaListener implements ReplicaListener {
         } else if (request instanceof ReadOnlyReplicaRequest || request instanceof ReplicaSafeTimeSyncRequest) {
             return placementDriver.getPrimaryReplica(replicationGroupId, now)
                     .thenApply(primaryReplica -> (primaryReplica != null && isLocalPeer(primaryReplica.getLeaseholder())));
+        } else if (request instanceof BuildIndexReplicaRequest) {
+            // TODO: IGNITE-20330 Possibly replaced by placementDriver#getPrimaryReplica and should also be added to the documentation
+            //  about PrimaryReplicaMissException
+            return placementDriver.awaitPrimaryReplica(replicationGroupId, now)
+                    .thenCompose(replicaMeta -> {
+                        if (isLocalPeer(replicaMeta.getLeaseholder())) {
+                            return completedFuture(null);
+                        } else {
+                            return failedFuture(new PrimaryReplicaMissException(localNode.name(), replicaMeta.getLeaseholder()));
+                        }
+                    });
         } else {
             return completedFuture(null);
         }
@@ -2827,7 +2842,7 @@ public class PartitionReplicaListener implements ReplicaListener {
         // TODO: IGNITE-19112 We only need to create the index storage once
         IndexStorage indexStorage = mvTableStorage.getOrCreateIndex(partId(), indexDescriptor);
 
-        indexBuilder.startBuildIndex(tableId(), partId(), indexDescriptor.id(), indexStorage, mvDataStorage, raftClient);
+        indexBuilder.startBuildIndex(tableId(), partId(), indexDescriptor.id(), indexStorage, mvDataStorage, localNode);
     }
 
     private int partId() {
@@ -2920,4 +2935,12 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         return tableDescriptor;
     }
+
+    private static BuildIndexCommand toBuildIndexCommand(BuildIndexReplicaRequest request) {
+        return MSG_FACTORY.buildIndexCommand()
+                .indexId(request.indexId())
+                .rowIds(request.rowIds())
+                .finish(request.finish())
+                .build();
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index ad2d717695..b604afee6b 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -97,9 +97,10 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
 
     @BeforeEach
     void setUp(@InjectConfiguration GcConfiguration gcConfig) {
-        int pkIndexId = 1;
-        int sortedIndexId = 2;
-        int hashIndexId = 3;
+        int tableId = 1;
+        int pkIndexId = 2;
+        int sortedIndexId = 3;
+        int hashIndexId = 4;
 
         pkInnerStorage = new TestHashIndexStorage(PARTITION_ID, new 
StorageHashIndexDescriptor(pkIndexId, List.of(
                 new StorageHashIndexColumnDescriptor("INTKEY", 
NativeTypes.INT32, false),
@@ -142,7 +143,7 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
                 hashIndexId, hashIndexStorage
         );
 
-        TestPartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(storage);
+        TestPartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(tableId, PARTITION_ID, storage);
 
         IndexUpdateHandler indexUpdateHandler = new 
IndexUpdateHandler(DummyInternalTableImpl.createTableIndexStoragesSupplier(indexes));
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
index fa845fc6d1..22d981e0f4 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
@@ -48,6 +48,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
  */
 @ExtendWith(ConfigurationExtension.class)
 public class StorageUpdateHandlerTest extends BaseIgniteAbstractTest {
+    private static final int TABLE_ID = 1;
+
     private static final int PARTITION_ID = 0;
 
     @InjectConfiguration
@@ -110,7 +112,7 @@ public class StorageUpdateHandlerTest extends 
BaseIgniteAbstractTest {
         storageUpdateHandler.handleUpdate(
                 UUID.randomUUID(),
                 UUID.randomUUID(),
-                new TablePartitionId(1, PARTITION_ID),
+                new TablePartitionId(TABLE_ID, PARTITION_ID),
                 null,
                 false,
                 null,
@@ -133,7 +135,7 @@ public class StorageUpdateHandlerTest extends 
BaseIgniteAbstractTest {
         storageUpdateHandler.handleUpdateAll(
                 UUID.randomUUID(),
                 Map.of(),
-                new TablePartitionId(1, PARTITION_ID),
+                new TablePartitionId(TABLE_ID, PARTITION_ID),
                 false,
                 null,
                 null
@@ -156,7 +158,9 @@ public class StorageUpdateHandlerTest extends 
BaseIgniteAbstractTest {
     }
 
     private static PartitionDataStorage createPartitionDataStorage() {
-        PartitionDataStorage partitionStorage = spy(new 
TestPartitionDataStorage(new TestMvPartitionStorage(PARTITION_ID)));
+        PartitionDataStorage partitionStorage = spy(
+                new TestPartitionDataStorage(TABLE_ID, PARTITION_ID, new 
TestMvPartitionStorage(PARTITION_ID))
+        );
 
         return partitionStorage;
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
index 9fddeb950a..98ef38737e 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
@@ -54,6 +54,8 @@ abstract class AbstractGcUpdateHandlerTest extends 
BaseMvStoragesTest {
     /** To be used in a loop. {@link RepeatedTest} has a smaller failure rate 
due to recreating the storage every time. */
     private static final int REPEATS = 1000;
 
+    protected static final int TABLE_ID = 1;
+
     private static final int PARTITION_ID = 0;
 
     private MvTableStorage tableStorage;
@@ -193,7 +195,7 @@ abstract class AbstractGcUpdateHandlerTest extends 
BaseMvStoragesTest {
     }
 
     private TestPartitionDataStorage createPartitionDataStorage() {
-        return new 
TestPartitionDataStorage(getOrCreateMvPartition(tableStorage, PARTITION_ID));
+        return new TestPartitionDataStorage(TABLE_ID, PARTITION_ID, 
getOrCreateMvPartition(tableStorage, PARTITION_ID));
     }
 
     private static IndexUpdateHandler createIndexUpdateHandler() {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java
index 17c9a60bfa..dc4ff2bfbf 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java
@@ -70,7 +70,7 @@ class PersistentPageMemoryGcUpdateHandlerTest extends 
AbstractGcUpdateHandlerTes
         engine.start();
 
         table = engine.createMvTable(
-                new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
+                new StorageTableDescriptor(TABLE_ID, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
                 new StorageIndexDescriptorSupplier(mock(CatalogService.class))
         );
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java
index 20a3a28cbe..aa96468c1b 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java
@@ -56,7 +56,7 @@ class RocksDbGcUpdateHandlerTest extends 
AbstractGcUpdateHandlerTest {
         engine.start();
 
         table = engine.createMvTable(
-                new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
+                new StorageTableDescriptor(TABLE_ID, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
                 new StorageIndexDescriptorSupplier(mock(CatalogService.class))
         );
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/TestGcUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/TestGcUpdateHandlerTest.java
index 3fd4e21872..57ff84b89b 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/TestGcUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/TestGcUpdateHandlerTest.java
@@ -25,6 +25,6 @@ import org.junit.jupiter.api.BeforeEach;
 class TestGcUpdateHandlerTest extends AbstractGcUpdateHandlerTest {
     @BeforeEach
     void setUp() {
-        initialize(new TestMvTableStorage(1, DEFAULT_PARTITION_COUNT));
+        initialize(new TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT));
     }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java
index d97b61e2b5..630e7c77a8 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/VolatilePageMemoryGcUpdateHandlerTest.java
@@ -55,7 +55,7 @@ class VolatilePageMemoryGcUpdateHandlerTest extends 
AbstractGcUpdateHandlerTest
         engine.start();
 
         table = engine.createMvTable(
-                new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
+                new StorageTableDescriptor(TABLE_ID, DEFAULT_PARTITION_COUNT, 
DEFAULT_DATA_REGION_NAME),
                 new StorageIndexDescriptorSupplier(mock(CatalogService.class))
         );
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index c9aa2201c8..855808b23f 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -126,26 +126,22 @@ import org.mockito.junit.jupiter.MockitoExtension;
 @ExtendWith(MockitoExtension.class)
 @ExtendWith(ConfigurationExtension.class)
 public class PartitionCommandListenerTest extends BaseIgniteAbstractTest {
-    /** Key count. */
     private static final int KEY_COUNT = 100;
 
-    /** Partition id. */
+    private static final int TABLE_ID = 1;
+
     private static final int PARTITION_ID = 0;
 
-    /** Schema. */
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT32, false)},
             new Column[]{new Column("value", NativeTypes.INT32, false)}
     );
 
-    /** Table command listener. */
     private PartitionListener commandListener;
 
-    /** RAFT index. */
     private final AtomicLong raftIndex = new AtomicLong();
 
-    /** Primary index. */
     private final TableSchemaAwareIndexStorage pkStorage = new 
TableSchemaAwareIndexStorage(
             1,
             new TestHashIndexStorage(
@@ -155,28 +151,19 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
             BinaryRowConverter.keyExtractor(SCHEMA)
     );
 
-    /** Partition storage. */
     private final MvPartitionStorage mvPartitionStorage = spy(new 
TestMvPartitionStorage(PARTITION_ID));
 
-    private final PartitionDataStorage partitionDataStorage = spy(new 
TestPartitionDataStorage(mvPartitionStorage));
+    private final PartitionDataStorage partitionDataStorage = spy(new 
TestPartitionDataStorage(TABLE_ID, PARTITION_ID, mvPartitionStorage));
 
-    /** Transaction meta storage. */
     private final TxStateStorage txStateStorage = spy(new 
TestTxStateStorage());
 
-    /** Work directory. */
     @WorkDirectory
     private Path workDir;
 
-    /** Factory for command messages. */
     private final TableMessagesFactory msgFactory = new TableMessagesFactory();
 
-    /** Factory for replica messages. */
-    private final ReplicaMessagesFactory replicaMessagesFactory = new 
ReplicaMessagesFactory();
-
-    /** Hybrid clock. */
     private final HybridClock hybridClock = new HybridClockImpl();
 
-    /** Safe time tracker. */
     private PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeTracker;
 
     @Captor
@@ -300,7 +287,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
      */
     @Test
     public void 
testOnSnapshotSavePropagateLastAppliedIndexAndTerm(@InjectConfiguration 
GcConfiguration gcConfig) {
-        TestPartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartitionStorage);
+        TestPartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(TABLE_ID, PARTITION_ID, mvPartitionStorage);
 
         IndexUpdateHandler indexUpdateHandler1 = new IndexUpdateHandler(
                 
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.id(), 
pkStorage))
@@ -524,12 +511,6 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
     private BuildIndexCommand createBuildIndexCommand(int indexId, List<UUID> 
rowUuids, boolean finish) {
         return msgFactory.buildIndexCommand()
-                .tablePartitionId(
-                        msgFactory.tablePartitionIdMessage()
-                                .tableId(1)
-                                .partitionId(PARTITION_ID)
-                                .build()
-                )
                 .indexId(indexId)
                 .rowIds(rowUuids)
                 .finish(finish)
@@ -636,7 +617,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
     private void insertAll() {
         Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
         UUID txId = TestTransactionIds.newTransactionId();
-        var commitPartId = new TablePartitionId(1, PARTITION_ID);
+        var commitPartId = new TablePartitionId(TABLE_ID, PARTITION_ID);
 
         for (int i = 0; i < KEY_COUNT; i++) {
             rows.put(TestTransactionIds.newTransactionId(), getTestRow(i, i));
@@ -672,7 +653,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
      */
     private void updateAll(Function<Integer, Integer> keyValueMapper) {
         UUID txId = TestTransactionIds.newTransactionId();
-        var commitPartId = new TablePartitionId(1, PARTITION_ID);
+        var commitPartId = new TablePartitionId(TABLE_ID, PARTITION_ID);
         Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
 
         for (int i = 0; i < KEY_COUNT; i++) {
@@ -709,7 +690,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
      */
     private void deleteAll() {
         UUID txId = TestTransactionIds.newTransactionId();
-        var commitPartId = new TablePartitionId(1, PARTITION_ID);
+        var commitPartId = new TablePartitionId(TABLE_ID, PARTITION_ID);
         Map<UUID, BinaryRowMessage> keyRows = new HashMap<>(KEY_COUNT);
 
         for (int i = 0; i < KEY_COUNT; i++) {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 7d7b4215bf..6c2432cb90 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -201,7 +201,7 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                 
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(),
 pkStorage.get()))
         );
 
-        TestPartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(TEST_MV_PARTITION_STORAGE);
+        TestPartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(TABLE_ID, PART_ID, TEST_MV_PARTITION_STORAGE);
 
         CatalogTables catalogTables = mock(CatalogTables.class);
         when(catalogTables.table(anyInt(), 
anyLong())).thenReturn(mock(CatalogTableDescriptor.class));
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 2d4b1f4fe5..f5dda8b3a4 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -275,7 +275,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
     private TransactionStateResolver transactionStateResolver;
 
-    private final PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(testMvPartitionStorage);
+    private final PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(TABLE_ID, PART_ID, testMvPartitionStorage);
 
     @Mock
     private RaftGroupService mockRaftClient;
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index ece7c2926a..615dd012f0 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -448,7 +448,7 @@ public class ItTxTestCluster {
                         new 
PendingComparableValuesTracker<>(clocks.get(assignment).now());
                 PendingComparableValuesTracker<Long, Void> storageIndexTracker 
= new PendingComparableValuesTracker<>(0L);
 
-                PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartStorage);
+                PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(tableId, partId, mvPartStorage);
 
                 IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(
                         
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(),
 pkStorage.get()))
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 6a18c727ec..2b8a78aa63 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -41,16 +41,37 @@ import org.jetbrains.annotations.Nullable;
  * Test implementation of {@link PartitionDataStorage}.
  */
 public class TestPartitionDataStorage implements PartitionDataStorage {
+    private final int tableId;
+
+    private final int partitionId;
+
     private final MvPartitionStorage partitionStorage;
 
     private final Lock partitionSnapshotsLock = new ReentrantLock();
 
     private final RaftGroupConfigurationConverter configurationConverter = new 
RaftGroupConfigurationConverter();
 
-    public TestPartitionDataStorage(MvPartitionStorage partitionStorage) {
+    /** Constructor. */
+    public TestPartitionDataStorage(
+            int tableId,
+            int partitionId,
+            MvPartitionStorage partitionStorage
+    ) {
+        this.tableId = tableId;
+        this.partitionId = partitionId;
         this.partitionStorage = partitionStorage;
     }
 
+    @Override
+    public int tableId() {
+        return tableId;
+    }
+
+    @Override
+    public int partitionId() {
+        return partitionId;
+    }
+
     @Override
     public <V> V runConsistently(WriteClosure<V> closure) throws 
StorageException {
         return partitionStorage.runConsistently(closure);
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 349b66dc50..726610e2f2 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -323,7 +323,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
 
         safeTime = new 
PendingIndependentComparableValuesTracker<>(HybridTimestamp.MIN_VALUE);
 
-        PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartStorage);
+        PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(tableId, PART_ID, mvPartStorage);
         TableIndexStoragesSupplier indexes = 
createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()));
 
         GcConfiguration gcConfig = mock(GcConfiguration.class);
@@ -378,7 +378,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
 
         partitionListener = new PartitionListener(
                 this.txManager,
-                new TestPartitionDataStorage(mvPartStorage),
+                new TestPartitionDataStorage(tableId, PART_ID, mvPartStorage),
                 storageUpdateHandler,
                 txStateStorage().getOrCreateTxStateStorage(PART_ID),
                 safeTime,

Reply via email to