This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new fe4bdb0809c IGNITE-21546 Add write intent resolution to index backfill 
process (#6810)
fe4bdb0809c is described below

commit fe4bdb0809cd29ce96456d2d4bddfd9e7d4badc8
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Oct 22 12:16:19 2025 +0400

    IGNITE-21546 Add write intent resolution to index backfill process (#6810)
---
 .../RaftCommandsUnitCompatibilityArchTest.java     |  13 ++-
 .../internal/index/ItBuildIndexOneNodeTest.java    |   2 -
 .../ignite/internal/index/ItBuildIndexTest.java    |   3 -
 .../index/FinalTransactionStateResolver.java       |  37 +++++++
 .../ignite/internal/index/IndexBuildTask.java      | 107 ++++++++++++++++++---
 .../apache/ignite/internal/index/IndexBuilder.java |  30 +++++-
 .../internal/index/IndexBuildingManager.java       |  25 ++++-
 .../RetryingFinalTransactionStateResolver.java     |  74 ++++++++++++++
 .../CommittedFinalTransactionStateResolver.java    |  33 +++++++
 .../index/IndexAvailabilityControllerTest.java     |  12 ++-
 .../ignite/internal/index/IndexBuilderTest.java    |  13 ++-
 .../internal/index/IndexMetaStorageMocks.java      |  43 +++++++++
 .../partition/replicator/fixtures/Node.java        |   3 +-
 .../network/PartitionReplicationMessageGroup.java  |   4 +
 .../BuildIndexCommandV3.java}                      |  29 ++----
 .../replication/BuildIndexReplicaRequest.java      |   4 +
 .../PartitionCommandsCompatibilityTest.java        |  25 +++++
 .../raft/BaseCommandsCompatibilityTest.java        |   4 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 .../internal/storage/MvPartitionStorage.java       |   8 ++
 .../apache/ignite/internal/storage/RowMeta.java    |  70 ++++++++++++++
 .../storage/ThreadAssertingMvPartitionStorage.java |   7 ++
 .../AbstractMvPartitionStorageConcurrencyTest.java |   3 +
 .../storage/AbstractMvPartitionStorageTest.java    |  42 ++++++++
 .../storage/AbstractMvTableStorageTest.java        |   2 +
 .../storage/impl/TestMvPartitionStorage.java       |  18 +++-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   |  32 +++++-
 .../storage/rocksdb/RocksDbMvPartitionStorage.java |  43 +++++++++
 .../distributed/index/MetaIndexStatusChange.java   |   2 +-
 .../table/distributed/raft/PartitionListener.java  |  15 ++-
 .../raft/handlers/BuildIndexCommandHandler.java    |  12 ++-
 .../raft/handlers/BuildIndexRowVersionChooser.java |  17 +++-
 .../replicator/TransactionStateResolver.java       |  69 +++++++++++--
 .../handlers/BuildIndexReplicaRequestHandler.java  |   5 +-
 .../handlers/BuildIndexRowVersionChooserTest.java  |  32 +++++-
 .../replication/PartitionReplicaListenerTest.java  |   1 +
 .../internal/tx/impl/PlacementDriverHelper.java    |  33 ++++++-
 37 files changed, 789 insertions(+), 86 deletions(-)

diff --git 
a/modules/arch-test/src/test/java/org/apache/ignite/internal/RaftCommandsUnitCompatibilityArchTest.java
 
b/modules/arch-test/src/test/java/org/apache/ignite/internal/RaftCommandsUnitCompatibilityArchTest.java
index 0f0f521e12d..46a874f3055 100644
--- 
a/modules/arch-test/src/test/java/org/apache/ignite/internal/RaftCommandsUnitCompatibilityArchTest.java
+++ 
b/modules/arch-test/src/test/java/org/apache/ignite/internal/RaftCommandsUnitCompatibilityArchTest.java
@@ -21,8 +21,8 @@ import static 
com.tngtech.archunit.core.importer.ImportOption.Predefined.DO_NOT_
 import static 
com.tngtech.archunit.core.importer.ImportOption.Predefined.DO_NOT_INCLUDE_TEST_FIXTURES;
 import static java.util.stream.Collectors.toSet;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 
 import com.tngtech.archunit.core.domain.JavaClass;
@@ -59,11 +59,14 @@ public class RaftCommandsUnitCompatibilityArchTest {
 
         Set<String> testedRaftCommands = collectTestedRaftCommands(classes);
 
+        Set<String> notTestedRaftCommands = raftCommands.stream()
+                .filter(cmd -> !testedRaftCommands.contains(cmd))
+                .collect(toSet());
+
         assertThat(
-                "There are still some raft commands that haven't been tested; 
for example, see the successors of "
-                        + BaseCommandsCompatibilityTest.class.getName(),
-                testedRaftCommands,
-                containsInAnyOrder(raftCommands.toArray(new String[0]))
+                "There are still some raft commands that haven't been tested",
+                notTestedRaftCommands,
+                is(empty())
         );
     }
 
diff --git 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
index 324eaa86622..6b38736040e 100644
--- 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
+++ 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
@@ -68,7 +68,6 @@ import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionOptions;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /** Integration test for testing the building of an index in a single node 
cluster. */
@@ -369,7 +368,6 @@ public class ItBuildIndexOneNodeTest extends 
BaseSqlIntegrationTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21546";)
     void writeIntentFromAbortedTxShouldNotBeIndexed() throws Exception {
         createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1);
 
diff --git 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
index b7d7b0753f8..a29b96fa0ed 100644
--- 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
+++ 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
@@ -85,7 +85,6 @@ import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -224,7 +223,6 @@ public class ItBuildIndexTest extends 
BaseSqlIntegrationTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21546";)
     void writeIntentFromTxAbandonedBeforeShouldNotBeIndexed() throws Exception 
{
         createTable(1, 1);
 
@@ -246,7 +244,6 @@ public class ItBuildIndexTest extends 
BaseSqlIntegrationTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21546";)
     void 
writeIntentFromTxAbandonedWhileWaitingForTransactionsToFinishShouldNotBeIndexed()
 throws Exception {
         createTable(1, 1);
 
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/FinalTransactionStateResolver.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/FinalTransactionStateResolver.java
new file mode 100644
index 00000000000..cd98537b087
--- /dev/null
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/FinalTransactionStateResolver.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tx.TxState;
+
+/**
+ * Resolves the final state of a transaction.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface FinalTransactionStateResolver {
+    /**
+     * Resolves the final state of a transaction. Cannot return a non-final 
state.
+     *
+     * @param transactionId Transaction ID.
+     * @param commitGroupId Commit partition ID.
+     */
+    CompletableFuture<TxState> resolveFinalTxState(UUID transactionId, 
ReplicationGroupId commitGroupId);
+}
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
index 799a5fd892f..cbbaee2b13a 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.index;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toUnmodifiableSet;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -27,7 +29,12 @@ import static 
org.apache.ignite.internal.util.ExceptionUtils.hasCause;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +52,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicat
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
 import org.apache.ignite.internal.raft.GroupOverloadedException;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
@@ -53,11 +61,15 @@ import 
org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.RowMeta;
 import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.TrackerClosedException;
+import org.jetbrains.annotations.Nullable;
 
 /** Task of building a table index. */
 class IndexBuildTask {
@@ -70,6 +82,8 @@ class IndexBuildTask {
 
     private final IndexBuildTaskId taskId;
 
+    private final HybridTimestamp indexCreationActivationTs;
+
     private final IndexStorage indexStorage;
 
     private final MvPartitionStorage partitionStorage;
@@ -80,6 +94,8 @@ class IndexBuildTask {
 
     private final NodeProperties nodeProperties;
 
+    private final FinalTransactionStateResolver finalTransactionStateResolver;
+
     private final Executor executor;
 
     private final IgniteSpinBusyLock busyLock;
@@ -104,11 +120,13 @@ class IndexBuildTask {
 
     IndexBuildTask(
             IndexBuildTaskId taskId,
+            HybridTimestamp indexCreationActivationTs,
             IndexStorage indexStorage,
             MvPartitionStorage partitionStorage,
             ReplicaService replicaService,
             FailureProcessor failureProcessor,
             NodeProperties nodeProperties,
+            FinalTransactionStateResolver finalTransactionStateResolver,
             Executor executor,
             IgniteSpinBusyLock busyLock,
             int batchSize,
@@ -119,11 +137,13 @@ class IndexBuildTask {
             HybridTimestamp initialOperationTimestamp
     ) {
         this.taskId = taskId;
+        this.indexCreationActivationTs = indexCreationActivationTs;
         this.indexStorage = indexStorage;
         this.partitionStorage = partitionStorage;
         this.replicaService = replicaService;
         this.failureProcessor = failureProcessor;
         this.nodeProperties = nodeProperties;
+        this.finalTransactionStateResolver = finalTransactionStateResolver;
         this.executor = executor;
         this.busyLock = busyLock;
         this.batchSize = batchSize;
@@ -206,9 +226,10 @@ class IndexBuildTask {
         }
 
         try {
-            List<RowId> batchRowIds = createBatchRowIds();
-
-            return replicaService.invoke(node, 
createBuildIndexReplicaRequest(batchRowIds, initialOperationTimestamp))
+            return createBatchToIndex()
+                    .thenCompose(batch -> {
+                        return replicaService.invoke(node, 
createBuildIndexReplicaRequest(batch, initialOperationTimestamp));
+                    })
                     .handleAsync((unused, throwable) -> {
                         if (throwable != null) {
                             Throwable cause = unwrapRootCause(throwable);
@@ -236,27 +257,68 @@ class IndexBuildTask {
         }
     }
 
-    private List<RowId> createBatchRowIds() {
+    private CompletableFuture<BatchToIndex> createBatchToIndex() {
         RowId nextRowIdToBuild = indexStorage.getNextRowIdToBuild();
 
-        List<RowId> batch = new ArrayList<>(batchSize);
+        List<RowId> rowIds = new ArrayList<>(batchSize);
+        Map<UUID, CommitPartitionId> transactionsToResolve = new HashMap<>();
 
-        for (int i = 0; i < batchSize && nextRowIdToBuild != null; i++) {
-            nextRowIdToBuild = partitionStorage.closestRowId(nextRowIdToBuild);
+        while (rowIds.size() < batchSize && nextRowIdToBuild != null) {
+            @Nullable RowMeta row = 
partitionStorage.closestRow(nextRowIdToBuild);
 
-            if (nextRowIdToBuild == null) {
+            if (row == null) {
                 break;
             }
 
-            batch.add(nextRowIdToBuild);
+            rowIds.add(row.rowId());
+
+            if (row.isWriteIntent()) {
+                UUID transactionId = row.transactionId();
+                assert transactionId != null;
+
+                // We only care about transactions which began after index 
creation.
+                if 
(TransactionIds.beginTimestamp(transactionId).compareTo(indexCreationActivationTs)
 < 0) {
+                    transactionsToResolve.put(
+                            row.transactionId(),
+                            new CommitPartitionId(row.commitTableOrZoneId(), 
row.commitPartitionId())
+                    );
+                }
+            }
 
-            nextRowIdToBuild = nextRowIdToBuild.increment();
+            nextRowIdToBuild = row.rowId().increment();
         }
 
-        return batch;
+        Map<UUID, CompletableFuture<TxState>> txStateResolveFutures = 
transactionsToResolve.entrySet().stream()
+                .map(entry -> Map.entry(entry.getKey(), 
resolveFinalTxStateIfNeeded(entry.getKey(), entry.getValue())))
+                .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        return CompletableFutures.allOf(txStateResolveFutures.values())
+                .thenApply(unused -> {
+                    Set<UUID> abortedTransactionIds = 
txStateResolveFutures.entrySet().stream()
+                            .filter(entry -> entry.getValue().join() == 
TxState.ABORTED)
+                            .map(Entry::getKey)
+                            .collect(toUnmodifiableSet());
+
+                    return new BatchToIndex(rowIds, abortedTransactionIds);
+                });
+    }
+
+    private CompletableFuture<TxState> resolveFinalTxStateIfNeeded(UUID 
transactionId, CommitPartitionId commitPartitionId) {
+        assert commitPartitionId.commitTableOrZoneId != null;
+
+        ReplicationGroupId commitGroupId = 
targetGroupId(commitPartitionId.commitTableOrZoneId, 
commitPartitionId.commitPartitionId);
+
+        return 
finalTransactionStateResolver.resolveFinalTxState(transactionId, commitGroupId);
+    }
+
+    private ReplicationGroupId targetGroupId(int tableOrZoneId, int 
partitionIndex) {
+        return nodeProperties.colocationEnabled()
+                ? new ZonePartitionId(tableOrZoneId, partitionIndex)
+                : new TablePartitionId(tableOrZoneId, partitionIndex);
     }
 
-    private BuildIndexReplicaRequest 
createBuildIndexReplicaRequest(List<RowId> rowIds, HybridTimestamp 
initialOperationTimestamp) {
+    private BuildIndexReplicaRequest 
createBuildIndexReplicaRequest(BatchToIndex batch, HybridTimestamp 
initialOperationTimestamp) {
+        List<RowId> rowIds = batch.rowIds;
         boolean finish = rowIds.size() < batchSize;
 
         ReplicationGroupIdMessage groupIdMessage = 
nodeProperties.colocationEnabled()
@@ -269,6 +331,7 @@ class IndexBuildTask {
                 .indexId(taskId.getIndexId())
                 .rowIds(rowIds.stream().map(RowId::uuid).collect(toList()))
                 .finish(finish)
+                .abortedTransactionIds(batch.abortedTransactionIds)
                 .enlistmentConsistencyToken(enlistmentConsistencyToken)
                 .timestamp(initialOperationTimestamp)
                 .build();
@@ -298,4 +361,24 @@ class IndexBuildTask {
             }
         }
     }
+
+    private static class BatchToIndex {
+        private final List<RowId> rowIds;
+        private final Set<UUID> abortedTransactionIds;
+
+        private BatchToIndex(List<RowId> rowIds, Set<UUID> 
abortedTransactionIds) {
+            this.rowIds = rowIds;
+            this.abortedTransactionIds = abortedTransactionIds;
+        }
+    }
+
+    private static class CommitPartitionId {
+        private final @Nullable Integer commitTableOrZoneId;
+        private final int commitPartitionId;
+
+        private CommitPartitionId(@Nullable Integer commitTableOrZoneId, int 
commitPartitionId) {
+            this.commitTableOrZoneId = commitTableOrZoneId;
+            this.commitPartitionId = commitPartitionId;
+        }
+    }
 }
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
index 5241ea5dd18..23fe48ce1cb 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
@@ -40,6 +40,9 @@ import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexMeta;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import org.apache.ignite.internal.table.distributed.index.MetaIndexStatus;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 
 /**
@@ -71,6 +74,10 @@ class IndexBuilder implements ManuallyCloseable {
 
     private final NodeProperties nodeProperties;
 
+    private final FinalTransactionStateResolver finalTransactionStateResolver;
+
+    private final IndexMetaStorage indexMetaStorage;
+
     private final Map<IndexBuildTaskId, IndexBuildTask> indexBuildTaskById = 
new ConcurrentHashMap<>();
 
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -80,11 +87,20 @@ class IndexBuilder implements ManuallyCloseable {
     private final List<IndexBuildCompletionListener> listeners = new 
CopyOnWriteArrayList<>();
 
     /** Constructor. */
-    IndexBuilder(Executor executor, ReplicaService replicaService, 
FailureProcessor failureProcessor, NodeProperties nodeProperties) {
+    IndexBuilder(
+            Executor executor,
+            ReplicaService replicaService,
+            FailureProcessor failureProcessor,
+            NodeProperties nodeProperties,
+            FinalTransactionStateResolver finalTransactionStateResolver,
+            IndexMetaStorage indexMetaStorage
+    ) {
         this.executor = executor;
         this.replicaService = replicaService;
         this.failureProcessor = failureProcessor;
         this.nodeProperties = nodeProperties;
+        this.finalTransactionStateResolver = finalTransactionStateResolver;
+        this.indexMetaStorage = indexMetaStorage;
     }
 
     /**
@@ -133,11 +149,13 @@ class IndexBuilder implements ManuallyCloseable {
 
             IndexBuildTask newTask = new IndexBuildTask(
                     taskId,
+                    indexCreationActivationTs(indexId),
                     indexStorage,
                     partitionStorage,
                     replicaService,
                     failureProcessor,
                     nodeProperties,
+                    finalTransactionStateResolver,
                     executor,
                     busyLock,
                     BATCH_SIZE,
@@ -196,11 +214,13 @@ class IndexBuilder implements ManuallyCloseable {
 
             IndexBuildTask newTask = new IndexBuildTask(
                     taskId,
+                    indexCreationActivationTs(indexId),
                     indexStorage,
                     partitionStorage,
                     replicaService,
                     failureProcessor,
                     nodeProperties,
+                    finalTransactionStateResolver,
                     executor,
                     busyLock,
                     BATCH_SIZE,
@@ -215,6 +235,14 @@ class IndexBuilder implements ManuallyCloseable {
         });
     }
 
+    private HybridTimestamp indexCreationActivationTs(int indexId) {
+        IndexMeta indexMeta = indexMetaStorage.indexMeta(indexId);
+        assert indexMeta != null : "Index meta must be present for indexId=" + 
indexId;
+
+        long tsLong = 
indexMeta.statusChange(MetaIndexStatus.REGISTERED).activationTimestamp();
+        return HybridTimestamp.hybridTimestamp(tsLong);
+    }
+
     /**
      * Stops building all indexes (for a table partition) if they are in 
progress.
      *
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
index a474d5b9bfd..8aef37cf8af 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
@@ -46,9 +46,13 @@ import 
org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import 
org.apache.ignite.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TxMessageSender;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 
 /**
@@ -90,7 +94,8 @@ public class IndexBuildingManager implements IgniteComponent {
             ClockService clockService,
             FailureProcessor failureProcessor,
             NodeProperties nodeProperties,
-            LowWatermark lowWatermark
+            LowWatermark lowWatermark,
+            TxManager txManager
     ) {
         this.metaStorageManager = metaStorageManager;
 
@@ -107,7 +112,23 @@ public class IndexBuildingManager implements 
IgniteComponent {
 
         executor.allowCoreThreadTimeOut(true);
 
-        indexBuilder = new IndexBuilder(executor, replicaService, 
failureProcessor, nodeProperties);
+        TransactionStateResolver transactionStateResolver = new 
TransactionStateResolver(
+                txManager,
+                clockService,
+                clusterService.topologyService(),
+                clusterService.messagingService(),
+                new ExecutorInclinedPlacementDriver(placementDriver, executor),
+                new TxMessageSender(clusterService.messagingService(), 
replicaService, clockService)
+        );
+
+        indexBuilder = new IndexBuilder(
+                executor,
+                replicaService,
+                failureProcessor,
+                nodeProperties,
+                new 
RetryingFinalTransactionStateResolver(transactionStateResolver, executor),
+                indexMetaStorage
+        );
 
         indexAvailabilityController = new 
IndexAvailabilityController(catalogManager, metaStorageManager, 
failureProcessor, indexBuilder);
 
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/RetryingFinalTransactionStateResolver.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/RetryingFinalTransactionStateResolver.java
new file mode 100644
index 00000000000..61ff3483327
--- /dev/null
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/RetryingFinalTransactionStateResolver.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.function.Function.identity;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.util.CompletableFutures;
+
+/**
+ * FinalTransactionStateResolver implementation using retries. Makes sense 
when the coordinator of the transaction is known
+ * to have left the topology.
+ */
+public class RetryingFinalTransactionStateResolver implements 
FinalTransactionStateResolver {
+    private final TransactionStateResolver stateResolver;
+
+    private final Executor delayedExecutor;
+
+    /** Constructor. */
+    public RetryingFinalTransactionStateResolver(TransactionStateResolver 
stateResolver, Executor executor) {
+        this.stateResolver = stateResolver;
+        delayedExecutor = CompletableFuture.delayedExecutor(1000, 
TimeUnit.MILLISECONDS, executor);
+    }
+
+    @Override
+    public CompletableFuture<TxState> resolveFinalTxState(UUID transactionId, 
ReplicationGroupId commitGroupId) {
+        return stateResolver.resolveTxState(transactionId, commitGroupId, 
null, Long.MAX_VALUE, SECONDS)
+                .thenCompose(txMeta -> {
+                    if (txMeta != null && 
TxState.isFinalState(txMeta.txState())) {
+                        return completedFuture(txMeta.txState());
+                    }
+
+                    return retryResolve(transactionId, commitGroupId);
+                })
+                .handle((res, ex) -> {
+                    if (ex instanceof PrimaryReplicaAwaitTimeoutException) {
+                        return retryResolve(transactionId, commitGroupId);
+                    }
+
+                    return CompletableFutures.completedOrFailedFuture(res, ex);
+                })
+                .thenCompose(identity());
+    }
+
+    private CompletableFuture<TxState> retryResolve(UUID transactionId, 
ReplicationGroupId commitGroupId) {
+        return supplyAsync(() -> resolveFinalTxState(transactionId, 
commitGroupId), delayedExecutor)
+                .thenCompose(identity());
+    }
+}
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/CommittedFinalTransactionStateResolver.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/CommittedFinalTransactionStateResolver.java
new file mode 100644
index 00000000000..be884cacb2a
--- /dev/null
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/CommittedFinalTransactionStateResolver.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tx.TxState;
+
+/**
+ * FinalTransactionStateResolver implementation that always returns COMMITTED 
state immediately.
+ */
+public class CommittedFinalTransactionStateResolver implements 
FinalTransactionStateResolver {
+    @Override
+    public CompletableFuture<TxState> resolveFinalTxState(UUID transactionId, 
ReplicationGroupId commitGroupId) {
+        return CompletableFuture.completedFuture(TxState.COMMITTED);
+    }
+}
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
index dcac1821417..4e62755f83a 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -86,11 +87,15 @@ public class IndexAvailabilityControllerTest extends 
BaseIgniteAbstractTest {
 
     private final ExecutorService executorService = newSingleThreadExecutor();
 
+    private final IndexMetaStorage indexMetaStorage = 
mock(IndexMetaStorage.class);
+
     private final IndexBuilder indexBuilder = new IndexBuilder(
             executorService,
             mock(ReplicaService.class, invocation -> nullCompletedFuture()),
             new NoOpFailureManager(),
-            new SystemPropertiesNodeProperties()
+            new SystemPropertiesNodeProperties(),
+            new CommittedFinalTransactionStateResolver(),
+            indexMetaStorage
     );
 
     private final IndexAvailabilityController indexAvailabilityController = 
new IndexAvailabilityController(
@@ -100,6 +105,11 @@ public class IndexAvailabilityControllerTest extends 
BaseIgniteAbstractTest {
             indexBuilder
     );
 
+    @BeforeEach
+    void configureMocks() {
+        IndexMetaStorageMocks.configureMocksForBuildingPhase(indexMetaStorage);
+    }
+
     @BeforeEach
     void setUp() {
         ComponentContext context = new ComponentContext();
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
index 4f517daaeba..9fb01cf1e65 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
@@ -53,8 +53,10 @@ import 
org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /** For {@link IndexBuilder} testing. */
@@ -73,13 +75,22 @@ public class IndexBuilderTest extends 
BaseIgniteAbstractTest {
 
     private final ExecutorService executorService = newSingleThreadExecutor();
 
+    private final IndexMetaStorage indexMetaStorage = 
mock(IndexMetaStorage.class);
+
     private final IndexBuilder indexBuilder = new IndexBuilder(
             executorService,
             replicaService,
             new NoOpFailureManager(),
-            new SystemPropertiesNodeProperties()
+            new SystemPropertiesNodeProperties(),
+            new CommittedFinalTransactionStateResolver(),
+            indexMetaStorage
     );
 
+    @BeforeEach
+    void configureMocks() {
+        IndexMetaStorageMocks.configureMocksForBuildingPhase(indexMetaStorage);
+    }
+
     @AfterEach
     void tearDown() throws Exception {
         closeAll(
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexMetaStorageMocks.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexMetaStorageMocks.java
new file mode 100644
index 00000000000..1dd8f16cf46
--- /dev/null
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexMetaStorageMocks.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.distributed.index.IndexMeta;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
+
+class IndexMetaStorageMocks {
+    static void configureMocksForBuildingPhase(IndexMetaStorage 
indexMetaStorage) {
+        doAnswer(invocation -> {
+            int indexId = invocation.getArgument(0);
+
+            IndexMeta indexMeta = mock(IndexMeta.class);
+            when(indexMeta.indexId()).thenReturn(indexId);
+            when(indexMeta.statusChange(any())).thenReturn(new 
MetaIndexStatusChange(1, HybridTimestamp.MIN_VALUE.longValue()));
+
+            return indexMeta;
+        }).when(indexMetaStorage).indexMeta(anyInt());
+    }
+}
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 5233c985ce6..e8c616a44dd 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -858,7 +858,8 @@ public class Node {
                 clockService,
                 failureManager,
                 nodeProperties,
-                lowWatermark
+                lowWatermark,
+                txManager
         );
 
         systemViewManager = new SystemViewManagerImpl(name, catalogManager, 
failureManager);
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
index 09c2f55e802..7564fb07d5e 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.partition.replicator.network.PartitionR
 import org.apache.ignite.internal.network.annotations.MessageGroup;
 import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommandV2;
+import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommandV3;
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandV1;
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandV2;
 import 
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
@@ -276,6 +277,9 @@ public interface PartitionReplicationMessageGroup {
 
         /** Message type for {@link FinishTxCommandV2}. */
         short FINISH_TX_V2 = 50;
+
+        /** Message type for {@link BuildIndexCommandV3}. */
+        short BUILD_INDEX_V3 = 51;
     }
 
     /**
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/BuildIndexCommandV3.java
similarity index 54%
copy from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
copy to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/BuildIndexCommandV3.java
index 71dee513a9e..7bc88921f5a 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/BuildIndexCommandV3.java
@@ -15,29 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.partition.replicator.network.replication;
+package org.apache.ignite.internal.partition.replicator.network.command;
 
-import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
-import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
-import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
-import org.apache.ignite.internal.replicator.message.TableAware;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands;
 
-/**
- * Replica request to build a table index.
- *
- * <p>It is possible to receive a {@link PrimaryReplicaMissException} in 
response to message processing if the leaseholder changes.</p>
- */
-@Transferable(PartitionReplicationMessageGroup.BUILD_INDEX_REPLICA_REQUEST)
-public interface BuildIndexReplicaRequest extends PrimaryReplicaRequest, 
TableAware {
-    /** Returns index ID. */
-    int indexId();
-
-    /** Returns row IDs for which to build indexes. */
-    List<UUID> rowIds();
-
-    /** Returns {@code true} if this batch is the last one. */
-    boolean finish();
+/** Extension of {@link BuildIndexCommandV2} with new fields to support 
backward compatibility. */
+@Transferable(Commands.BUILD_INDEX_V3)
+public interface BuildIndexCommandV3 extends BuildIndexCommandV2 {
+    /** IDs of transactions (to which write intents in this batch belong) that 
are known to have been aborted. */
+    Set<UUID> abortedTransactionIds();
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
index 71dee513a9e..acd88b7c086 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/BuildIndexReplicaRequest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.partition.replicator.network.replication;
 
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
@@ -40,4 +41,7 @@ public interface BuildIndexReplicaRequest extends 
PrimaryReplicaRequest, TableAw
 
     /** Returns {@code true} if this batch is the last one. */
     boolean finish();
+
+    /** IDs of transactions (to which write intents in this batch belong) that 
are known to have been aborted. */
+    Set<UUID> abortedTransactionIds();
 }
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/network/command/PartitionCommandsCompatibilityTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/network/command/PartitionCommandsCompatibilityTest.java
index b34b2609704..909e5adac99 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/network/command/PartitionCommandsCompatibilityTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/network/command/PartitionCommandsCompatibilityTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.partition.replicator.network.command;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -66,6 +68,7 @@ public class PartitionCommandsCompatibilityTest extends 
BaseCommandsCompatibilit
         return List.of(
                 createBuildIndexCommand(),
                 createBuildIndexCommandV2(),
+                createBuildIndexCommandV3(),
                 createFinishTxCommandV1(),
                 createFinishTxCommandV2(),
                 createUpdateAllCommand(),
@@ -99,6 +102,18 @@ public class PartitionCommandsCompatibilityTest extends 
BaseCommandsCompatibilit
         assertEquals(7, command.tableId());
     }
 
+    @Test
+    @TestForCommand(BuildIndexCommandV3.class)
+    void testBuildIndexCommandV3() {
+        BuildIndexCommandV3 command = 
decodeCommand("CjQDAP/////////W/////////7sAAAAAAAAAACoAAAAAAAAARQFGAgAAAAAAAAAAKgAAAAAAAABFCA==");
+
+        assertEquals(69, command.indexId());
+        assertEquals(List.of(uuid()), command.rowIds());
+        assertTrue(command.finish());
+        assertEquals(7, command.tableId());
+        assertThat(command.abortedTransactionIds(), containsInAnyOrder(uuid(), 
anotherUuid()));
+    }
+
     @Test
     @TestForCommand(FinishTxCommandV1.class)
     void testFinishTxCommandV1() {
@@ -390,6 +405,16 @@ public class PartitionCommandsCompatibilityTest extends 
BaseCommandsCompatibilit
                 .build();
     }
 
+    private BuildIndexCommandV2 createBuildIndexCommandV3() {
+        return commandFactory.buildIndexCommandV3()
+                .indexId(69)
+                .rowIds(List.of(uuid()))
+                .finish(true)
+                .tableId(7)
+                .abortedTransactionIds(Set.of(uuid(), anotherUuid()))
+                .build();
+    }
+
     private BuildIndexCommand createBuildIndexCommand() {
         return commandFactory.buildIndexCommand()
                 .indexId(69)
diff --git 
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/BaseCommandsCompatibilityTest.java
 
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/BaseCommandsCompatibilityTest.java
index 4075ffefa68..fe61f6d617b 100644
--- 
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/BaseCommandsCompatibilityTest.java
+++ 
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/BaseCommandsCompatibilityTest.java
@@ -86,6 +86,10 @@ public abstract class BaseCommandsCompatibilityTest extends 
BaseIgniteAbstractTe
         return new UUID(42, 69);
     }
 
+    protected static UUID anotherUuid() {
+        return new UUID(-42, -69);
+    }
+
     protected static HybridTimestamp initiatorTime() {
         return HybridTimestamp.hybridTimestamp(70);
     }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index b4086f773a5..f318532845d 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1198,7 +1198,8 @@ public class IgniteImpl implements Ignite {
                 clockService,
                 failureManager,
                 nodeProperties,
-                lowWatermark
+                lowWatermark,
+                txManager
         );
 
         qryEngine = new SqlQueryProcessor(
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 58aa1241949..7342f178cec 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -269,6 +269,14 @@ public interface MvPartitionStorage extends 
ManuallyCloseable {
      */
     @Nullable RowId closestRowId(RowId lowerBound) throws StorageException;
 
+    /**
+     * Returns a row, existing in the storage, that's greater or equal than 
the lower bound. {@code null} if not found.
+     *
+     * @param lowerBound Lower bound.
+     * @throws StorageException If failed to read data from the storage.
+     */
+    @Nullable RowMeta closestRow(RowId lowerBound) throws StorageException;
+
     /**
      * Returns the head of GC queue.
      *
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
new file mode 100644
index 00000000000..51030c8c93f
--- /dev/null
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/RowMeta.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents metadata of a row stored in the storage.
+ */
+public class RowMeta {
+    private final RowId rowId;
+    private final @Nullable UUID transactionId;
+
+    private final @Nullable Integer commitTableOrZoneId;
+    private final int commitPartitionId;
+
+    /** Creates a RowMeta instance for a row without a write intent. */
+    public static RowMeta withoutWriteIntent(RowId rowId) {
+        return new RowMeta(rowId, null, null, 
ReadResult.UNDEFINED_COMMIT_PARTITION_ID);
+    }
+
+    /** Constructor. */
+    public RowMeta(RowId rowId, @Nullable UUID transactionId, @Nullable 
Integer commitTableOrZoneId, int commitPartitionId) {
+        this.rowId = rowId;
+        this.transactionId = transactionId;
+        this.commitTableOrZoneId = commitTableOrZoneId;
+        this.commitPartitionId = commitPartitionId;
+    }
+
+    /** Returns the row ID. */
+    public RowId rowId() {
+        return rowId;
+    }
+
+    /** Returns the transaction ID if this row is a write intent, or {@code 
null} otherwise. */
+    public @Nullable UUID transactionId() {
+        return transactionId;
+    }
+
+    /** Returns {@code true} if this row is a write intent. */
+    public boolean isWriteIntent() {
+        return transactionId != null;
+    }
+
+    /** Returns the commit table or zone ID if this row has a write intent, or 
{@code null} otherwise. */
+    public @Nullable Integer commitTableOrZoneId() {
+        return commitTableOrZoneId;
+    }
+
+    /** Returns the commit partition ID. If row has no write intent, it's 
{@link ReadResult#UNDEFINED_COMMIT_PARTITION_ID}. */
+    public int commitPartitionId() {
+        return commitPartitionId;
+    }
+}
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
index 0e558f2d973..d9d0746e628 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
@@ -150,6 +150,13 @@ public class ThreadAssertingMvPartitionStorage implements 
MvPartitionStorage, Wr
         return partitionStorage.closestRowId(lowerBound);
     }
 
+    @Override
+    public @Nullable RowMeta closestRow(RowId lowerBound) throws 
StorageException {
+        assertThreadAllowsToRead();
+
+        return partitionStorage.closestRow(lowerBound);
+    }
+
     @Override
     public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
         assertThreadAllowsToRead();
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index e9f7b361ef9..6a1d2a49abf 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -154,6 +154,7 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
             abortWrite(ROW_ID, txId);
 
             assertNull(storage.closestRowId(ROW_ID));
+            assertNull(storage.closestRow(ROW_ID));
         }
     }
 
@@ -196,6 +197,7 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
             );
 
             assertNull(storage.closestRowId(ROW_ID));
+            assertNull(storage.closestRow(ROW_ID));
         }
     }
 
@@ -222,6 +224,7 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
             assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
 
             assertNull(storage.closestRowId(ROW_ID));
+            assertNull(storage.closestRow(ROW_ID));
 
             assertThat(rows, empty());
         }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 826f9f7af57..90d0d358db9 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -1519,6 +1519,48 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvPartitionStor
         assertNull(storage.closestRowId(rowId2.increment()));
     }
 
+    @Test
+    void testClosestRow() {
+        RowId rowId0 = new RowId(PARTITION_ID, 1, -1);
+        RowId rowId1 = new RowId(PARTITION_ID, 1, 0);
+        RowId rowId2 = new RowId(PARTITION_ID, 1, 1);
+
+        RowMeta expectedRowMeta1 = new RowMeta(rowId1, txId, COMMIT_TABLE_ID, 
PARTITION_ID);
+        RowMeta expectedRowMeta2 = new RowMeta(rowId2, txId, COMMIT_TABLE_ID, 
PARTITION_ID);
+
+        addWrite(rowId1, binaryRow, txId);
+        addWrite(rowId2, binaryRow2, txId);
+
+        assertRowMetaEquals(expectedRowMeta1, storage.closestRow(rowId0));
+        assertRowMetaEquals(expectedRowMeta1, 
storage.closestRow(rowId0.increment()));
+
+        assertRowMetaEquals(expectedRowMeta1, storage.closestRow(rowId1));
+
+        assertRowMetaEquals(expectedRowMeta2, storage.closestRow(rowId2));
+
+        assertNull(storage.closestRow(rowId2.increment()));
+    }
+
+    private static void assertRowMetaEquals(RowMeta expected, RowMeta actual) {
+        assertNotNull(actual);
+
+        assertEquals(expected.rowId(), actual.rowId());
+        assertEquals(expected.transactionId(), actual.transactionId());
+        assertEquals(expected.commitTableOrZoneId(), 
actual.commitTableOrZoneId());
+        assertEquals(expected.commitPartitionId(), actual.commitPartitionId());
+    }
+
+    @Test
+    void testClosestRowReconstruction() {
+        RowId rowId = new RowId(PARTITION_ID, 0x1234567890ABCDEFL, 
0xFEDCBA0987654321L);
+
+        RowMeta expectedRowMeta = new RowMeta(rowId, txId, COMMIT_TABLE_ID, 
PARTITION_ID);
+
+        addWrite(rowId, binaryRow, txId);
+
+        assertRowMetaEquals(expectedRowMeta, 
storage.closestRow(RowId.lowestRowId(PARTITION_ID)));
+    }
+
     @Test
     public void addWriteCommittedAddsCommittedVersion() {
         RowId rowId = new RowId(PARTITION_ID);
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index f56f92ae8e8..5708e7f2b7f 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -607,6 +607,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvTableStorageTest
         assertThrows(StorageDestroyedException.class, () -> 
storage.scanVersions(rowId));
 
         assertThrows(StorageDestroyedException.class, () -> 
storage.closestRowId(rowId));
+        assertThrows(StorageDestroyedException.class, () -> 
storage.closestRow(rowId));
     }
 
     @Test
@@ -1558,6 +1559,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvTableStorageTest
             assertThrows(StorageRebalanceException.class, () -> 
storage.scanVersions(rowId));
             assertThrows(StorageRebalanceException.class, () -> 
storage.scan(clock.now()));
             assertThrows(StorageRebalanceException.class, () -> 
storage.closestRowId(rowId));
+            assertThrows(StorageRebalanceException.class, () -> 
storage.closestRow(rowId));
 
             return null;
         });
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index e27244157c6..622ed7fe43c 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
 import java.util.UUID;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.RowMeta;
 import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageDestroyedException;
 import org.apache.ignite.internal.storage.StorageException;
@@ -138,7 +140,7 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
             return new VersionChain(rowId, row, null, txId, commitTableId, 
commitPartitionId, next);
         }
 
-        static VersionChain forCommitted(RowId rowId, @Nullable 
HybridTimestamp timestamp, VersionChain uncommittedVersionChain) {
+        static VersionChain forCommitted(RowId rowId, HybridTimestamp 
timestamp, VersionChain uncommittedVersionChain) {
             return new VersionChain(rowId, uncommittedVersionChain.row, 
timestamp, null, null,
                     ReadResult.UNDEFINED_COMMIT_PARTITION_ID, 
uncommittedVersionChain.next);
         }
@@ -628,6 +630,20 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
         return map.ceilingKey(lowerBound);
     }
 
+    @Override
+    public @Nullable RowMeta closestRow(RowId lowerBound) throws 
StorageException {
+        checkStorageClosedOrInProcessOfRebalance();
+
+        Entry<RowId, VersionChain> entry = map.ceilingEntry(lowerBound);
+        if (entry == null) {
+            return null;
+        }
+
+        VersionChain versionChain = entry.getValue();
+
+        return new RowMeta(versionChain.rowId, versionChain.txId, 
versionChain.commitTableId, versionChain.commitPartitionId);
+    }
+
     @Override
     public synchronized @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
         assert THREAD_LOCAL_LOCKER.get() != null;
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index f7acc6dd576..fe9582d44d6 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.RowMeta;
 import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
@@ -258,7 +259,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
     }
 
     ReadResult findLatestRowVersion(VersionChain versionChain) {
-        RowVersion rowVersion = readRowVersion(versionChain.headLink(), 
ALWAYS_LOAD_VALUE);
+        return findLatestRowVersion(versionChain, ALWAYS_LOAD_VALUE);
+    }
+
+    private ReadResult findLatestRowVersion(VersionChain versionChain, 
Predicate<HybridTimestamp> loadValue) {
+        RowVersion rowVersion = readRowVersion(versionChain.headLink(), 
loadValue);
 
         if (versionChain.isUncommitted()) {
             assert versionChain.transactionId() != null;
@@ -267,7 +272,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
 
             if (versionChain.hasCommittedVersions()) {
                 long newestCommitLink = versionChain.newestCommittedLink();
-                newestCommitTs = readRowVersion(newestCommitLink, 
ALWAYS_LOAD_VALUE).timestamp();
+                newestCommitTs = readRowVersion(newestCommitLink, 
loadValue).timestamp();
             }
 
             return writeIntentToResult(versionChain, rowVersion, 
newestCommitTs);
@@ -613,6 +618,29 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
         });
     }
 
+    @Override
+    public @Nullable RowMeta closestRow(RowId lowerBound) throws 
StorageException {
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableState();
+
+            try (Cursor<VersionChain> cursor = 
renewableState.versionChainTree().find(new VersionChainKey(lowerBound), null)) {
+                if (cursor.hasNext()) {
+                    VersionChain versionChain = cursor.next();
+                    return new RowMeta(
+                            versionChain.rowId(),
+                            versionChain.transactionId(),
+                            versionChain.commitTableId(),
+                            versionChain.commitPartitionId()
+                    );
+                }
+
+                return null;
+            } catch (Exception e) {
+                throw new StorageException("Error occurred while trying to 
read a row id", e);
+            }
+        });
+    }
+
     @Override
     public void close() {
         if (!transitionToClosedState(state, this::createStorageInfo)) {
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 8d6134813b5..3abd1e1953d 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.RowMeta;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
@@ -1113,6 +1114,48 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
         });
     }
 
+    @Override
+    public @Nullable RowMeta closestRow(RowId lowerBound) throws 
StorageException {
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+
+            ByteBuffer keyBuf = prepareDirectDataIdKeyBuf(lowerBound)
+                    .position(0)
+                    .limit(ROW_PREFIX_SIZE);
+
+            try (RocksIterator it = db.newIterator(helper.partCf, 
helper.scanReadOpts)) {
+                it.seek(keyBuf);
+
+                if (!it.isValid()) {
+                    it.status();
+
+                    return null;
+                }
+
+                keyBuf.rewind();
+                int keyLength = it.key(keyBuf);
+                boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE;
+
+                RowId rowId = getRowId(keyBuf);
+
+                if (isWriteIntent) {
+                    ByteBuffer transactionState = ByteBuffer.wrap(it.value());
+
+                    readDataIdFromTxState(transactionState);
+                    UUID txId = new UUID(transactionState.getLong(), 
transactionState.getLong());
+                    int commitTableId = transactionState.getInt();
+                    int commitPartitionId = 
Short.toUnsignedInt(transactionState.getShort());
+
+                    return new RowMeta(rowId, txId, commitTableId, 
commitPartitionId);
+                } else {
+                    return RowMeta.withoutWriteIntent(rowId);
+                }
+            } catch (RocksDBException e) {
+                throw new IgniteRocksDbException("Error finding closest Row 
ID", e);
+            }
+        });
+    }
+
     private static void incrementRowId(ByteBuffer buf) {
         long lsb = 1 + buf.getLong(ROW_ID_OFFSET + Long.BYTES);
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/MetaIndexStatusChange.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/MetaIndexStatusChange.java
index 72040807de1..448627869b9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/MetaIndexStatusChange.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/MetaIndexStatusChange.java
@@ -27,7 +27,7 @@ public class MetaIndexStatusChange {
     private final long activationTs;
 
     /** Constructor. */
-    MetaIndexStatusChange(int catalogVersion, long activationTs) {
+    public MetaIndexStatusChange(int catalogVersion, long activationTs) {
         this.catalogVersion = catalogVersion;
         this.activationTs = activationTs;
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index da83a602324..3491323c4dc 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -21,6 +21,7 @@ import static java.lang.Math.max;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
 import static 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.BUILD_INDEX_V1;
 import static 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.BUILD_INDEX_V2;
+import static 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.BUILD_INDEX_V3;
 import static 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.FINISH_TX_V1;
 import static 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.FINISH_TX_V2;
 import static 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.UPDATE_MINIMUM_ACTIVE_TX_TIME_COMMAND;
@@ -193,18 +194,16 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
                 tablePartitionId,
                 minTimeCollectorService
         ));
-        commandHandlersBuilder.addHandler(GROUP_TYPE, BUILD_INDEX_V1, new 
BuildIndexCommandHandler(
-                storage,
-                indexMetaStorage,
-                storageUpdateHandler,
-                schemaRegistry
-        ));
-        commandHandlersBuilder.addHandler(GROUP_TYPE, BUILD_INDEX_V2, new 
BuildIndexCommandHandler(
+
+        BuildIndexCommandHandler buildIndexCommandHandler = new 
BuildIndexCommandHandler(
                 storage,
                 indexMetaStorage,
                 storageUpdateHandler,
                 schemaRegistry
-        ));
+        );
+        commandHandlersBuilder.addHandler(GROUP_TYPE, BUILD_INDEX_V1, 
buildIndexCommandHandler);
+        commandHandlersBuilder.addHandler(GROUP_TYPE, BUILD_INDEX_V2, 
buildIndexCommandHandler);
+        commandHandlersBuilder.addHandler(GROUP_TYPE, BUILD_INDEX_V3, 
buildIndexCommandHandler);
 
         if (!nodeProperties.colocationEnabled()) {
             commandHandlersBuilder.addHandler(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
index 96b442b5432..5598113e6db 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexCommandHandler.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -36,6 +37,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommandV2;
+import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommandV3;
 import org.apache.ignite.internal.partition.replicator.raft.CommandResult;
 import 
org.apache.ignite.internal.partition.replicator.raft.handlers.AbstractCommandHandler;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
@@ -110,7 +112,10 @@ public class BuildIndexCommandHandler extends 
AbstractCommandHandler<BuildIndexC
             return EMPTY_APPLIED_RESULT;
         }
 
-        BuildIndexRowVersionChooser rowVersionChooser = 
createBuildIndexRowVersionChooser(indexMeta);
+        Set<UUID> abortedTransactionIds = command instanceof 
BuildIndexCommandV3
+                ? ((BuildIndexCommandV3) command).abortedTransactionIds()
+                : Set.of();
+        BuildIndexRowVersionChooser rowVersionChooser = 
createBuildIndexRowVersionChooser(indexMeta, abortedTransactionIds);
 
         BinaryRowUpgrader binaryRowUpgrader = 
createBinaryRowUpgrader(indexMeta);
 
@@ -146,14 +151,15 @@ public class BuildIndexCommandHandler extends 
AbstractCommandHandler<BuildIndexC
         return EMPTY_APPLIED_RESULT;
     }
 
-    private BuildIndexRowVersionChooser 
createBuildIndexRowVersionChooser(IndexMeta indexMeta) {
+    private BuildIndexRowVersionChooser 
createBuildIndexRowVersionChooser(IndexMeta indexMeta, Set<UUID> 
abortedTransactionIds) {
         MetaIndexStatusChange registeredChangeInfo = 
indexMeta.statusChange(REGISTERED);
         MetaIndexStatusChange buildingChangeInfo = 
indexMeta.statusChange(BUILDING);
 
         return new BuildIndexRowVersionChooser(
                 storage,
                 registeredChangeInfo.activationTimestamp(),
-                buildingChangeInfo.activationTimestamp()
+                buildingChangeInfo.activationTimestamp(),
+                abortedTransactionIds
         );
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooser.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooser.java
index 33b51a51827..a2514d5e5a0 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooser.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooser.java
@@ -21,6 +21,8 @@ import static 
org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
 import org.apache.ignite.internal.storage.BinaryRowAndRowId;
@@ -38,6 +40,8 @@ class BuildIndexRowVersionChooser {
     /** Timestamp of activation of the catalog version in which the index 
start building (get {@link CatalogIndexStatus#BUILDING}). */
     private final long startBuildingIndexActivationTs;
 
+    private final Set<UUID> abortedTransactionIds;
+
     /**
      * Constructor.
      *
@@ -45,11 +49,18 @@ class BuildIndexRowVersionChooser {
      * @param createIndexActivationTs Timestamp of activation of the catalog 
version in which the index created.
      * @param startBuildingIndexActivationTs Timestamp of activation of the 
catalog version in which the index start building
      *      (get {@link CatalogIndexStatus#BUILDING}).
+     * @param abortedTransactionIds IDs of transactions that are known to have 
been aborted.
      */
-    BuildIndexRowVersionChooser(PartitionDataStorage storage, long 
createIndexActivationTs, long startBuildingIndexActivationTs) {
+    BuildIndexRowVersionChooser(
+            PartitionDataStorage storage,
+            long createIndexActivationTs,
+            long startBuildingIndexActivationTs,
+            Set<UUID> abortedTransactionIds
+    ) {
         this.storage = storage;
         this.createIndexActivationTs = createIndexActivationTs;
         this.startBuildingIndexActivationTs = startBuildingIndexActivationTs;
+        this.abortedTransactionIds = Set.copyOf(abortedTransactionIds);
     }
 
     /**
@@ -78,7 +89,8 @@ class BuildIndexRowVersionChooser {
                 }
 
                 if (readResult.isWriteIntent()) {
-                    if (beginTs(readResult) >= createIndexActivationTs) {
+                    if (beginTs(readResult) >= createIndexActivationTs
+                            || 
abortedTransactionIds.contains(readResult.transactionId())) {
                         continue;
                     }
                 } else {
@@ -89,7 +101,6 @@ class BuildIndexRowVersionChooser {
                     }
                 }
 
-                // TODO: IGNITE-21546 It may be necessary to return ReadResult 
for write intent resolution
                 result.add(new BinaryRowAndRowId(readResult.binaryRow(), 
rowId));
 
                 if (takenLatestVersionOfWriteCommitted) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
index da3cc3dab03..00189c63500 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
@@ -18,16 +18,19 @@
 package org.apache.ignite.internal.table.distributed.replicator;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.tx.TxState.ABANDONED;
 import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static 
org.apache.ignite.internal.tx.impl.PlacementDriverHelper.AWAIT_PRIMARY_REPLICA_TIMEOUT;
 
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
@@ -140,6 +143,26 @@ public class TransactionStateResolver {
             UUID txId,
             ReplicationGroupId commitGrpId,
             @Nullable HybridTimestamp timestamp
+    ) {
+        return resolveTxState(txId, commitGrpId, timestamp, 
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
+    }
+
+    /**
+     * Resolves transaction state locally, if possible, or distributively, if 
needed.
+     *
+     * @param txId Transaction id.
+     * @param commitGrpId Commit partition group id.
+     * @param timestamp Timestamp.
+     * @param awaitCommitPartitionAvailabilityTimeout Timeout for awaiting 
commit partition primary replica.
+     * @param awaitCommitPartitionAvailabilityTimeUnit Time unit for awaiting 
commit partition primary replica timeout.
+     * @return Future with the transaction state meta as a result.
+     */
+    public CompletableFuture<TransactionMeta> resolveTxState(
+            UUID txId,
+            ReplicationGroupId commitGrpId,
+            @Nullable HybridTimestamp timestamp,
+            long awaitCommitPartitionAvailabilityTimeout,
+            TimeUnit awaitCommitPartitionAvailabilityTimeUnit
     ) {
         TxStateMeta localMeta = txManager.stateMeta(txId);
 
@@ -151,7 +174,15 @@ public class TransactionStateResolver {
             if (v == null) {
                 v = new CompletableFuture<>();
 
-                resolveDistributiveTxState(txId, localMeta, commitGrpId, 
timestamp, v);
+                resolveDistributiveTxState(
+                        txId,
+                        localMeta,
+                        commitGrpId,
+                        timestamp,
+                        awaitCommitPartitionAvailabilityTimeout,
+                        awaitCommitPartitionAvailabilityTimeUnit,
+                        v
+                );
             }
 
             return v;
@@ -169,6 +200,8 @@ public class TransactionStateResolver {
      * @param localMeta Local tx meta.
      * @param commitGrpId Commit partition group id.
      * @param timestamp Timestamp to pass to target node.
+     * @param awaitCommitPartitionAvailabilityTimeout Timeout for awaiting 
commit partition primary replica.
+     * @param awaitCommitPartitionAvailabilityTimeUnit Time unit for awaiting 
commit partition primary replica timeout.
      * @param txMetaFuture Tx meta future to complete with the result.
      */
     private void resolveDistributiveTxState(
@@ -176,6 +209,8 @@ public class TransactionStateResolver {
             @Nullable TxStateMeta localMeta,
             ReplicationGroupId commitGrpId,
             @Nullable HybridTimestamp timestamp,
+            long awaitCommitPartitionAvailabilityTimeout,
+            TimeUnit awaitCommitPartitionAvailabilityTimeUnit,
             CompletableFuture<TransactionMeta> txMetaFuture
     ) {
         assert localMeta == null || !isFinalState(localMeta.txState()) : 
"Unexpected tx meta [txId" + txId + ", meta=" + localMeta + ']';
@@ -184,7 +219,13 @@ public class TransactionStateResolver {
 
         if (localMeta == null) {
             // Fallback to commit partition path, because we don't have 
coordinator id.
-            resolveTxStateFromCommitPartition(txId, commitGrpId, txMetaFuture);
+            resolveTxStateFromCommitPartition(
+                    txId,
+                    commitGrpId,
+                    awaitCommitPartitionAvailabilityTimeout,
+                    awaitCommitPartitionAvailabilityTimeUnit,
+                    txMetaFuture
+            );
         } else if (localMeta.txState() == PENDING) {
             resolveTxStateFromTxCoordinator(txId, localMeta.txCoordinatorId(), 
commitGrpId, timestamp0, txMetaFuture);
         } else if (localMeta.txState() == FINISHING) {
@@ -241,10 +282,20 @@ public class TransactionStateResolver {
             UUID txId,
             ReplicationGroupId commitGrpId,
             CompletableFuture<TransactionMeta> txMetaFuture
+    ) {
+        resolveTxStateFromCommitPartition(txId, commitGrpId, 
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS, txMetaFuture);
+    }
+
+    private void resolveTxStateFromCommitPartition(
+            UUID txId,
+            ReplicationGroupId commitGrpId,
+            long awaitPrimaryReplicaTimeout,
+            TimeUnit awaitPrimaryReplicaTimeUnit,
+            CompletableFuture<TransactionMeta> txMetaFuture
     ) {
         updateLocalTxMapAfterDistributedStateResolved(txId, txMetaFuture);
 
-        sendAndRetry(txMetaFuture, commitGrpId, txId);
+        sendAndRetry(txMetaFuture, commitGrpId, txId, 
awaitPrimaryReplicaTimeout, awaitPrimaryReplicaTimeUnit);
     }
 
     /**
@@ -271,8 +322,14 @@ public class TransactionStateResolver {
      * @param replicaGrp Replication group id.
      * @param txId Transaction id.
      */
-    private void sendAndRetry(CompletableFuture<TransactionMeta> resFut, 
ReplicationGroupId replicaGrp, UUID txId) {
-        
placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(replicaGrp)
+    private void sendAndRetry(
+            CompletableFuture<TransactionMeta> resFut,
+            ReplicationGroupId replicaGrp,
+            UUID txId,
+            long awaitPrimaryReplicaTimeout,
+            TimeUnit awaitPrimaryReplicaTimeUnit
+    ) {
+        
placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(replicaGrp, 
awaitPrimaryReplicaTimeout, awaitPrimaryReplicaTimeUnit)
                 .thenCompose(replicaMeta ->
                         txMessageSender.resolveTxStateFromCommitPartition(
                                 replicaMeta.getLeaseholder(),
@@ -287,7 +344,7 @@ public class TransactionStateResolver {
                         resFut.complete(txMeta);
                     } else {
                         if (e instanceof PrimaryReplicaMissException) {
-                            sendAndRetry(resFut, replicaGrp, txId);
+                            sendAndRetry(resFut, replicaGrp, txId, 
awaitPrimaryReplicaTimeout, awaitPrimaryReplicaTimeUnit);
                         } else {
                             resFut.completeExceptionally(e);
                         }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
index 0511007b50f..0926695a6c2 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/handlers/BuildIndexReplicaRequestHandler.java
@@ -95,12 +95,13 @@ public class BuildIndexReplicaRequestHandler {
     }
 
     private static BuildIndexCommand 
toBuildIndexCommand(BuildIndexReplicaRequest request, MetaIndexStatusChange 
buildingChangeInfo) {
-        return PARTITION_REPLICATION_MESSAGES_FACTORY.buildIndexCommandV2()
+        return PARTITION_REPLICATION_MESSAGES_FACTORY.buildIndexCommandV3()
                 .indexId(request.indexId())
                 .tableId(request.tableId())
                 .rowIds(request.rowIds())
                 .finish(request.finish())
-                // We are sure that there will be no error here since the 
primary replica is sent the request to itself.
+                .abortedTransactionIds(request.abortedTransactionIds())
+                // We are sure that there will be no error here since the 
primary replica is sending the request to itself.
                 .requiredCatalogVersion(buildingChangeInfo.catalogVersion())
                 .build();
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooserTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooserTest.java
index 48cf66c1a8b..6537e46632a 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooserTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/handlers/BuildIndexRowVersionChooserTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.raft.handlers;
 
+import static java.util.Collections.emptySet;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static 
org.apache.ignite.internal.storage.BinaryRowAndRowIdMatcher.equalToBinaryRowAndRowId;
 import static org.apache.ignite.internal.storage.RowId.lowestRowId;
@@ -25,8 +26,10 @@ import static 
org.apache.ignite.internal.type.NativeTypes.INT32;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
 import static org.mockito.Mockito.spy;
 
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
@@ -63,11 +66,16 @@ public class BuildIndexRowVersionChooserTest extends 
IgniteAbstractTest {
 
     private final PartitionDataStorage partitionDataStorage = spy(new 
TestPartitionDataStorage(TABLE_ID, PARTITION_ID, mvPartitionStorage));
 
-    private final BuildIndexRowVersionChooser chooser = new 
BuildIndexRowVersionChooser(
-            partitionDataStorage,
-            CREATE_INDEX_ACTIVATION_TS_MILLS,
-            START_BUILDING_INDEX_ACTIVATION_TS_MILLS
-    );
+    private final BuildIndexRowVersionChooser chooser = 
createChooser(emptySet());
+
+    private BuildIndexRowVersionChooser createChooser(Set<UUID> 
abortedTransactionIds) {
+        return new BuildIndexRowVersionChooser(
+                partitionDataStorage,
+                CREATE_INDEX_ACTIVATION_TS_MILLS,
+                START_BUILDING_INDEX_ACTIVATION_TS_MILLS,
+                abortedTransactionIds
+        );
+    }
 
     @Test
     void testEmptyStorage() {
@@ -107,6 +115,20 @@ public class BuildIndexRowVersionChooserTest extends 
IgniteAbstractTest {
         assertThat(chooser.chooseForBuildIndex(rowId), 
contains(expBinaryRowAndRowId(rowId, row)));
     }
 
+    @Test
+    void testWriteIntentBelongingToAbortedTransaction() {
+        UUID txId = txId(CREATE_INDEX_ACTIVATION_TS_MILLS - 1);
+
+        var customizedChooser = createChooser(Set.of(txId));
+
+        RowId rowId = lowestRowId(PARTITION_ID);
+        BinaryRow row = binaryRow(1, 2);
+
+        addWrite(rowId, row, txId);
+
+        assertThat(customizedChooser.chooseForBuildIndex(rowId), is(empty()));
+    }
+
     @Test
     void testWriteCommittedAfterStartBuildingActivationTs() {
         RowId rowId = lowestRowId(PARTITION_ID);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index fe0968f689d..461039c473f 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -3374,6 +3374,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .indexId(indexId)
                 .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .rowIds(List.of())
+                .abortedTransactionIds(Set.of())
                 .timestamp(clock.current())
                 .build();
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
index faa80e005f8..d4a83cff41e 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -49,7 +50,8 @@ public class PlacementDriverHelper {
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(PlacementDriverHelper.class);
 
-    private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
+    /** Default timeout in seconds to await primary replica. */
+    public static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
 
     /** Placement driver. */
     private final PlacementDriver placementDriver;
@@ -75,16 +77,35 @@ public class PlacementDriverHelper {
      *         appeared during the await timeout.
      */
     public CompletableFuture<ReplicaMeta> 
awaitPrimaryReplicaWithExceptionHandling(ReplicationGroupId partitionId) {
+        return awaitPrimaryReplicaWithExceptionHandling(partitionId, 
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
+    }
+
+    /**
+     * Wait for primary replica to appear for the provided partition.
+     *
+     * @param partitionId Partition id.
+     * @param timeout Timeout duration.
+     * @param timeUnit Timeout time unit.
+     * @return Future that completes with node id that is a primary for the 
provided partition, or completes with exception if no primary
+     *         appeared during the await timeout.
+     */
+    public CompletableFuture<ReplicaMeta> 
awaitPrimaryReplicaWithExceptionHandling(
+            ReplicationGroupId partitionId,
+            long timeout,
+            TimeUnit timeUnit
+    ) {
         HybridTimestamp timestamp = clockService.now();
 
-        return awaitPrimaryReplicaWithExceptionHandling(partitionId, 
timestamp);
+        return awaitPrimaryReplicaWithExceptionHandling(partitionId, 
timestamp, timeout, timeUnit);
     }
 
     private CompletableFuture<ReplicaMeta> 
awaitPrimaryReplicaWithExceptionHandling(
             ReplicationGroupId partitionId,
-            HybridTimestamp timestamp
+            HybridTimestamp timestamp,
+            long timeout,
+            TimeUnit timeUnit
     ) {
-        return placementDriver.awaitPrimaryReplica(partitionId, timestamp, 
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS)
+        return placementDriver.awaitPrimaryReplica(partitionId, timestamp, 
timeout, timeUnit)
                 .handle((primaryReplica, e) -> {
                     if (e != null) {
                         LOG.debug("Failed to retrieve primary replica for 
partition {}", partitionId, e);
@@ -121,7 +142,9 @@ public class PlacementDriverHelper {
      * @return A future that completes with a map of node to the partitions 
the node is primary for.
      */
     public CompletableFuture<Map<String, Set<ReplicationGroupId>>> 
awaitPrimaryReplicas(Collection<ReplicationGroupId> partitions) {
-        return computePrimaryReplicas(partitions, 
this::awaitPrimaryReplicaWithExceptionHandling)
+        BiFunction<ReplicationGroupId, HybridTimestamp, 
CompletableFuture<ReplicaMeta>> action = (groupId, timestamp)
+                -> awaitPrimaryReplicaWithExceptionHandling(groupId, 
timestamp, AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
+        return computePrimaryReplicas(partitions, action)
                 .thenApply(partitionData -> partitionData.partitionsByNode);
     }
 


Reply via email to