This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b8c9fd07a86 IGNITE-27349 Execute necessary waits before index build starts (#7273)
b8c9fd07a86 is described below

commit b8c9fd07a86a1ff7d184b4162b5a190018dc30ab
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Dec 19 20:12:34 2025 +0400

    IGNITE-27349 Execute necessary waits before index build starts (#7273)
---
 .../index/ItIndexBuildCompletenessTest.java        |  6 +-
 .../internal/index/IndexBuildController.java       | 64 +++++++++++++++++
 .../ignite/internal/index/IndexBuildTask.java      | 40 ++++++++---
 .../apache/ignite/internal/index/IndexBuilder.java | 35 ++++++++--
 .../internal/index/IndexBuildingManager.java       |  3 +
 .../index/IndexAvailabilityControllerTest.java     | 15 ++++
 .../internal/index/IndexBuildControllerTest.java   | 48 +++++++++++--
 .../ignite/internal/index/IndexBuilderTest.java    | 61 ++++++++++++++--
 .../partition/replicator/fixtures/Node.java        |  1 +
 .../PartitionReplicaLifecycleManager.java          |  9 ++-
 .../replicator/ReplicaTableProcessor.java          |  9 +++
 .../partition/replicator/ReplicaTableSegment.java  | 46 ++++++++++++
 ...ocessor.java => TableTxRwOperationTracker.java} | 19 ++---
 .../replicator/ZonePartitionReplicaListener.java   |  6 ++
 .../org/apache/ignite/internal/app/IgniteImpl.java |  1 +
 .../table/distributed/index/IndexMeta.java         |  2 +-
 .../IndexBuilderTxRwOperationTracker.java          |  4 +-
 .../replicator/PartitionReplicaListener.java       | 17 +++--
 .../handlers/BuildIndexReplicaRequestHandler.java  | 27 +-------
 .../replication/PartitionReplicaListenerTest.java  | 81 +---------------------
 20 files changed, 344 insertions(+), 150 deletions(-)

diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexBuildCompletenessTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexBuildCompletenessTest.java
index 9080c90d0af..80f5dfce77a 100644
--- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexBuildCompletenessTest.java
+++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexBuildCompletenessTest.java
@@ -43,7 +43,6 @@ import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.tx.Transaction;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 class ItIndexBuildCompletenessTest extends ClusterPerTestIntegrationTest {
@@ -82,11 +81,10 @@ class ItIndexBuildCompletenessTest extends ClusterPerTestIntegrationTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-27349")
     void 
raceBetweenIndexBuildAndWriteFromDeadCoordinatorDoesNotCauseIndexIncompleteness()
 {
         createTestTable(cluster, 1, 1);
 
-        List<Transaction> transactions = IntStream.range(0, 2)
+        List<Transaction> transactions = IntStream.range(0, 100)
                 .mapToObj(n -> cluster.node(0).transactions().begin())
                 .collect(toUnmodifiableList());
 
@@ -100,7 +98,7 @@ class ItIndexBuildCompletenessTest extends 
ClusterPerTestIntegrationTest {
         for (int i = 0; i < transactions.size(); i++) {
             Transaction tx = transactions.get(i);
             try {
-                kvView.put(tx, i, 11);
+                kvView.put(tx, i, 42);
                 tx.commit();
             } catch (RuntimeException e) {
                 if (ExceptionUtils.hasCause(e, 
StaleTransactionOperationException.class)) {
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
index e919d9457bd..14975611a9f 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
@@ -59,6 +59,10 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.InternalClusterNode;
+import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
+import org.apache.ignite.internal.partition.replicator.ReplicaTableSegment;
+import 
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
+import 
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import 
org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
@@ -72,6 +76,7 @@ import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Component is responsible for starting and stopping the building of indexes 
on primary replicas.
@@ -103,6 +108,8 @@ class IndexBuildController implements ManuallyCloseable {
 
     private final ClockService clockService;
 
+    private final PartitionReplicaLifecycleManager 
partitionReplicaLifecycleManager;
+
     private final FailureProcessor failureProcessor;
 
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -119,6 +126,7 @@ class IndexBuildController implements ManuallyCloseable {
             ClusterService clusterService,
             PlacementDriver placementDriver,
             ClockService clockService,
+            PartitionReplicaLifecycleManager partitionReplicaLifecycleManager,
             FailureProcessor failureProcessor
     ) {
         this.indexBuilder = indexBuilder;
@@ -127,6 +135,7 @@ class IndexBuildController implements ManuallyCloseable {
         this.clusterService = clusterService;
         this.placementDriver = placementDriver;
         this.clockService = clockService;
+        this.partitionReplicaLifecycleManager = 
partitionReplicaLifecycleManager;
         this.failureProcessor = failureProcessor;
     }
 
@@ -409,6 +418,18 @@ class IndexBuildController implements ManuallyCloseable {
             long enlistmentConsistencyToken,
             HybridTimestamp initialOperationTimestamp
     ) {
+        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 
partitionId);
+        ZonePartitionResources resources = 
partitionReplicaLifecycleManager.zonePartitionResourcesOrNull(zonePartitionId);
+        if (resources == null) {
+            // Already stopped/destroyed, ignore.
+            return;
+        }
+
+        @Nullable ReplicaTableSegment segment = 
replicaTableSegment(zonePartitionId, tableId, resources, indexDescriptor);
+        if (segment == null) {
+            return;
+        }
+
         MvPartitionStorage mvPartition = mvPartitionStorage(mvTableStorage, 
zoneId, tableId, partitionId);
 
         IndexStorage indexStorage = indexStorage(mvTableStorage, partitionId, 
indexDescriptor);
@@ -420,6 +441,8 @@ class IndexBuildController implements ManuallyCloseable {
                 indexDescriptor.id(),
                 indexStorage,
                 mvPartition,
+                segment.txRwOperationTracker(),
+                segment.safeTime(),
                 localNode(),
                 enlistmentConsistencyToken,
                 initialOperationTimestamp
@@ -435,6 +458,18 @@ class IndexBuildController implements ManuallyCloseable {
             MvTableStorage mvTableStorage,
             long enlistmentConsistencyToken
     ) {
+        ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 
partitionId);
+        ZonePartitionResources resources = 
partitionReplicaLifecycleManager.zonePartitionResourcesOrNull(zonePartitionId);
+        if (resources == null) {
+            // Already stopped/destroyed, ignore.
+            return;
+        }
+
+        @Nullable ReplicaTableSegment segment = 
replicaTableSegment(zonePartitionId, tableId, resources, indexDescriptor);
+        if (segment == null) {
+            return;
+        }
+
         MvPartitionStorage mvPartition = mvPartitionStorage(mvTableStorage, 
zoneId, tableId, partitionId);
 
         IndexStorage indexStorage = indexStorage(mvTableStorage, partitionId, 
indexDescriptor);
@@ -446,12 +481,41 @@ class IndexBuildController implements ManuallyCloseable {
                 indexDescriptor.id(),
                 indexStorage,
                 mvPartition,
+                segment.txRwOperationTracker(),
+                segment.safeTime(),
                 localNode(),
                 enlistmentConsistencyToken,
                 clockService.current()
         );
     }
 
+    private static @Nullable ReplicaTableSegment replicaTableSegment(
+            ZonePartitionId zonePartitionId,
+            int tableId,
+            ZonePartitionResources resources,
+            CatalogIndexDescriptor indexDescriptor
+    ) {
+        CompletableFuture<ZonePartitionReplicaListener> replicaListenerFuture 
= resources.replicaListenerFuture();
+        assert replicaListenerFuture.isDone() : "Replica listener future is 
not done for [zonePartitionId=" + zonePartitionId + "].";
+
+        ZonePartitionReplicaListener replicaListener = 
replicaListenerFuture.join();
+        @Nullable ReplicaTableSegment segment = 
replicaListener.segmentFor(tableId);
+
+        if (segment == null) {
+            // Null means that the table has been removed due to table 
destruction.
+            LOG.info(
+                    "Segment is null, skipping index build scheduling "
+                            + "[zoneId={}, tableId={}, partitionId={}, 
indexId={}]",
+                    zonePartitionId.zoneId(),
+                    tableId,
+                    zonePartitionId.partitionId(),
+                    indexDescriptor.id()
+            );
+        }
+
+        return segment;
+    }
+
     private InternalClusterNode localNode() {
         return IndexManagementUtils.localNode(clusterService);
     }
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
index 4cc94adabdf..0b64a024256 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.index;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static java.util.stream.Collectors.toUnmodifiableSet;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.InternalClusterNode;
+import 
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
 import org.apache.ignite.internal.raft.GroupOverloadedException;
@@ -62,10 +63,12 @@ import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.RowMeta;
 import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.index.IndexStorage;
+import 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
 import org.apache.ignite.internal.tx.TransactionIds;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.util.TrackerClosedException;
 import org.jetbrains.annotations.Nullable;
 
@@ -80,7 +83,9 @@ class IndexBuildTask {
 
     private final IndexBuildTaskId taskId;
 
-    private final HybridTimestamp indexCreationActivationTs;
+    private final MetaIndexStatusChange indexCreationInfo;
+
+    private final HybridTimestamp indexBuildingStateActivationTimestamp;
 
     private final IndexStorage indexStorage;
 
@@ -88,6 +93,10 @@ class IndexBuildTask {
 
     private final ReplicaService replicaService;
 
+    private final TableTxRwOperationTracker txRwOperationTracker;
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTime;
+
     private final FailureProcessor failureProcessor;
 
     private final FinalTransactionStateResolver finalTransactionStateResolver;
@@ -120,10 +129,13 @@ class IndexBuildTask {
 
     IndexBuildTask(
             IndexBuildTaskId taskId,
-            HybridTimestamp indexCreationActivationTs,
+            MetaIndexStatusChange indexCreationInfo,
+            HybridTimestamp indexBuildingStateActivationTimestamp,
             IndexStorage indexStorage,
             MvPartitionStorage partitionStorage,
             ReplicaService replicaService,
+            TableTxRwOperationTracker txRwOperationTracker,
+            PendingComparableValuesTracker<HybridTimestamp, Void> safeTime,
             FailureProcessor failureProcessor,
             FinalTransactionStateResolver finalTransactionStateResolver,
             Executor executor,
@@ -137,10 +149,13 @@ class IndexBuildTask {
             IndexBuilderMetricSource indexBuilderMetricSource
     ) {
         this.taskId = taskId;
-        this.indexCreationActivationTs = indexCreationActivationTs;
+        this.indexCreationInfo = indexCreationInfo;
+        this.indexBuildingStateActivationTimestamp = 
indexBuildingStateActivationTimestamp;
         this.indexStorage = indexStorage;
         this.partitionStorage = partitionStorage;
         this.replicaService = replicaService;
+        this.txRwOperationTracker = txRwOperationTracker;
+        this.safeTime = safeTime;
         this.failureProcessor = failureProcessor;
         this.finalTransactionStateResolver = finalTransactionStateResolver;
         this.executor = executor;
@@ -174,9 +189,16 @@ class IndexBuildTask {
         }
 
         try {
-            statisticsLoggingListener.onIndexBuildStarted();
-
-            supplyAsync(partitionStorage::highestRowId, executor)
+            // Before starting to build the index, we are waiting for all operations of RW transactions that started before index creation
+            // to make sure that, even if some coordinator has gone while we were waiting for its pre-index RW transactions to finish,
+            // we still allow operations of those transactions from that coordinator which are still in-flight to finish, so that we
+            // index the row versions they could create. Otherwise, we might miss some row versions in the index.
+            
txRwOperationTracker.awaitCompleteTxRwOperations(indexCreationInfo.catalogVersion())
+                    // This wait is necessary to make sure that all writes 
made before the index has switched to the BUILDING state
+                    // are visible to the index build process.
+                    .thenCompose(unused -> 
safeTime.waitFor(indexBuildingStateActivationTimestamp))
+                    .thenRun(statisticsLoggingListener::onIndexBuildStarted)
+                    .thenApplyAsync(unused -> partitionStorage.highestRowId(), 
executor)
                     .thenApplyAsync(this::handleNextBatch, executor)
                     .thenCompose(Function.identity())
                     .whenComplete((unused, throwable) -> {
@@ -296,7 +318,9 @@ class IndexBuildTask {
                 assert transactionId != null;
 
                 // We only care about transactions which began after index 
creation.
-                if 
(TransactionIds.beginTimestamp(transactionId).compareTo(indexCreationActivationTs)
 < 0) {
+                HybridTimestamp txBeginTs = 
TransactionIds.beginTimestamp(transactionId);
+                HybridTimestamp indexCreationTs = 
hybridTimestamp(indexCreationInfo.activationTimestamp());
+                if (txBeginTs.compareTo(indexCreationTs) < 0) {
                     transactionsToResolve.put(
                             row.transactionId(),
                             new CommitPartitionId(row.commitZoneId(), 
row.commitPartitionId())
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
index 0049a34c440..1d3e8a4556d 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.index;
 
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
 
 import java.util.Iterator;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.InternalClusterNode;
+import 
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
 import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
 import org.apache.ignite.internal.replicator.ReplicaService;
@@ -43,7 +45,9 @@ import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.table.distributed.index.IndexMeta;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import org.apache.ignite.internal.table.distributed.index.MetaIndexStatus;
+import 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 
 /**
  * Component that is responsible for building an index for a specific 
partition.
@@ -124,6 +128,8 @@ class IndexBuilder implements ManuallyCloseable {
      * @param indexId Index ID.
      * @param indexStorage Index storage to build.
      * @param partitionStorage Multi-versioned partition storage.
+     * @param partitionTxRwOperationTracker Partition transaction read-write 
operations tracker.
+     * @param partitionSafeTime Partition safe time tracker.
      * @param node Node to which requests to build the index will be sent.
      * @param enlistmentConsistencyToken Enlistment consistency token is used 
to check that the lease is still actual while the message goes
      *      to the replica.
@@ -135,6 +141,8 @@ class IndexBuilder implements ManuallyCloseable {
             int indexId,
             IndexStorage indexStorage,
             MvPartitionStorage partitionStorage,
+            TableTxRwOperationTracker partitionTxRwOperationTracker,
+            PendingComparableValuesTracker<HybridTimestamp, Void> 
partitionSafeTime,
             InternalClusterNode node,
             long enlistmentConsistencyToken,
             HybridTimestamp initialOperationTimestamp
@@ -152,10 +160,13 @@ class IndexBuilder implements ManuallyCloseable {
 
             IndexBuildTask newTask = new IndexBuildTask(
                     taskId,
-                    indexCreationActivationTs(indexId),
+                    indexCreationInfo(indexId),
+                    indexBuildingStateActivationTimestamp(indexId),
                     indexStorage,
                     partitionStorage,
                     replicaService,
+                    partitionTxRwOperationTracker,
+                    partitionSafeTime,
                     failureProcessor,
                     finalTransactionStateResolver,
                     executor,
@@ -193,6 +204,8 @@ class IndexBuilder implements ManuallyCloseable {
      * @param indexId Index ID.
      * @param indexStorage Index storage to build.
      * @param partitionStorage Multi-versioned partition storage.
+     * @param partitionTxRwOperationTracker Partition transaction read-write 
operations tracker.
+     * @param partitionSafeTime Partition safe time tracker.
      * @param node Node to which requests to build the index will be sent.
      * @param enlistmentConsistencyToken Enlistment consistency token is used 
to check that the lease is still actual while the
      *         message goes to the replica.
@@ -204,6 +217,8 @@ class IndexBuilder implements ManuallyCloseable {
             int indexId,
             IndexStorage indexStorage,
             MvPartitionStorage partitionStorage,
+            TableTxRwOperationTracker partitionTxRwOperationTracker,
+            PendingComparableValuesTracker<HybridTimestamp, Void> 
partitionSafeTime,
             InternalClusterNode node,
             long enlistmentConsistencyToken,
             HybridTimestamp initialOperationTimestamp
@@ -217,10 +232,13 @@ class IndexBuilder implements ManuallyCloseable {
 
             IndexBuildTask newTask = new IndexBuildTask(
                     taskId,
-                    indexCreationActivationTs(indexId),
+                    indexCreationInfo(indexId),
+                    indexBuildingStateActivationTimestamp(indexId),
                     indexStorage,
                     partitionStorage,
                     replicaService,
+                    partitionTxRwOperationTracker,
+                    partitionSafeTime,
                     failureProcessor,
                     finalTransactionStateResolver,
                     executor,
@@ -238,12 +256,19 @@ class IndexBuilder implements ManuallyCloseable {
         });
     }
 
-    private HybridTimestamp indexCreationActivationTs(int indexId) {
+    private MetaIndexStatusChange indexCreationInfo(int indexId) {
+        return 
requiredIndexMeta(indexId).statusChange(MetaIndexStatus.REGISTERED);
+    }
+
+    private IndexMeta requiredIndexMeta(int indexId) {
         IndexMeta indexMeta = indexMetaStorage.indexMeta(indexId);
         assert indexMeta != null : "Index meta must be present for indexId=" + 
indexId;
+        return indexMeta;
+    }
 
-        long tsLong = 
indexMeta.statusChange(MetaIndexStatus.REGISTERED).activationTimestamp();
-        return HybridTimestamp.hybridTimestamp(tsLong);
+    private HybridTimestamp indexBuildingStateActivationTimestamp(int indexId) 
{
+        MetaIndexStatusChange statusChange = 
requiredIndexMeta(indexId).statusChange(MetaIndexStatus.BUILDING);
+        return hybridTimestamp(statusChange.activationTimestamp());
     }
 
     /**
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
index 237d88b8d14..58dd64b9a35 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
@@ -45,6 +45,7 @@ import 
org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.ClusterService;
+import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import 
org.apache.ignite.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
 import org.apache.ignite.internal.replicator.ReplicaService;
@@ -95,6 +96,7 @@ public class IndexBuildingManager implements IgniteComponent {
             FailureProcessor failureProcessor,
             LowWatermark lowWatermark,
             TxManager txManager,
+            PartitionReplicaLifecycleManager partitionReplicaLifecycleManager,
             MetricManager metricManager
     ) {
         this.metaStorageManager = metaStorageManager;
@@ -139,6 +141,7 @@ public class IndexBuildingManager implements 
IgniteComponent {
                 clusterService,
                 placementDriver,
                 clockService,
+                partitionReplicaLifecycleManager,
                 failureProcessor
         );
 
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
index 1cc0eb0b67a..45fdab88ae3 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -39,6 +39,8 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -53,6 +55,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -60,6 +63,7 @@ import 
org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import org.apache.ignite.internal.metrics.TestMetricManager;
 import org.apache.ignite.internal.network.InternalClusterNode;
+import 
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.sql.SqlCommon;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -68,6 +72,7 @@ import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -89,6 +94,10 @@ public class IndexAvailabilityControllerTest extends 
BaseIgniteAbstractTest {
 
     private final IndexMetaStorage indexMetaStorage = 
mock(IndexMetaStorage.class);
 
+    private final TableTxRwOperationTracker txRwOperationTracker = 
mock(TableTxRwOperationTracker.class);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTime = mock(PendingComparableValuesTracker.class);
+
     private final IndexBuilder indexBuilder = new IndexBuilder(
             executorService,
             mock(ReplicaService.class, invocation -> nullCompletedFuture()),
@@ -108,6 +117,10 @@ public class IndexAvailabilityControllerTest extends 
BaseIgniteAbstractTest {
     @BeforeEach
     void configureMocks() {
         IndexMetaStorageMocks.configureMocksForBuildingPhase(indexMetaStorage);
+
+        
when(txRwOperationTracker.awaitCompleteTxRwOperations(anyInt())).thenReturn(nullCompletedFuture());
+
+        when(safeTime.waitFor(any())).thenReturn(nullCompletedFuture());
     }
 
     @BeforeEach
@@ -423,6 +436,8 @@ public class IndexAvailabilityControllerTest extends 
BaseIgniteAbstractTest {
                 indexId,
                 indexStorage,
                 mock(MvPartitionStorage.class),
+                txRwOperationTracker,
+                safeTime,
                 mock(InternalClusterNode.class),
                 ANY_ENLISTMENT_CONSISTENCY_TOKEN,
                 clock.current()
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
index 42624bdd6a3..74c702ad005 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
@@ -41,6 +41,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -61,6 +62,11 @@ import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.TopologyService;
+import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
+import org.apache.ignite.internal.partition.replicator.ReplicaTableSegment;
+import 
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
+import 
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
+import 
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.leases.Lease;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -70,11 +76,16 @@ import 
org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 /** For {@link IndexBuildController} testing. */
+@ExtendWith(MockitoExtension.class)
 public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
     private static final int PARTITION_ID = 10;
 
@@ -90,13 +101,11 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
 
     private final ClockService clockService = new TestClockService(clock);
 
-    private IndexManager indexManager = null;
-
     @BeforeEach
-    void setUp() {
+    void setUp(@Mock PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTime) {
         indexBuilder = mock(IndexBuilder.class);
 
-        indexManager = mock(IndexManager.class, invocation -> {
+        IndexManager indexManager = mock(IndexManager.class, invocation -> {
             MvTableStorage mvTableStorage = mock(MvTableStorage.class);
             MvPartitionStorage mvPartitionStorage = 
mock(MvPartitionStorage.class);
             IndexStorage indexStorage = mock(IndexStorage.class);
@@ -112,6 +121,16 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
         catalogManager = createCatalogManagerWithTestUpdateLog(NODE_NAME, 
clock);
         assertThat(catalogManager.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
 
+        PartitionReplicaLifecycleManager partitionReplicaLifecycleManager = 
mock(PartitionReplicaLifecycleManager.class);
+        ZonePartitionResources zonePartitionResources = 
mock(ZonePartitionResources.class);
+        
lenient().doReturn(zonePartitionResources).when(partitionReplicaLifecycleManager).zonePartitionResourcesOrNull(any());
+
+        ZonePartitionReplicaListener replicaListener = 
mock(ZonePartitionReplicaListener.class);
+        
lenient().doReturn(completedFuture(replicaListener)).when(zonePartitionResources).replicaListenerFuture();
+
+        TableTxRwOperationTracker txRwOperationTracker = 
mock(TableTxRwOperationTracker.class);
+        lenient().doReturn(new ReplicaTableSegment(txRwOperationTracker, 
safeTime)).when(replicaListener).segmentFor(anyInt());
+
         indexBuildController = new IndexBuildController(
                 indexBuilder,
                 indexManager,
@@ -119,6 +138,7 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
                 clusterService,
                 placementDriver,
                 clockService,
+                partitionReplicaLifecycleManager,
                 new NoOpFailureManager()
         );
 
@@ -151,6 +171,8 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
                 eq(indexId(INDEX_NAME)),
                 any(),
                 any(),
+                any(),
+                any(),
                 eq(LOCAL_NODE),
                 anyLong(),
                 any()
@@ -163,6 +185,8 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
                 eq(indexId(INDEX_NAME)),
                 any(),
                 any(),
+                any(),
+                any(),
                 eq(LOCAL_NODE),
                 anyLong(),
                 any()
@@ -186,6 +210,8 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
                 eq(indexId(INDEX_NAME)),
                 any(),
                 any(),
+                any(),
+                any(),
                 eq(LOCAL_NODE),
                 anyLong(),
                 any()
@@ -198,6 +224,8 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
                 eq(indexId(INDEX_NAME)),
                 any(),
                 any(),
+                any(),
+                any(),
                 eq(LOCAL_NODE),
                 anyLong(),
                 any()
@@ -219,6 +247,8 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
                 eq(indexId(INDEX_NAME)),
                 any(),
                 any(),
+                any(),
+                any(),
                 eq(LOCAL_NODE),
                 anyLong(),
                 any()
@@ -231,6 +261,8 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
                 eq(indexId(PK_INDEX_NAME)),
                 any(),
                 any(),
+                any(),
+                any(),
                 eq(LOCAL_NODE),
                 anyLong(),
                 any()
@@ -252,6 +284,8 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
                 eq(indexId(pkIndexName(tableName))),
                 any(),
                 any(),
+                any(),
+                any(),
                 eq(LOCAL_NODE),
                 anyLong(),
                 any()
@@ -264,6 +298,8 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
                 eq(indexId(pkIndexName(tableName))),
                 any(),
                 any(),
+                any(),
+                any(),
                 eq(LOCAL_NODE),
                 anyLong(),
                 any()
@@ -308,6 +344,8 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
                 anyInt(),
                 any(),
                 any(),
+                any(),
+                any(),
                 eq(LOCAL_NODE),
                 anyLong(),
                 any()
@@ -320,6 +358,8 @@ public class IndexBuildControllerTest extends 
BaseIgniteAbstractTest {
                 eq(indexId0),
                 any(),
                 any(),
+                any(),
+                any(),
                 eq(LOCAL_NODE),
                 anyLong(),
                 any()
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
index 91157b0ded0..8926fa2f9d2 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.index;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -29,7 +31,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -37,14 +43,15 @@ import static org.mockito.Mockito.when;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metrics.TestMetricManager;
 import org.apache.ignite.internal.network.InternalClusterNode;
+import 
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -53,11 +60,16 @@ import 
org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexMeta;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import org.apache.ignite.internal.table.distributed.index.MetaIndexStatus;
+import 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
 
 /** For {@link IndexBuilder} testing. */
 public class IndexBuilderTest extends BaseIgniteAbstractTest {
@@ -77,6 +89,12 @@ public class IndexBuilderTest extends BaseIgniteAbstractTest 
{
 
     private final IndexMetaStorage indexMetaStorage = 
mock(IndexMetaStorage.class);
 
+    private final MvPartitionStorage mvPartitionStorage = 
mock(MvPartitionStorage.class);
+
+    private final TableTxRwOperationTracker txRwOperationTracker = 
mock(TableTxRwOperationTracker.class);
+
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTime = mock(PendingComparableValuesTracker.class);
+
     private final TestMetricManager metricManager = new TestMetricManager();
 
     private final IndexBuilder indexBuilder = new IndexBuilder(
@@ -91,14 +109,45 @@ public class IndexBuilderTest extends 
BaseIgniteAbstractTest {
     @BeforeEach
     void configureMocks() {
         IndexMetaStorageMocks.configureMocksForBuildingPhase(indexMetaStorage);
+
+        
when(txRwOperationTracker.awaitCompleteTxRwOperations(anyInt())).thenReturn(nullCompletedFuture());
+
+        when(safeTime.waitFor(any())).thenReturn(nullCompletedFuture());
     }
 
     @AfterEach
     void tearDown() throws Exception {
         closeAll(
                 indexBuilder::close,
-                () -> shutdownAndAwaitTermination(executorService, 1, 
TimeUnit.SECONDS)
+                () -> shutdownAndAwaitTermination(executorService, 1, SECONDS)
+        );
+    }
+
+    @Test
+    void testIndexBuildInvokesNecessaryWaitsBeforeStartingToBuild() {
+        int registerredStateCatalogVersion = 10;
+        long buildingStateActivationTs = 2000L;
+
+        IndexMeta indexMeta = new IndexMeta(
+                100,
+                INDEX_ID,
+                TABLE_ID,
+                1,
+                "idx",
+                MetaIndexStatus.BUILDING,
+                Map.of(
+                        MetaIndexStatus.REGISTERED, new 
MetaIndexStatusChange(registerredStateCatalogVersion, 1000L),
+                        MetaIndexStatus.BUILDING, new 
MetaIndexStatusChange(20, buildingStateActivationTs)
+                )
         );
+        doReturn(indexMeta).when(indexMetaStorage).indexMeta(INDEX_ID);
+
+        scheduleBuildIndex(INDEX_ID, ZONE_ID, TABLE_ID, PARTITION_ID, 
List.of(rowId(PARTITION_ID)));
+
+        InOrder inOrder = inOrder(txRwOperationTracker, safeTime, 
mvPartitionStorage);
+        inOrder.verify(txRwOperationTracker, 
timeout(SECONDS.toMillis(10))).awaitCompleteTxRwOperations(registerredStateCatalogVersion);
+        inOrder.verify(safeTime, 
timeout(SECONDS.toMillis(10))).waitFor(hybridTimestamp(buildingStateActivationTs));
+        inOrder.verify(mvPartitionStorage, 
timeout(SECONDS.toMillis(10))).highestRowId();
     }
 
     @Test
@@ -193,7 +242,9 @@ public class IndexBuilderTest extends 
BaseIgniteAbstractTest {
                 partitionId,
                 indexId,
                 indexStorage(nextRowIdsToBuild),
-                mock(MvPartitionStorage.class),
+                mvPartitionStorage,
+                txRwOperationTracker,
+                safeTime,
                 mock(InternalClusterNode.class),
                 ANY_ENLISTMENT_CONSISTENCY_TOKEN,
                 mock(HybridTimestamp.class)
@@ -213,7 +264,9 @@ public class IndexBuilderTest extends 
BaseIgniteAbstractTest {
                 partitionId,
                 indexId,
                 indexStorage(nextRowIdsToBuild),
-                mock(MvPartitionStorage.class),
+                mvPartitionStorage,
+                txRwOperationTracker,
+                safeTime,
                 mock(InternalClusterNode.class),
                 ANY_ENLISTMENT_CONSISTENCY_TOKEN,
                 mock(HybridTimestamp.class)
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index c1c4a03a9d0..d18eae051cd 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -847,6 +847,7 @@ public class Node {
                 failureManager,
                 lowWatermark,
                 txManager,
+                partitionReplicaLifecycleManager,
                 metricManager
         );
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index eecdf47a4c9..a423ffea41e 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -1973,13 +1973,20 @@ public class PartitionReplicaLifecycleManager extends
      * Returns resources for the given zone partition.
      */
     public ZonePartitionResources zonePartitionResources(ZonePartitionId 
zonePartitionId) {
-        ZonePartitionResources resources = 
zoneResourcesManager.getZonePartitionResources(zonePartitionId);
+        ZonePartitionResources resources = 
zonePartitionResourcesOrNull(zonePartitionId);
 
         assert resources != null : String.format("Missing resources for zone 
partition [zonePartitionId=%s]", zonePartitionId);
 
         return resources;
     }
 
+    /**
+     * Returns resources for the given zone partition or {@code null} if not 
available (because the replica is stopped/destroyed).
+     */
+    public @Nullable ZonePartitionResources 
zonePartitionResourcesOrNull(ZonePartitionId zonePartitionId) {
+        return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
+    }
+
     /**
      * For HA zones: Check that last rebalance was graceful (caused by common 
rebalance triggers, like data nodes change, replica factor
      * change, etc.) rather than forced (caused by a disaster recovery reset 
after losing the majority of nodes).
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
index 3d9f8044010..a8e2e9cb698 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal.partition.replicator;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 
 /**
  * Processor of replica requests targeted at a particular table.
@@ -38,4 +40,11 @@ public interface ReplicaTableProcessor {
 
     /** Callback on replica shutdown. */
     void onShutdown();
+
+    /** Returns the tracker of RW transaction operations. */
+    TableTxRwOperationTracker txRwOperationTracker();
+
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-27405 as safe time 
should not depend on the table.
+    /** Returns safe time tracker for the partition. */
+    PendingComparableValuesTracker<HybridTimestamp, Void> safeTime();
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableSegment.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableSegment.java
new file mode 100644
index 00000000000..462210dd989
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableSegment.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+
+/**
+ * Segment of a partition replica corresponding to a specific table.
+ */
+// TODO: https://issues.apache.org/jira/browse/IGNITE-27405 - remove this as 
we'll not need to pass safe time tracker.
+public class ReplicaTableSegment {
+    private final TableTxRwOperationTracker txRwOperationTracker;
+    private final PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTime;
+
+    public ReplicaTableSegment(
+            TableTxRwOperationTracker txRwOperationTracker,
+            PendingComparableValuesTracker<HybridTimestamp, Void> safeTime
+    ) {
+        this.txRwOperationTracker = txRwOperationTracker;
+        this.safeTime = safeTime;
+    }
+
+    public TableTxRwOperationTracker txRwOperationTracker() {
+        return txRwOperationTracker;
+    }
+
+    public PendingComparableValuesTracker<HybridTimestamp, Void> safeTime() {
+        return safeTime;
+    }
+}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TableTxRwOperationTracker.java
similarity index 57%
copy from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
copy to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TableTxRwOperationTracker.java
index 3d9f8044010..eeb5bf99d3d 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTableProcessor.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TableTxRwOperationTracker.java
@@ -17,25 +17,16 @@
 
 package org.apache.ignite.internal.partition.replicator;
 
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.replicator.ReplicaResult;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 
 /**
- * Processor of replica requests targeted at a particular table.
+ * Tracks the completion of RW transactions' operations before a dependent 
process begins.
  */
-public interface ReplicaTableProcessor {
+public interface TableTxRwOperationTracker {
     /**
-     * Processes replica request.
+     * Waits for completion of RW transaction operations with catalog versions strictly lower than 
the requested catalog version.
      *
-     * @param request Replica request.
-     * @param replicaPrimacy Replica primacy info.
-     * @param senderId ID of the node that sent the request.
-     * @return Future completed with the result of processing.
+     * @param catalogVersion Catalog version in question.
      */
-    CompletableFuture<ReplicaResult> process(ReplicaRequest request, 
ReplicaPrimacy replicaPrimacy, UUID senderId);
-
-    /** Callback on replica shutdown. */
-    void onShutdown();
+    CompletableFuture<Void> awaitCompleteTxRwOperations(int catalogVersion);
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index 0c2a08f1b87..cc0461378be 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -63,6 +63,7 @@ import 
org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
 import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
 import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.VisibleForTesting;
 
 /**
@@ -317,6 +318,11 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
         replicaProcessors.remove(tableId);
     }
 
+    public @Nullable ReplicaTableSegment segmentFor(int tableId) {
+        ReplicaTableProcessor processor = replicaProcessors.get(tableId);
+        return processor == null ? null : new 
ReplicaTableSegment(processor.txRwOperationTracker(), processor.safeTime());
+    }
+
     /**
      * Return table replicas listeners.
      *
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 89a288788c1..959cbe912f9 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1200,6 +1200,7 @@ public class IgniteImpl implements Ignite {
                 failureManager,
                 lowWatermark,
                 txManager,
+                partitionReplicaLifecycleManager,
                 metricManager
         );
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMeta.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMeta.java
index 732c2b33fb4..101410750f7 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMeta.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMeta.java
@@ -62,7 +62,7 @@ public class IndexMeta {
      * @param currentStatus Current status of the index
      * @param statusChanges <b>Immutable</b> map of index statuses with change 
info (for example catalog version) in which they appeared.
      */
-    IndexMeta(
+    public IndexMeta(
             int catalogVersion,
             int indexId,
             int tableId,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IndexBuilderTxRwOperationTracker.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IndexBuilderTxRwOperationTracker.java
index 4613c54275b..cd2fc606321 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IndexBuilderTxRwOperationTracker.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IndexBuilderTxRwOperationTracker.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.lang.NodeStoppingException;
+import 
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 
 /**
@@ -45,7 +46,7 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
  *     <li>When completing a RW transaction operation, {@link 
#decrementOperationCount(int)} must be used.</li>
  * </ul>
  */
-public class IndexBuilderTxRwOperationTracker implements ManuallyCloseable {
+public class IndexBuilderTxRwOperationTracker implements 
TableTxRwOperationTracker, ManuallyCloseable {
     private final AtomicInteger minAllowedCatalogVersionForStartOperation = 
new AtomicInteger(-1);
 
     private final NavigableMap<Integer, CompletableFuture<Void>> 
minAllowedVersionRaiseFutures = new ConcurrentSkipListMap<>();
@@ -100,6 +101,7 @@ public class IndexBuilderTxRwOperationTracker implements 
ManuallyCloseable {
      *
      * @param catalogVersion Catalog version in which the new index appeared.
      */
+    @Override
     public CompletableFuture<Void> awaitCompleteTxRwOperations(int 
catalogVersion) {
         return inBusyLock(busyLock, () -> {
             // This code is needed to avoid races with 
updateAllowedCatalogVersionForStartOperation.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 704d3a00558..71e7eaeff54 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -102,6 +102,7 @@ import 
org.apache.ignite.internal.partition.replicator.ReplicaPrimacyEngine;
 import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
 import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
 import 
org.apache.ignite.internal.partition.replicator.TableAwareReplicaRequestPreProcessor;
+import 
org.apache.ignite.internal.partition.replicator.TableTxRwOperationTracker;
 import 
org.apache.ignite.internal.partition.replicator.exception.OperationLockException;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import org.apache.ignite.internal.partition.replicator.network.TimedBinaryRow;
@@ -413,11 +414,7 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         reliableCatalogVersions = new 
ReliableCatalogVersions(schemaSyncService, catalogService);
         raftCommandApplicator = new 
ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
 
-        buildIndexReplicaRequestHandler = new BuildIndexReplicaRequestHandler(
-                indexMetaStorage,
-                indexBuildingProcessor.tracker(),
-                safeTime,
-                raftCommandApplicator);
+        buildIndexReplicaRequestHandler = new 
BuildIndexReplicaRequestHandler(indexMetaStorage, raftCommandApplicator);
     }
 
     @Override
@@ -3436,6 +3433,16 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         indexBuildingProcessor.onShutdown();
     }
 
+    @Override
+    public TableTxRwOperationTracker txRwOperationTracker() {
+        return indexBuildingProcessor.tracker();
+    }
+
+    @Override
+    public PendingComparableValuesTracker<HybridTimestamp, Void> safeTime() {
+        return safeTime;
+    }
+
     private int partId() {
         return replicationGroupId.partitionId();
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
index 0926695a6c2..5476d275b8d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
@@ -17,13 +17,10 @@
 
 package org.apache.ignite.internal.table.distributed.replicator.handlers;
 
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.BUILDING;
-import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
@@ -31,8 +28,6 @@ import 
org.apache.ignite.internal.partition.replicator.network.replication.Build
 import org.apache.ignite.internal.table.distributed.index.IndexMeta;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
-import 
org.apache.ignite.internal.table.distributed.replicator.IndexBuilderTxRwOperationTracker;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 
 /**
  * Handler for {@link BuildIndexReplicaRequest}.
@@ -44,12 +39,6 @@ public class BuildIndexReplicaRequestHandler {
 
     private final IndexMetaStorage indexMetaStorage;
 
-    /** Read-write transaction operation tracker for building indexes. */
-    private final IndexBuilderTxRwOperationTracker txRwOperationTracker;
-
-    /** Partition safe-time tracker. */
-    private final PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTime;
-
     /** Applicator that applies RAFT command that is created by this handler. 
*/
     private final ReplicationRaftCommandApplicator commandApplicator;
 
@@ -57,19 +46,10 @@ public class BuildIndexReplicaRequestHandler {
      * Creates a new instance of request handler.
      *
      * @param indexMetaStorage Index meta storage.
-     * @param txRwOperationTracker Read-write transaction operation tracker 
for building indexes.
-     * @param safeTime Partition safe-time tracker.
      * @param commandApplicator Applicator that applies RAFT command that is 
created by this handler.
      */
-    public BuildIndexReplicaRequestHandler(
-            IndexMetaStorage indexMetaStorage,
-            IndexBuilderTxRwOperationTracker txRwOperationTracker,
-            PendingComparableValuesTracker<HybridTimestamp, Void> safeTime,
-            ReplicationRaftCommandApplicator commandApplicator
-    ) {
+    public BuildIndexReplicaRequestHandler(IndexMetaStorage indexMetaStorage, 
ReplicationRaftCommandApplicator commandApplicator) {
         this.indexMetaStorage = indexMetaStorage;
-        this.txRwOperationTracker = txRwOperationTracker;
-        this.safeTime = safeTime;
         this.commandApplicator = commandApplicator;
     }
 
@@ -86,12 +66,9 @@ public class BuildIndexReplicaRequestHandler {
             return nullCompletedFuture();
         }
 
-        MetaIndexStatusChange registeredChangeInfo = 
indexMeta.statusChange(REGISTERED);
         MetaIndexStatusChange buildingChangeInfo = 
indexMeta.statusChange(BUILDING);
 
-        return 
txRwOperationTracker.awaitCompleteTxRwOperations(registeredChangeInfo.catalogVersion())
-                .thenCompose(unused -> 
safeTime.waitFor(hybridTimestamp(buildingChangeInfo.activationTimestamp())))
-                .thenCompose(unused -> 
commandApplicator.applyCommand(toBuildIndexCommand(request, 
buildingChangeInfo)));
+        return commandApplicator.applyCommand(toBuildIndexCommand(request, 
buildingChangeInfo));
     }
 
     private static BuildIndexCommand 
toBuildIndexCommand(BuildIndexReplicaRequest request, MetaIndexStatusChange 
buildingChangeInfo) {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 95eabbe6712..dd436d029fe 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -57,7 +57,6 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
@@ -137,7 +136,6 @@ import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.SingleClusterNodeResolver;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
-import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.CatalogVersionAware;
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
@@ -2586,11 +2584,11 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     }
 
     @Test
-    void testBuildIndexReplicaRequestWithoutRwTxOperations() {
+    void testBuildIndexReplicaRequest() {
         int indexId = hashIndexStorage.id();
         int indexCreationCatalogVersion = 1;
         int startBuildingIndexCatalogVersion = 2;
-        long indexCreationActivationTs = 
clock.now().addPhysicalTime(-100).longValue();
+        long indexCreationActivationTs = 
clock.now().subtractPhysicalTime(100).longValue();
         long startBuildingIndexActivationTs = clock.nowLong();
 
         setIndexMetaInBuildingStatus(
@@ -2601,85 +2599,12 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 startBuildingIndexActivationTs
         );
 
-        CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = 
invokeBuildIndexReplicaRequestAsync(indexId);
-
-        assertFalse(invokeBuildIndexReplicaRequestFuture.isDone());
-
         fireHashIndexStartBuildingEventForStaleTxOperation(indexId, 
startBuildingIndexCatalogVersion);
 
-        assertThat(invokeBuildIndexReplicaRequestFuture, 
willCompleteSuccessfully());
-        assertThat(invokeBuildIndexReplicaRequestAsync(indexId), 
willCompleteSuccessfully());
-    }
-
-    @ParameterizedTest(name = "failCmd = {0}")
-    @ValueSource(booleans = {false, true})
-    void testBuildIndexReplicaRequest(boolean failCmd) {
-        var continueNotBuildIndexCmdFuture = new CompletableFuture<Void>();
-        var buildIndexCommandFuture = new 
CompletableFuture<BuildIndexCommand>();
-
-        when(mockRaftClient.run(any())).thenAnswer(invocation -> {
-            Command cmd = invocation.getArgument(0);
-
-            if (cmd instanceof BuildIndexCommand) {
-                buildIndexCommandFuture.complete((BuildIndexCommand) cmd);
-
-                return raftClientFutureClosure.apply(cmd);
-            }
-
-            return continueNotBuildIndexCmdFuture.thenCompose(unused -> 
raftClientFutureClosure.apply(cmd));
-        });
-
-        UUID txId = newTxId();
-        long beginTs = beginTimestamp(txId).longValue();
-
-        when(catalogService.activeCatalogVersion(eq(beginTs))).thenReturn(0);
-
-        BinaryRow row = binaryRow(0);
-
-        CompletableFuture<ReplicaResult> upsertFuture = upsertAsync(txId, row, 
true);
-
-        int indexId = hashIndexStorage.id();
-        int indexCreationCatalogVersion = 1;
-        int startBuildingIndexCatalogVersion = 2;
-        long indexCreationActivationTs = 
clock.now().addPhysicalTime(-100).longValue();
-        long startBuildingIndexActivationTs = clock.nowLong();
-
-        setIndexMetaInBuildingStatus(
-                indexId,
-                indexCreationCatalogVersion,
-                indexCreationActivationTs,
-                startBuildingIndexCatalogVersion,
-                startBuildingIndexActivationTs
-        );
-
         CompletableFuture<?> invokeBuildIndexReplicaRequestFuture = 
invokeBuildIndexReplicaRequestAsync(indexId);
 
-        fireHashIndexStartBuildingEventForStaleTxOperation(indexId, 
startBuildingIndexCatalogVersion);
-
-        assertFalse(upsertFuture.isDone());
-        assertFalse(invokeBuildIndexReplicaRequestFuture.isDone());
-
-        if (failCmd) {
-            continueNotBuildIndexCmdFuture.completeExceptionally(new 
RuntimeException("error from test"));
-
-            assertThat(upsertFuture, willThrow(RuntimeException.class));
-        } else {
-            continueNotBuildIndexCmdFuture.complete(null);
-
-            assertThat(upsertFuture, willCompleteSuccessfully());
-        }
-
         assertThat(invokeBuildIndexReplicaRequestFuture, 
willCompleteSuccessfully());
-
-        HybridTimestamp startBuildingIndexActivationTs0 = 
hybridTimestamp(startBuildingIndexActivationTs);
-
-        verify(safeTimeClock).waitFor(eq(startBuildingIndexActivationTs0));
-
-        assertThat(buildIndexCommandFuture, willCompleteSuccessfully());
-
-        BuildIndexCommand buildIndexCommand = buildIndexCommandFuture.join();
-        assertThat(buildIndexCommand.indexId(), equalTo(indexId));
-        assertThat(buildIndexCommand.requiredCatalogVersion(), 
equalTo(startBuildingIndexCatalogVersion));
+        assertThat(invokeBuildIndexReplicaRequestAsync(indexId), 
willCompleteSuccessfully());
     }
 
     private void fireHashIndexStartBuildingEventForStaleTxOperation(int 
indexId, int startBuildingIndexCatalogVersion) {

Reply via email to