This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8bdbb88f97 IGNITE-20685 Implement ability to trigger transaction 
recovery (#2832)
8bdbb88f97 is described below

commit 8bdbb88f9751fe50ded7732ab3968ebb0bb219a1
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Mon Nov 20 13:46:50 2023 +0300

    IGNITE-20685 Implement ability to trigger transaction recovery (#2832)
---
 .../ignite/client/fakes/FakeInternalTable.java     |   3 +
 .../apache/ignite/client/fakes/FakeTxManager.java  |   4 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   2 +-
 .../ignite/internal/table/ItTableScanTest.java     |  14 +-
 .../internal/table/ItTransactionConflictTest.java  | 161 ++++++++++++++++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |   2 +-
 .../sql/engine/exec/ScannableTableImpl.java        |  14 +-
 .../engine/exec/rel/ScannableTableSelfTest.java    |  33 +++-
 .../exec/rel/TableScanNodeExecutionTest.java       |  25 ++-
 .../ItInternalTableReadWriteScanTest.java          |   2 +-
 .../ignite/distributed/ItTablePersistenceTest.java |  16 +-
 ...xDistributedTestSingleNodeNoCleanupMessage.java |  10 +-
 .../ignite/distributed/ItTxStateLocalMapTest.java  |   5 +-
 .../rebalance/ItRebalanceDistributedTest.java      |   2 +-
 .../ignite/internal/table/ItColocationTest.java    |  15 +-
 .../ignite/internal/table/InternalTable.java       |   5 +
 .../table/distributed/raft/PartitionListener.java  |   3 +
 .../replication/request/CommittableTxRequest.java  |   8 +
 .../request/ReadWriteMultiRowPkReplicaRequest.java |   8 -
 .../request/ReadWriteMultiRowReplicaRequest.java   |   8 -
 .../ReadWriteSingleRowPkReplicaRequest.java        |   7 -
 .../request/ReadWriteSingleRowReplicaRequest.java  |   7 -
 .../request/ReadWriteSwapRowReplicaRequest.java    |   8 +-
 .../replicator/PartitionReplicaListener.java       |  54 +++++-
 .../distributed/storage/InternalTableImpl.java     |  25 +--
 .../replication/PartitionReplicaListenerTest.java  |  20 ++-
 .../apache/ignite/distributed/ItTxTestCluster.java |  15 +-
 .../ignite/internal/table/TxAbstractTest.java      |   7 +-
 .../table/impl/DummyInternalTableImpl.java         |  22 ++-
 .../org/apache/ignite/internal/tx/LockManager.java |   1 -
 .../org/apache/ignite/internal/tx/TxManager.java   |   3 +-
 .../org/apache/ignite/internal/tx/TxStateMeta.java |  25 ++-
 .../ignite/internal/tx/TxStateMetaFinishing.java   |   6 +-
 .../ignite/internal/tx/impl/OrphanDetector.java    | 184 +++++++++++++++++++++
 .../internal/tx/impl/ReadOnlyTransactionImpl.java  |   2 +-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  92 +++++------
 .../ignite/internal/tx/message/TxMessageGroup.java |   5 +
 .../internal/tx/message/TxRecoveryMessage.java}    |  23 ++-
 .../apache/ignite/internal/tx/TxManagerTest.java   |   2 +-
 39 files changed, 680 insertions(+), 168 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 02e382a279..0e580576cf 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -34,6 +34,7 @@ import javax.naming.OperationNotSupportedException;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.BinaryTuple;
@@ -358,6 +359,7 @@ public class FakeInternalTable implements InternalTable {
     public Publisher<BinaryRow> scan(
             int partId,
             UUID txId,
+            TablePartitionId commitPartition,
             PrimaryReplica recipient,
             @Nullable Integer indexId,
             @Nullable BinaryTuplePrefix lowerBound,
@@ -394,6 +396,7 @@ public class FakeInternalTable implements InternalTable {
     public Publisher<BinaryRow> lookup(
             int partId,
             UUID txId,
+            TablePartitionId commitPartition,
             PrimaryReplica recipient,
             int indexId,
             BinaryTuple key,
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 42a9b58f06..ef00ab2d48 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -145,8 +145,8 @@ public class FakeTxManager implements TxManager {
     }
 
     @Override
-    public void updateTxMeta(UUID txId, Function<TxStateMeta, TxStateMeta> 
updater) {
-
+    public TxStateMeta updateTxMeta(UUID txId, Function<TxStateMeta, 
TxStateMeta> updater) {
+        return null;
     }
 
     @Override
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index c5a535c5a0..e0d1d45d22 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -297,11 +297,11 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
         ReplicaService replicaSvc = new 
ReplicaService(clusterSvc.messagingService(), hybridClock);
 
         var txManager = new TxManagerImpl(
+                clusterSvc,
                 replicaService,
                 lockManager,
                 hybridClock,
                 new TransactionIdGenerator(idx),
-                () -> clusterSvc.topologyService().localMember().id(),
                 placementDriver,
                 partitionIdleSafeTimePropagationPeriodMsSupplier
         );
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 2b67249267..1b846c0084 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -139,7 +139,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
                 tx1,
-                internalTable.scan(PART_ID, tx1.id(), recipient, 
sortedIndexId, null, null, 0, null)
+                internalTable.scan(PART_ID, tx1.id(), tx1.commitPartition(), 
recipient, sortedIndexId, null, null, 0, null)
         );
 
         CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -415,7 +415,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
                 tx,
-                internalTable.scan(PART_ID, tx.id(), recipient, sortedIndexId, 
null, null, 0, null)
+                internalTable.scan(PART_ID, tx.id(), tx.commitPartition(), 
recipient, sortedIndexId, null, null, 0, null)
         );
 
         CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -443,7 +443,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         Publisher<BinaryRow> publisher1 = new RollbackTxOnErrorPublisher<>(
                 tx,
-                internalTable.scan(PART_ID, tx.id(), recipient, sortedIndexId, 
null, null, 0, null)
+                internalTable.scan(PART_ID, tx.id(), tx.commitPartition(), 
recipient, sortedIndexId, null, null, 0, null)
         );
 
         assertEquals(scanAllRows(publisher1).size(), scannedRows.size());
@@ -474,6 +474,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
                 internalTable.scan(
                         PART_ID,
                         tx.id(),
+                        tx.commitPartition(),
                         recipient,
                         soredIndexId,
                         lowBound,
@@ -500,6 +501,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
                 internalTable.scan(
                         PART_ID,
                         tx.id(),
+                        tx.commitPartition(),
                         recipient,
                         soredIndexId,
                         lowBound,
@@ -554,7 +556,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
                 Publisher<BinaryRow> publisher = new 
RollbackTxOnErrorPublisher<>(
                         tx,
-                        internalTable.scan(PART_ID, tx.id(), recipient, 
sortedIndexId, null, null, 0, null)
+                        internalTable.scan(PART_ID, tx.id(), 
tx.commitPartition(), recipient, sortedIndexId, null, null, 0, null)
                 );
 
                 // Non-thread-safe collection is fine, HB is guaranteed by 
"Thread#join" inside of "runRace".
@@ -580,7 +582,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
                 Publisher<BinaryRow> publisher1 = new 
RollbackTxOnErrorPublisher<>(
                         tx,
-                        internalTable.scan(PART_ID, tx.id(), recipient, 
sortedIndexId, null, null, 0, null)
+                        internalTable.scan(PART_ID, tx.id(), 
tx.commitPartition(), recipient, sortedIndexId, null, null, 0, null)
                 );
 
                 assertEquals(scanAllRows(publisher1).size(), 
scannedRows.size());
@@ -657,7 +659,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
                 publisher = new RollbackTxOnErrorPublisher<>(
                         tx,
-                        internalTable.scan(PART_ID, tx.id(), recipient, 
sortedIndexId, null, null, 0, null)
+                        internalTable.scan(PART_ID, tx.id(), 
tx.commitPartition(), recipient, sortedIndexId, null, null, 0, null)
                 );
             }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionConflictTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionConflictTest.java
new file mode 100644
index 0000000000..9f25d1a8cd
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionConflictTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Abandoned transactions integration tests.
+ */
+public class ItTransactionConflictTest extends ClusterPerTestIntegrationTest {
+    /** Table name. */
+    private static final String TABLE_NAME = "test_table";
+
+    @BeforeEach
+    @Override
+    public void setup(TestInfo testInfo) throws Exception {
+        super.setup(testInfo);
+
+        String zoneSql = "create zone test_zone with partitions=1, replicas=3";
+        String sql = "create table " + TABLE_NAME + " (key int primary key, 
val varchar(20)) with primary_zone='TEST_ZONE'";
+
+        cluster.doInSession(0, session -> {
+            executeUpdate(zoneSql, session);
+            executeUpdate(sql, session);
+        });
+    }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20773";)
+    public void test() throws Exception {
+        TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+
+        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
+                tblReplicationGrp,
+                node(0).clock().now(),
+                10,
+                SECONDS
+        );
+
+        assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+        String leaseholder = primaryReplicaFut.join().getLeaseholder();
+
+        IgniteImpl commitPartNode = IntStream.range(0, 
initialNodes()).mapToObj(this::node).filter(n -> leaseholder.equals(n.name()))
+                .findFirst().get();
+
+        log.info("Transaction commit partition is determined [node={}].", 
commitPartNode.name());
+
+        IgniteImpl txCrdNode = IntStream.range(1, 
initialNodes()).mapToObj(this::node).filter(n -> !leaseholder.equals(n.name()))
+                .findFirst().get();
+
+        log.info("Transaction coordinator is chosen [node={}].", 
txCrdNode.name());
+
+        UUID orphanTxId = startTransactionAndStopNode(txCrdNode);
+
+        CompletableFuture<UUID> recoveryTxMsgCaptureFut = new 
CompletableFuture<>();
+
+        commitPartNode.dropMessages((nodeName, msg) -> {
+            if (msg instanceof TxRecoveryMessage) {
+                var recoveryTxMsg = (TxRecoveryMessage) msg;
+
+                recoveryTxMsgCaptureFut.complete(recoveryTxMsg.txId());
+            }
+
+            return false;
+        });
+
+        runConflictedTransaction(node(0));
+
+        assertThat(recoveryTxMsgCaptureFut, willCompleteSuccessfully());
+
+        assertEquals(orphanTxId, recoveryTxMsgCaptureFut.join());
+    }
+
+    /**
+     * Runs a transaction that was expectedly finished with the lock conflict 
exception.
+     *
+     * @param node Transaction coordinator node.
+     */
+    private void runConflictedTransaction(IgniteImpl node) {
+        RecordView view = node.tables().table(TABLE_NAME).recordView();
+
+        try {
+            Transaction rwTx2 = node.transactions().begin();
+
+            view.upsert(rwTx2, Tuple.create().set("key", 42).set("val", 
"val2"));
+        } catch (Exception e) {
+            assertEquals(Transactions.ACQUIRE_LOCK_ERR, extractCodeFrom(e));
+
+            log.info("Expected lock conflict.", e);
+        }
+    }
+
+    /**
+     * Starts the transaction, takes a lock, and stops the transaction 
coordinator.
+     * The stopped node leaves the transaction in the pending state.
+     *
+     * @param node Transaction coordinator node.
+     * @return Transaction id.
+     * @throws InterruptedException If interrupted.
+     */
+    private UUID startTransactionAndStopNode(IgniteImpl node) throws 
InterruptedException {
+        RecordView view = node.tables().table(TABLE_NAME).recordView();
+
+        Transaction rwTx1 = node.transactions().begin();
+
+        view.upsert(rwTx1, Tuple.create().set("key", 42).set("val", "val1"));
+
+        String txCrdNodeId = node.id();
+
+        node.stop();
+
+        assertTrue(waitForCondition(
+                () -> node(0).clusterNodes().stream().filter(n -> 
txCrdNodeId.equals(n.id())).count() == 0,
+                10_000)
+        );
+        return ((InternalTransaction) rwTx1).id();
+    }
+}
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 8c6d88a75b..b841b330f4 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -563,11 +563,11 @@ public class IgniteImpl implements Ignite {
 
         // TODO: IGNITE-19344 - use nodeId that is validated on join (and 
probably generated differently).
         txManager = new TxManagerImpl(
+                clusterSvc,
                 replicaSvc,
                 lockMgr,
                 clock,
                 new TransactionIdGenerator(() -> 
clusterSvc.nodeName().hashCode()),
-                () -> clusterSvc.topologyService().localMember().id(),
                 placementDriverMgr.placementDriver(),
                 partitionIdleSafeTimePropagationPeriodMsSupplier
         );
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
index e0477f4515..a68684eeec 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
@@ -67,7 +67,17 @@ public class ScannableTableImpl implements ScannableTable {
         } else {
             PrimaryReplica recipient = new PrimaryReplica(ctx.localNode(), 
partWithTerm.term());
 
-            pub = internalTable.scan(partWithTerm.partId(), txAttributes.id(), 
recipient, null, null, null, 0, null);
+            pub = internalTable.scan(
+                    partWithTerm.partId(),
+                    txAttributes.id(),
+                    txAttributes.commitPartition(),
+                    recipient,
+                    null,
+                    null,
+                    null,
+                    0,
+                    null
+            );
         }
 
         TableRowConverter rowConverter = 
converterFactory.create(requiredColumns);
@@ -126,6 +136,7 @@ public class ScannableTableImpl implements ScannableTable {
             pub = internalTable.scan(
                     partWithTerm.partId(),
                     txAttributes.id(),
+                    txAttributes.commitPartition(),
                     new PrimaryReplica(ctx.localNode(), partWithTerm.term()),
                     indexId,
                     lower,
@@ -177,6 +188,7 @@ public class ScannableTableImpl implements ScannableTable {
             pub = internalTable.lookup(
                     partWithTerm.partId(),
                     txAttributes.id(),
+                    txAttributes.commitPartition(),
                     new PrimaryReplica(ctx.localNode(), partWithTerm.term()),
                     indexId,
                     keyTuple,
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
index 83f9a928cf..330353eaa9 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
@@ -54,6 +54,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory.Builder;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
@@ -128,7 +129,17 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
         } else {
             ClusterNode clusterNode = tx.clusterNode();
 
-            verify(internalTable).scan(partitionId, tx.id(), new 
PrimaryReplica(clusterNode, term), null, null, null, 0, null);
+            verify(internalTable).scan(
+                    partitionId,
+                    tx.id(),
+                    tx.commitPartition(),
+                    new PrimaryReplica(clusterNode, term),
+                    null,
+                    null,
+                    null,
+                    0,
+                    null
+            );
         }
 
         data.sendRows();
@@ -207,6 +218,7 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
             verify(internalTable).scan(
                     eq(partitionId),
                     eq(tx.id()),
+                    eq(tx.commitPartition()),
                     eq(primaryReplica),
                     eq(indexId),
                     condition.lowerValue != null ? 
any(BinaryTuplePrefix.class) : isNull(),
@@ -279,6 +291,7 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
             verify(internalTable).scan(
                     eq(partitionId),
                     eq(tx.id()),
+                    eq(tx.commitPartition()),
                     eq(primaryReplica),
                     eq(indexId),
                     nullable(BinaryTuplePrefix.class),
@@ -396,6 +409,7 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
             verify(internalTable).scan(
                     eq(partitionId),
                     eq(tx.id()),
+                    eq(tx.commitPartition()),
                     eq(primaryReplica),
                     eq(indexId),
                     prefix.capture(),
@@ -447,6 +461,7 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
             verify(internalTable).lookup(
                     eq(partitionId),
                     eq(tx.id()),
+                    any(),
                     eq(primaryReplica),
                     eq(indexId),
                     any(BinaryTuple.class),
@@ -496,6 +511,7 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
             verify(internalTable).lookup(
                     eq(partitionId),
                     eq(tx.id()),
+                    any(),
                     eq(primaryReplica),
                     eq(indexId),
                     any(BinaryTuple.class),
@@ -568,8 +584,17 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
                 doAnswer(invocation -> input.publisher).when(internalTable)
                         .scan(anyInt(), any(HybridTimestamp.class), 
any(ClusterNode.class));
             } else {
-                doAnswer(invocation -> input.publisher).when(internalTable)
-                        .scan(anyInt(), any(UUID.class), 
any(PrimaryReplica.class), isNull(), isNull(), isNull(), eq(0), isNull());
+                doAnswer(invocation -> 
input.publisher).when(internalTable).scan(
+                        anyInt(),
+                        any(UUID.class),
+                        any(TablePartitionId.class),
+                        any(PrimaryReplica.class),
+                        isNull(),
+                        isNull(),
+                        isNull(),
+                        eq(0),
+                        isNull()
+                );
             }
 
             RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;
@@ -600,6 +625,7 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
                 doAnswer(i -> input.publisher).when(internalTable).scan(
                         anyInt(),
                         any(UUID.class),
+                        any(TablePartitionId.class),
                         any(PrimaryReplica.class),
                         any(Integer.class),
                         nullable(BinaryTuplePrefix.class),
@@ -637,6 +663,7 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
                 doAnswer(i -> input.publisher).when(internalTable).lookup(
                         anyInt(),
                         any(UUID.class),
+                        any(TablePartitionId.class),
                         any(PrimaryReplica.class),
                         any(Integer.class),
                         nullable(BinaryTuple.class),
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 36829d6fe8..699eb60281 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
+import static 
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static 
org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import java.util.BitSet;
@@ -65,6 +67,11 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -106,20 +113,30 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
 
         HybridTimestampTracker timestampTracker = new HybridTimestampTracker();
 
+        String leaseholder = "local";
+
+        TopologyService topologyService = mock(TopologyService.class);
+        when(topologyService.localMember()).thenReturn(
+                new ClusterNodeImpl(leaseholder, leaseholder, 
NetworkAddress.from("127.0.0.1:1111"))
+        );
+
+        ClusterService clusterService = mock(ClusterService.class);
+        
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
+        when(clusterService.topologyService()).thenReturn(topologyService);
+
         for (int size : sizes) {
             log.info("Check: size=" + size);
 
             ReplicaService replicaSvc = mock(ReplicaService.class, 
RETURNS_DEEP_STUBS);
 
-            String leaseholder = "local";
-
             TxManagerImpl txManager = new TxManagerImpl(
+                    clusterService,
                     replicaSvc,
                     new HeapLockManager(),
                     new HybridClockImpl(),
                     new TransactionIdGenerator(0xdeadbeef),
-                    () -> leaseholder,
-                    new TestPlacementDriver(leaseholder, leaseholder)
+                    new TestPlacementDriver(leaseholder, leaseholder),
+                    () -> 
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
             );
 
             txManager.start();
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
index 281926ef82..d355a632cf 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
@@ -48,7 +48,7 @@ public class ItInternalTableReadWriteScanTest extends 
ItAbstractInternalTableSca
 
         return new RollbackTxOnErrorPublisher<>(
                 tx,
-                internalTbl.scan(part, tx.id(), recipient, null, null, null, 
0, null)
+                internalTbl.scan(part, tx.id(), tx.commitPartition(), 
recipient, null, null, null, 0, null)
         );
     }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index c2f066a838..9359b32118 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.distributed;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
 import static 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener.tablePartitionId;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -209,12 +210,13 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
         for (int i = 0; i < nodes(); i++) {
             if (!txManagers.containsKey(i)) {
                 TxManager txManager = new TxManagerImpl(
+                        service.clusterService(),
                         replicaService,
                         new HeapLockManager(),
                         hybridClock,
                         new TransactionIdGenerator(i),
-                        () -> NODE_ID,
-                        TEST_PLACEMENT_DRIVER
+                        TEST_PLACEMENT_DRIVER,
+                        () -> 
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
                 );
 
                 txManager.start();
@@ -225,12 +227,13 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
         }
 
         TxManager txManager = new TxManagerImpl(
+                service.clusterService(),
                 replicaService,
                 new HeapLockManager(),
                 hybridClock,
                 new TransactionIdGenerator(-1),
-                () -> NODE_ID,
-                TEST_PLACEMENT_DRIVER
+                TEST_PLACEMENT_DRIVER,
+                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
         );
 
         txManager.start();
@@ -480,12 +483,13 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
 
                     TxManager txManager = txManagers.computeIfAbsent(index, k 
-> {
                         TxManager txMgr = new TxManagerImpl(
+                                service,
                                 replicaService,
                                 new HeapLockManager(),
                                 hybridClock,
                                 new TransactionIdGenerator(index),
-                                () -> NODE_ID,
-                                TEST_PLACEMENT_DRIVER
+                                TEST_PLACEMENT_DRIVER,
+                                () -> 
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
                         );
                         txMgr.start();
                         closeables.add(txMgr::stop);
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index cac9e1c778..8404846fa0 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.distributed;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -58,6 +59,7 @@ import 
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.TransactionException;
 import org.junit.jupiter.api.BeforeEach;
@@ -95,19 +97,21 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends ItTxDistribut
         ) {
             @Override
             protected TxManagerImpl newTxManager(
+                    ClusterService clusterService,
                     ReplicaService replicaSvc,
                     HybridClock clock,
                     TransactionIdGenerator generator,
                     ClusterNode node,
                     PlacementDriver placementDriver
             ) {
-                return new TxManagerImpl(
+                return new  TxManagerImpl(
+                        clusterService,
                         replicaSvc,
                         new HeapLockManager(),
                         clock,
                         generator,
-                        node::id,
-                        placementDriver
+                        placementDriver,
+                        () -> 
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
                 ) {
                     @Override
                     public CompletableFuture<Void> 
executeCleanupAsync(Runnable runnable) {
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
index 7bb9e285f4..861189e381 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
@@ -134,13 +134,13 @@ public class ItTxStateLocalMapTest extends 
IgniteAbstractTest {
 
         ReadWriteTransactionImpl tx = (ReadWriteTransactionImpl) 
testCluster.igniteTransactions().begin();
 
-        checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, 
coordinatorId, null), List.of(0));
+        checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, 
coordinatorId, tx.commitPartition(), null), List.of(0));
         checkLocalTxStateOnNodes(tx.id(), null, IntStream.range(1, 
NODES).boxed().collect(toList()));
 
         touchOp.accept(tx);
 
         if (checkAfterTouch) {
-            checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, 
coordinatorId, null));
+            checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, 
coordinatorId, tx.commitPartition(), null));
         }
 
         if (commit) {
@@ -154,6 +154,7 @@ public class ItTxStateLocalMapTest extends 
IgniteAbstractTest {
                 new TxStateMeta(
                         commit ? COMMITED : ABORTED,
                         coordinatorId,
+                        tx.commitPartition(),
                         commit ? testCluster.clocks.get(coord.name()).now() : 
null
                 )
         );
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index ca7d7c3f7d..219fe93464 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -907,11 +907,11 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
             );
 
             txManager = new TxManagerImpl(
+                    clusterService,
                     replicaSvc,
                     lockManager,
                     hybridClock,
                     new TransactionIdGenerator(addr.port()),
-                    () -> clusterService.topologyService().localMember().id(),
                     placementDriver,
                     partitionIdleSafeTimePropagationPeriodMsSupplier
             );
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 3e061ffe71..06a8447183 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toMap;
+import static 
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static org.apache.ignite.internal.schema.SchemaTestUtils.specToType;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
@@ -28,6 +29,7 @@ import static 
org.junit.jupiter.params.ParameterizedTest.ARGUMENTS_PLACEHOLDER;
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
@@ -95,6 +97,7 @@ import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.table.Tuple;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -137,20 +140,22 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
 
     @BeforeAll
     static void beforeAllTests() {
-        ClusterService clusterService = Mockito.mock(ClusterService.class, 
RETURNS_DEEP_STUBS);
-        
when(clusterService.topologyService().localMember().address()).thenReturn(DummyInternalTableImpl.ADDR);
-
         ClusterNode clusterNode = DummyInternalTableImpl.LOCAL_NODE;
 
+        ClusterService clusterService = Mockito.mock(ClusterService.class, 
RETURNS_DEEP_STUBS);
+        
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
+        
when(clusterService.topologyService().localMember()).thenReturn(clusterNode);
+
         ReplicaService replicaService = Mockito.mock(ReplicaService.class, 
RETURNS_DEEP_STUBS);
 
         txManager = new TxManagerImpl(
+                clusterService,
                 replicaService,
                 new HeapLockManager(),
                 new HybridClockImpl(),
                 new TransactionIdGenerator(0xdeadbeef),
-                clusterNode::id,
-                new TestPlacementDriver(clusterNode)
+                new TestPlacementDriver(clusterNode),
+                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
         ) {
             @Override
             public CompletableFuture<Void> finish(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 1b588578da..72b0942492 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.BinaryTuple;
@@ -331,6 +332,7 @@ public interface InternalTable extends ManuallyCloseable {
      *
      * @param partId The partition.
      * @param txId Transaction id.
+     * @param commitPartition Commit partition id.
      * @param recipient Primary replica that will handle given get request.
      * @param lowerBound Lower search bound.
      * @param upperBound Upper search bound.
@@ -341,6 +343,7 @@ public interface InternalTable extends ManuallyCloseable {
     Publisher<BinaryRow> scan(
             int partId,
             UUID txId,
+            TablePartitionId commitPartition,
             PrimaryReplica recipient,
             @Nullable Integer indexId,
             @Nullable BinaryTuplePrefix lowerBound,
@@ -397,6 +400,7 @@ public interface InternalTable extends ManuallyCloseable {
      *
      * @param partId The partition.
      * @param txId Transaction id.
+     * @param commitPartition Commit partition id.
      * @param recipient Primary replica that will handle given get request.
      * @param indexId Index id.
      * @param key Key to search.
@@ -406,6 +410,7 @@ public interface InternalTable extends ManuallyCloseable {
     Publisher<BinaryRow> lookup(
             int partId,
             UUID txId,
+            TablePartitionId commitPartition,
             PrimaryReplica recipient,
             int indexId,
             BinaryTuple key,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 5c1710fa1e..38e23f85d9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -270,6 +270,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
 
             updateTrackerIgnoringTrackerClosedException(safeTime, 
cmd.safeTime());
         }
+
         replicaTouch(cmd.txId(), cmd.txCoordinatorId(), cmd.full() ? 
cmd.safeTime() : null, cmd.full());
     }
 
@@ -577,6 +578,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
         txManager.updateTxMeta(txId, old -> new TxStateMeta(
                 full ? COMMITED : PENDING,
                 txCoordinatorId,
+                old == null ? null : old.commitPartitionId(),
                 full ? commitTimestamp : null
         ));
     }
@@ -585,6 +587,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
         txManager.updateTxMeta(txId, old -> new TxStateMeta(
                 commit ? COMMITED : ABORTED,
                 txCoordinatorId,
+                old == null ? null : old.commitPartitionId(),
                 commit ? commitTimestamp : null
         ));
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
index 9d163ca11c..c90badb72b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.table.distributed.replication.request;
 
 import java.util.UUID;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 
 /**
  * Transaction request that can contain full transaction (transaction that 
contains full set of keys).
@@ -30,4 +31,11 @@ public interface CommittableTxRequest extends ReplicaRequest 
{
      * Return {@code true} if this is a full transaction.
      */
     boolean full();
+
+    /**
+     * Gets a commit partition id.
+     *
+     * @return Table partition id.
+     */
+    TablePartitionIdMessage commitPartitionId();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowPkReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowPkReplicaRequest.java
index e5fca9fc41..32a7bbf109 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowPkReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowPkReplicaRequest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.table.distributed.replication.request;
 
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -26,13 +25,6 @@ import org.apache.ignite.network.annotations.Transferable;
  */
 @Transferable(TableMessageGroup.RW_MULTI_ROW_PK_REPLICA_REQUEST)
 public interface ReadWriteMultiRowPkReplicaRequest extends 
MultipleRowPkReplicaRequest, ReadWriteReplicaRequest, CommittableTxRequest {
-    /**
-     * Gets a commit partition id.
-     *
-     * @return Table partition id.
-     */
-    TablePartitionIdMessage commitPartitionId();
-
     /**
      * Disable delayed ack optimization.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
index 0789ea568d..de3773252f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.table.distributed.replication.request;
 
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -26,13 +25,6 @@ import org.apache.ignite.network.annotations.Transferable;
  */
 @Transferable(TableMessageGroup.RW_MULTI_ROW_REPLICA_REQUEST)
 public interface ReadWriteMultiRowReplicaRequest extends 
MultipleRowReplicaRequest, ReadWriteReplicaRequest, CommittableTxRequest {
-    /**
-     * Gets a commit partition id.
-     *
-     * @return Table partition id.
-     */
-    TablePartitionIdMessage commitPartitionId();
-
     /**
      * Disable delayed ack optimization.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowPkReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowPkReplicaRequest.java
index 6447819dd7..896244e56a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowPkReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowPkReplicaRequest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.table.distributed.replication.request;
 
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -26,10 +25,4 @@ import org.apache.ignite.network.annotations.Transferable;
  */
 @Transferable(TableMessageGroup.RW_SINGLE_ROW_PK_REPLICA_REQUEST)
 public interface ReadWriteSingleRowPkReplicaRequest extends 
SingleRowPkReplicaRequest, ReadWriteReplicaRequest, CommittableTxRequest {
-    /**
-     * Gets a commit partition id.
-     *
-     * @return Table partition id.
-     */
-    TablePartitionIdMessage commitPartitionId();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
index ce1e6a1b0e..6f62a42b9b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.table.distributed.replication.request;
 
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -26,10 +25,4 @@ import org.apache.ignite.network.annotations.Transferable;
  */
 @Transferable(TableMessageGroup.RW_SINGLE_ROW_REPLICA_REQUEST)
 public interface ReadWriteSingleRowReplicaRequest extends 
SingleRowReplicaRequest, ReadWriteReplicaRequest, CommittableTxRequest {
-    /**
-     * Gets a commit partition id.
-     *
-     * @return Table partition id.
-     */
-    TablePartitionIdMessage commitPartitionId();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
index d276841d38..983505f75a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.table.distributed.replication.request;
 
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -26,10 +25,5 @@ import org.apache.ignite.network.annotations.Transferable;
  */
 @Transferable(TableMessageGroup.RW_DUAL_ROW_REPLICA_REQUEST)
 public interface ReadWriteSwapRowReplicaRequest extends SwapRowReplicaRequest, 
ReadWriteReplicaRequest, CommittableTxRequest {
-    /**
-     * Gets a commit partition id.
-     *
-     * @return Table partition id.
-     */
-    TablePartitionIdMessage commitPartitionId();
+
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 53e32760e8..2318f6081b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -158,6 +158,7 @@ import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
 import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.Cursor;
@@ -478,10 +479,19 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
             // Saving state is not needed for full transactions.
             if (!req.full()) {
-                txManager.updateTxMeta(req.transactionId(), old -> new 
TxStateMeta(PENDING, senderId, null));
+                txManager.updateTxMeta(req.transactionId(), old -> new 
TxStateMeta(
+                        PENDING,
+                        senderId,
+                        req.commitPartitionId().asTablePartitionId(),
+                        null
+                ));
             }
         }
 
+        if (request instanceof TxRecoveryMessage) {
+            return processTxRecoveryAction((TxRecoveryMessage) request);
+        }
+
         HybridTimestamp opTsIfDirectRo = (request instanceof 
ReadOnlyDirectReplicaRequest) ? hybridClock.now() : null;
 
         return validateTableExistence(request, opTsIfDirectRo)
@@ -490,6 +500,30 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 .thenCompose(opStartTimestamp -> 
processOperationRequest(request, isPrimary, senderId, opTsIfDirectRo));
     }
 
+    /**
+     * Processes transaction recovery request on a commit partition.
+     *
+     * @param request Tx recovery request.
+     * @return The future is complete when the transaction state is finalized.
+     */
+    private CompletableFuture<Void> processTxRecoveryAction(TxRecoveryMessage 
request) {
+        UUID txId = request.txId();
+
+        TxMeta txMeta = txStateStorage.get(txId);
+
+        // Check whether a transaction has already been finished.
+        boolean transactionAlreadyFinished = txMeta != null && 
isFinalState(txMeta.txState());
+
+        if (transactionAlreadyFinished) {
+            return completedFuture(null);
+        }
+
+        LOG.info("Orphan transaction has to be aborted [tx={}].", txId);
+
+        // TODO: IGNITE-20735 Implement initiate recovery handling logic.
+        return completedFuture(null);
+    }
+
     /**
      * Validates that the table exists at a timestamp corresponding to the 
request operation.
      *
@@ -635,7 +669,12 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             // We treat SCAN as 2pc and only switch to a 1pc mode if all table 
rows fit in the bucket and the transaction is implicit.
             // See `req.full() && (err != null || rows.size() < 
req.batchSize())` condition.
             // If they don't fit the bucket, the transaction is treated as 2pc.
-            txManager.updateTxMeta(req.transactionId(), old -> new 
TxStateMeta(PENDING, senderId, null));
+            txManager.updateTxMeta(req.transactionId(), old -> new TxStateMeta(
+                    PENDING,
+                    senderId,
+                    req.commitPartitionId().asTablePartitionId(),
+                    null
+            ));
 
             // Implicit RW scan can be committed locally on a last batch or 
error.
             return appendTxCommand(req.transactionId(), RequestType.RW_SCAN, 
false, () -> processScanRetrieveBatchAction(req, senderId))
@@ -1526,6 +1565,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     ) {
         CompletableFuture<?>[] futures = enlistedPartitions.stream()
                 .map(partitionId -> changeStateFuture.thenCompose(ignored ->
+                        // TODO: IGNITE-20874 Use the node cleanup procedure 
instead of the replication group cleanup one.
                         cleanupWithRetry(commit, commitTimestamp, txId, 
partitionId, attemptsToCleanupReplica)))
                 .toArray(size -> new CompletableFuture<?>[size]);
 
@@ -3313,6 +3353,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     private CompletableFuture<Boolean> ensureReplicaIsPrimary(ReplicaRequest 
request) {
         Long expectedTerm;
 
+        // TODO: IGNITE-20875 Add enlistment consistency token to 
PrimaryReplicaTestRequest interface.
         if (request instanceof ReadWriteReplicaRequest) {
             expectedTerm = ((ReadWriteReplicaRequest) request).term();
 
@@ -3327,6 +3368,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             assert expectedTerm != null;
         } else if (request instanceof BuildIndexReplicaRequest) {
             expectedTerm = ((BuildIndexReplicaRequest) 
request).enlistmentConsistencyToken();
+        } else if (request instanceof TxRecoveryMessage) {
+            expectedTerm = ((TxRecoveryMessage) 
request).enlistmentConsistencyToken();
         } else {
             expectedTerm = null;
         }
@@ -3687,7 +3730,12 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         txManager.updateTxMeta(txId, old -> old == null
                 ? null
-                : new TxStateMeta(txState, old.txCoordinatorId(), txState == 
COMMITED ? commitTimestamp : null));
+                : new TxStateMeta(
+                        txState,
+                        old.txCoordinatorId(),
+                        old.commitPartitionId(),
+                        txState == COMMITED ? commitTimestamp : null
+                ));
     }
 
     // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code 
below
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 5a1eae715b..0f71220cc7 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -98,7 +98,6 @@ import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnly
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
-import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequestBuilder;
 import 
org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.SingleRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.SwapRowReplicaRequest;
@@ -407,7 +406,7 @@ public class InternalTableImpl implements InternalTable {
 
         CompletableFuture<Collection<BinaryRow>> fut;
 
-        ReadWriteScanRetrieveBatchReplicaRequestBuilder requestBuilder = 
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
+        Function<Long, ReplicaRequest> mapFunc = (term) -> 
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
                 .groupId(partGroupId)
                 .timestampLong(clock.nowLong())
                 .transactionId(tx.id())
@@ -419,13 +418,15 @@ public class InternalTableImpl implements InternalTable {
                 .flags(flags)
                 .columnsToInclude(columnsToInclude)
                 .full(implicit) // Intent for one phase commit.
-                .batchSize(batchSize);
+                .batchSize(batchSize)
+                .term(term)
+                
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
+                .build();
 
         if (primaryReplicaAndTerm != null) {
-            ReadWriteScanRetrieveBatchReplicaRequest request = 
requestBuilder.term(primaryReplicaAndTerm.get2()).build();
-            fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), request);
+            fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), 
mapFunc.apply(primaryReplicaAndTerm.get2()));
         } else {
-            fut = enlistWithRetry(tx, partId, term -> 
requestBuilder.term(term).build(), ATTEMPTS_TO_ENLIST_PARTITION, false, null);
+            fut = enlistWithRetry(tx, partId, mapFunc, 
ATTEMPTS_TO_ENLIST_PARTITION, false, null);
         }
 
         return postEnlist(fut, false, tx, false);
@@ -827,9 +828,7 @@ public class InternalTableImpl implements InternalTable {
         return enlistInTx(
                 keyRows,
                 tx,
-                    (keyRows0, txo, groupId, term, full) -> {
-                        return readWriteMultiRowPkReplicaRequest(RW_GET_ALL, 
keyRows0, txo, groupId, term, full);
-                    },
+                (keyRows0, txo, groupId, term, full) -> 
readWriteMultiRowPkReplicaRequest(RW_GET_ALL, keyRows0, txo, groupId, term, 
full),
                 InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder,
                 (res, req) -> false
         );
@@ -1265,12 +1264,13 @@ public class InternalTableImpl implements InternalTable 
{
     public Publisher<BinaryRow> lookup(
             int partId,
             UUID txId,
+            TablePartitionId commitPartition,
             PrimaryReplica recipient,
             int indexId,
             BinaryTuple key,
             @Nullable BitSet columnsToInclude
     ) {
-        return scan(partId, txId, recipient, indexId, key, null, null, 0, 
columnsToInclude);
+        return scan(partId, txId, commitPartition, recipient, indexId, key, 
null, null, 0, columnsToInclude);
     }
 
     @Override
@@ -1388,6 +1388,7 @@ public class InternalTableImpl implements InternalTable {
     public Publisher<BinaryRow> scan(
             int partId,
             UUID txId,
+            TablePartitionId commitPartition,
             PrimaryReplica recipient,
             @Nullable Integer indexId,
             @Nullable BinaryTuplePrefix lowerBound,
@@ -1395,12 +1396,13 @@ public class InternalTableImpl implements InternalTable 
{
             int flags,
             @Nullable BitSet columnsToInclude
     ) {
-        return scan(partId, txId, recipient, indexId, null, lowerBound, 
upperBound, flags, columnsToInclude);
+        return scan(partId, txId, commitPartition, recipient, indexId, null, 
lowerBound, upperBound, flags, columnsToInclude);
     }
 
     private Publisher<BinaryRow> scan(
             int partId,
             UUID txId,
+            TablePartitionId commitPartition,
             PrimaryReplica recipient,
             @Nullable Integer indexId,
             @Nullable BinaryTuple exactKey,
@@ -1427,6 +1429,7 @@ public class InternalTableImpl implements InternalTable {
                             .batchSize(batchSize)
                             .term(recipient.term())
                             .full(false) // Set explicitly.
+                            
.commitPartitionId(serializeTablePartitionId(commitPartition))
                             .build();
 
                     return replicaSvc.invoke(recipient.node(), request);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index dd830a0052..d0e38050d3 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -220,6 +220,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
     private static final int TABLE_ID = 1;
 
+    private static final TablePartitionId commitPartitionId = new 
TablePartitionId(TABLE_ID, PART_ID);
+
     private static final int ANOTHER_TABLE_ID = 2;
 
     private final Map<UUID, Set<RowId>> pendingRows = new 
ConcurrentHashMap<>();
@@ -651,7 +653,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         pkStorage().put(testBinaryRow, rowId);
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
-        txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED, 
localNode.id(), clock.now()));
+        txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED, 
localNode.id(), commitPartitionId, clock.now()));
 
         CompletableFuture<ReplicaResult> fut = 
doReadOnlySingleGet(testBinaryKey);
 
@@ -669,7 +671,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         pkStorage().put(testBinaryRow, rowId);
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
-        txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, 
localNode.id(), null));
+        txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, 
localNode.id(), commitPartitionId, null));
 
         CompletableFuture<ReplicaResult> fut = 
doReadOnlySingleGet(testBinaryKey);
 
@@ -688,7 +690,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         pkStorage().put(testBinaryRow, rowId);
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
-        txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.ABORTED, 
localNode.id(), null));
+        txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.ABORTED, 
localNode.id(), commitPartitionId, null));
 
         CompletableFuture<ReplicaResult> fut = 
doReadOnlySingleGet(testBinaryKey);
 
@@ -728,6 +730,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .scanId(1L)
                         .indexToUse(sortedIndexId)
                         .batchSize(4)
+                        .commitPartitionId(commitPartitionId())
                         .build(), localNode.id());
 
         List<BinaryRow> rows = (List<BinaryRow>) fut.get(1, 
TimeUnit.SECONDS).result();
@@ -744,6 +747,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .scanId(1L)
                 .indexToUse(sortedIndexId)
                 .batchSize(4)
+                .commitPartitionId(commitPartitionId())
                 .build(), localNode.id());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -763,6 +767,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .upperBoundPrefix(toIndexBound(3))
                 .flags(SortedIndexStorage.LESS_OR_EQUAL)
                 .batchSize(5)
+                .commitPartitionId(commitPartitionId())
                 .build(), localNode.id());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -780,6 +785,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .indexToUse(sortedIndexId)
                 .lowerBoundPrefix(toIndexBound(5))
                 .batchSize(5)
+                .commitPartitionId(commitPartitionId())
                 .build(), localNode.id());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -797,6 +803,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .indexToUse(sortedIndexId)
                 .exactKey(toIndexKey(0))
                 .batchSize(5)
+                .commitPartitionId(commitPartitionId())
                 .build(), localNode.id());
 
         rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -1302,7 +1309,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
             // Imitation of tx commit.
             txStateStorage.put(txId, new TxMeta(TxState.COMMITED, new 
ArrayList<>(), now));
-            txManager.updateTxMeta(txId, old -> new 
TxStateMeta(TxState.COMMITED, UUID.randomUUID().toString(), now));
+            txManager.updateTxMeta(txId, old -> new 
TxStateMeta(TxState.COMMITED, UUID.randomUUID().toString(), commitPartitionId, 
now));
 
             CompletableFuture<?> replicaCleanupFut = 
partitionReplicaListener.invoke(
                     TX_MESSAGES_FACTORY.txCleanupReplicaRequest()
@@ -1715,6 +1722,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                                 .term(1L)
                                 .scanId(1)
                                 .batchSize(100)
+                                .commitPartitionId(commitPartitionId())
                                 .build(),
                         localNode.id()
                 )
@@ -1732,6 +1740,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                                 .term(1L)
                                 .scanId(1)
                                 .batchSize(100)
+                                .commitPartitionId(commitPartitionId())
                                 .build(),
                         localNode.id()
                 )
@@ -1754,6 +1763,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         .scanId(1)
                         .batchSize(100)
                         .full(false)
+                        .commitPartitionId(commitPartitionId())
                         .build(),
                 localNode.id()
         );
@@ -2443,7 +2453,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private void cleanup(UUID txId) {
         HybridTimestamp commitTs = clock.now();
 
-        txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED, 
UUID.randomUUID().toString(), commitTs));
+        txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED, 
UUID.randomUUID().toString(), commitPartitionId, commitTs));
 
         partitionReplicaListener.invoke(
                 TX_MESSAGES_FACTORY.txCleanupReplicaRequest()
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index cb1f5b3334..226f3c32c7 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.CollectionUtils.first;
 import static 
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
@@ -351,6 +352,7 @@ public class ItTxTestCluster {
             replicaServices.put(node.name(), replicaSvc);
 
             TxManagerImpl txMgr = newTxManager(
+                    cluster.get(i),
                     replicaSvc,
                     clock,
                     new TransactionIdGenerator(i),
@@ -384,6 +386,7 @@ public class ItTxTestCluster {
     }
 
     protected TxManagerImpl newTxManager(
+            ClusterService clusterService,
             ReplicaService replicaSvc,
             HybridClock clock,
             TransactionIdGenerator generator,
@@ -391,12 +394,13 @@ public class ItTxTestCluster {
             PlacementDriver placementDriver
     ) {
         return new TxManagerImpl(
+                clusterService,
                 replicaSvc,
                 new HeapLockManager(),
                 clock,
                 generator,
-                node::id,
-                placementDriver
+                placementDriver,
+                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
         );
     }
 
@@ -822,15 +826,14 @@ public class ItTxTestCluster {
     }
 
     private void initializeClientTxComponents() {
-        Supplier<String> localNodeIdSupplier = () -> 
client.topologyService().localMember().id();
-
         clientTxManager = new TxManagerImpl(
+                client,
                 clientReplicaSvc,
                 new HeapLockManager(),
                 clientClock,
                 new TransactionIdGenerator(-1),
-                localNodeIdSupplier,
-                placementDriver
+                placementDriver,
+                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
         );
 
         clientTxStateResolver = new TransactionStateResolver(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 52f8ea4a7a..4e2040f7b2 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -2004,7 +2004,12 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         UUID txId = ((ReadWriteTransactionImpl) tx).id();
 
         for (TxManager txManager : txManagers()) {
-            txManager.updateTxMeta(txId, old -> old == null ? null : new 
TxStateMeta(old.txState(), "restarted", old.commitTimestamp()));
+            txManager.updateTxMeta(txId, old -> old == null ? null : new 
TxStateMeta(
+                    old.txState(),
+                    "restarted",
+                    old.commitPartitionId(),
+                    old.commitTimestamp()
+            ));
         }
 
         // Read-only.
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index da2716615f..7c64a6eb5e 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -98,7 +99,10 @@ import 
org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import 
org.apache.ignite.internal.util.PendingIndependentComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
 
@@ -433,14 +437,26 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
      * @param replicaSvc Replica service to use.
      */
     public static TxManagerImpl txManager(ReplicaService replicaSvc) {
-        return new TxManagerImpl(
+        TopologyService topologyService = mock(TopologyService.class);
+        when(topologyService.localMember()).thenReturn(LOCAL_NODE);
+
+        ClusterService clusterService = mock(ClusterService.class);
+        
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
+        when(clusterService.topologyService()).thenReturn(topologyService);
+
+        var txManager = new TxManagerImpl(
+                clusterService,
                 replicaSvc,
                 new HeapLockManager(),
                 CLOCK,
                 new TransactionIdGenerator(0xdeadbeef),
-                LOCAL_NODE::id,
-                TEST_PLACEMENT_DRIVER
+                TEST_PLACEMENT_DRIVER,
+                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
         );
+
+        txManager.start();
+
+        return txManager;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
index 3ab221d854..79f8f3d987 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
@@ -25,7 +25,6 @@ import org.jetbrains.annotations.TestOnly;
 
 /** Lock manager allows to acquire locks and release locks and supports 
deadlock prevention by transaction id ordering. */
 public interface LockManager {
-
     /**
      * Attempts to acquire a lock for the specified {@code lockKey} in 
specified {@code lockMode}.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index b81708ff5a..1b0faeb812 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -70,8 +70,9 @@ public interface TxManager extends IgniteComponent {
      *
      * @param txId Transaction id.
      * @param updater Transaction meta updater.
+     * @return Updated transaction state.
      */
-    void updateTxMeta(UUID txId, Function<TxStateMeta, TxStateMeta> updater);
+    TxStateMeta updateTxMeta(UUID txId, Function<TxStateMeta, TxStateMeta> 
updater);
 
     /**
      * Returns lock manager.
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index df1e5f79ce..b0241b32fd 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.internal.tx;
 
+import java.util.Objects;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -30,6 +33,9 @@ public class TxStateMeta implements TransactionMeta {
 
     private final String txCoordinatorId;
 
+    /** Identifier of the replication group that manages a transaction state. 
*/
+    private final TablePartitionId commitPartitionId;
+
     private final HybridTimestamp commitTimestamp;
 
     /**
@@ -37,15 +43,18 @@ public class TxStateMeta implements TransactionMeta {
      *
      * @param txState Transaction state.
      * @param txCoordinatorId Transaction coordinator id.
+     * @param commitPartitionId Commit partition replication group id.
      * @param commitTimestamp Commit timestamp.
      */
     public TxStateMeta(
             TxState txState,
             String txCoordinatorId,
+            @Nullable TablePartitionId commitPartitionId,
             @Nullable HybridTimestamp commitTimestamp
     ) {
         this.txState = txState;
         this.txCoordinatorId = txCoordinatorId;
+        this.commitPartitionId = commitPartitionId;
         this.commitTimestamp = commitTimestamp;
     }
 
@@ -58,6 +67,10 @@ public class TxStateMeta implements TransactionMeta {
         return txCoordinatorId;
     }
 
+    public TablePartitionId commitPartitionId() {
+        return commitPartitionId;
+    }
+
     @Override
     public @Nullable HybridTimestamp commitTimestamp() {
         return commitTimestamp;
@@ -80,19 +93,21 @@ public class TxStateMeta implements TransactionMeta {
         if (txCoordinatorId != null ? 
!txCoordinatorId.equals(that.txCoordinatorId) : that.txCoordinatorId != null) {
             return false;
         }
+
+        if (commitPartitionId != null ? 
!commitPartitionId.equals(that.commitPartitionId) : that.commitPartitionId != 
null) {
+            return false;
+        }
+
         return commitTimestamp != null ? 
commitTimestamp.equals(that.commitTimestamp) : that.commitTimestamp == null;
     }
 
     @Override
     public int hashCode() {
-        int result = txState != null ? txState.hashCode() : 0;
-        result = 31 * result + (txCoordinatorId != null ? 
txCoordinatorId.hashCode() : 0);
-        result = 31 * result + (commitTimestamp != null ? 
commitTimestamp.hashCode() : 0);
-        return result;
+        return Objects.hash(txState, txCoordinatorId, commitPartitionId, 
commitTimestamp);
     }
 
     @Override
     public String toString() {
-        return "[txState=" + txState + ", txCoordinatorId=" + txCoordinatorId 
+ ", commitTimestamp=" + commitTimestamp + ']';
+        return S.toString(TxStateMeta.class, this);
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
index 9b85925d7c..bc53742775 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.tx;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -35,9 +36,10 @@ public class TxStateMetaFinishing extends TxStateMeta {
      * Constructor.
      *
      * @param txCoordinatorId Transaction coordinator id.
+     * @param commitPartitionId Commit partition id.
      */
-    public TxStateMetaFinishing(String txCoordinatorId) {
-        super(TxState.FINISHING, txCoordinatorId, null);
+    public TxStateMetaFinishing(String txCoordinatorId, @Nullable 
TablePartitionId commitPartitionId) {
+        super(TxState.FINISHING, txCoordinatorId, commitPartitionId, null);
     }
 
     /**
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
new file mode 100644
index 0000000000..bce40cba30
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * The class detects transactions that are left without a coordinator but 
still hold locks. For that orphan transaction, the recovery
+ * message is sent to the commit partition replication group.
+ */
+public class OrphanDetector {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(OrphanDetector.class);
+
+    /** Tx messages factory. */
+    private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+
+    private static final long AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC = 10;
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Topology service. */
+    private final TopologyService topologyService;
+
+    /** Replica service. */
+    private final ReplicaService replicaService;
+
+    /** Placement driver. */
+    private final PlacementDriver placementDriver;
+
+    // TODO: IGNITE-20773 Uncomment this during implementation.
+    ///** Lock manager. */
+    //private final LockManager lockManager;
+
+    /** Hybrid clock. */
+    private final HybridClock clock;
+
+    /** Local transaction state storage. */
+    private Function<UUID, TxStateMeta> txLocalStateStorage;
+
+    /**
+     * The constructor.
+     *
+     * @param topologyService Topology service.
+     * @param replicaService Replica service.
+     * @param placementDriver Placement driver.
+     * @param clock Clock.
+     */
+    public OrphanDetector(
+            TopologyService topologyService,
+            ReplicaService replicaService,
+            PlacementDriver placementDriver,
+            //LockManager lockManager,
+            HybridClock clock) {
+        this.topologyService = topologyService;
+        this.replicaService = replicaService;
+        this.placementDriver = placementDriver;
+        //this.lockManager = lockManager;
+        this.clock = clock;
+    }
+
+    /**
+     * Starts the detector.
+     *
+     * @param txLocalStateStorage Local transaction state storage.
+     */
+    public void start(Function<UUID, TxStateMeta> txLocalStateStorage) {
+        this.txLocalStateStorage = txLocalStateStorage;
+        // TODO: IGNITE-20773 Subscribe to lock conflicts here.
+    }
+
+    /**
+     * Stops the detector.
+     */
+    public void stop() {
+        busyLock.block();
+        // TODO: IGNITE-20773 Unsubscribe from lock conflicts here.
+    }
+
+    /**
+     * Sends {@link TxRecoveryMessage} if the transaction is orphaned.
+     * TODO: IGNITE-20773 Invoke the method when the lock conflict is noted.
+     *
+     * @param txId Transaction id that holds a lock.
+     * @return Future to complete.
+     */
+    private CompletableFuture<Void> handleLockHolder(UUID txId) {
+        if (busyLock.enterBusy()) {
+            try {
+                return handleLockHolderInternal(txId);
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        return failedFuture(new NodeStoppingException());
+    }
+
+    /**
+     * Sends {@link TxRecoveryMessage} if the transaction is orphaned.
+     *
+     * @param txId Transaction id that holds a lock.
+     * @return Future to complete.
+     */
+    private CompletableFuture<Void> handleLockHolderInternal(UUID txId) {
+        TxStateMeta txState = txLocalStateStorage.apply(txId);
+
+        assert txState != null : "The transaction is undefined in the local 
node [txId=" + txId + "].";
+
+        if (txState.txState() == TxState.ABANDONED
+                || isFinalState(txState.txState())
+                || topologyService.getById(txState.txCoordinatorId()) != null) 
{
+            return completedFuture(null);
+        }
+
+        LOG.info(
+                "Conflict was found, and the coordinator of the transaction 
that holds a lock is not available "
+                        + "[txId={}, txCrd={}].",
+                txId,
+                txState.txCoordinatorId()
+        );
+
+        return placementDriver.awaitPrimaryReplica(
+                txState.commitPartitionId(),
+                clock.now(),
+                AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC,
+                SECONDS
+        ).thenCompose(replicaMeta -> {
+            ClusterNode commitPartPrimaryNode = 
topologyService.getByConsistentId(replicaMeta.getLeaseholder());
+
+            if (commitPartPrimaryNode == null) {
+                LOG.warn(
+                        "The primary replica of the commit partition is not 
available [commitPartGrp={}, tx={}]",
+                        txState.commitPartitionId(),
+                        txId
+                );
+
+                return completedFuture(null);
+            }
+
+            return replicaService.invoke(commitPartPrimaryNode, 
FACTORY.txRecoveryMessage()
+                    .groupId(txState.commitPartitionId())
+                    
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
+                    .txId(txId)
+                    .build());
+        });
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 2846d7b43d..d367396ec6 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -116,7 +116,7 @@ class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
         return ((TxManagerImpl) 
txManager).completeReadOnlyTransactionFuture(new 
TxIdAndTimestamp(readTimestamp, id()))
                 .thenRun(() -> txManager.updateTxMeta(
                         id(),
-                        old -> new TxStateMeta(COMMITED, 
old.txCoordinatorId(), old.commitTimestamp())
+                        old -> new TxStateMeta(COMMITED, 
old.txCoordinatorId(), old.commitPartitionId(), old.commitTimestamp())
                 ));
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 1d0c6f0b26..cbf750efdd 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -23,7 +23,6 @@ import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
-import static 
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITED;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
@@ -89,7 +88,7 @@ import 
org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.NetworkMessageHandler;
 import org.apache.ignite.tx.TransactionException;
@@ -146,8 +145,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     /** Lock to update and read the low watermark. */
     private final ReadWriteLock lowWatermarkReadWriteLock = new 
ReentrantReadWriteLock();
 
-    private final Lazy<String> localNodeId;
-
     private final PlacementDriver placementDriver;
 
     private final LongSupplier idleSafeTimePropagationPeriodMsSupplier;
@@ -158,48 +155,32 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-    /**
-     * The constructor.
-     *
-     * @param replicaService Replica service.
-     * @param lockManager Lock manager.
-     * @param clock A hybrid logical clock.
-     * @param transactionIdGenerator Used to generate transaction IDs.
-     */
-    public TxManagerImpl(
-            ReplicaService replicaService,
-            LockManager lockManager,
-            HybridClock clock,
-            TransactionIdGenerator transactionIdGenerator,
-            Supplier<String> localNodeIdSupplier,
-            PlacementDriver placementDriver
-    ) {
-        this(
-                replicaService,
-                lockManager,
-                clock,
-                transactionIdGenerator,
-                localNodeIdSupplier,
-                placementDriver,
-                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
-        );
-    }
+    /** Cluster service. */
+    private final ClusterService clusterService;
+
+    /** Detector of transactions that lost the coordinator. */
+    private final OrphanDetector orphanDetector;
+
+    /** Local node network identity. This id is available only after the 
network has started. */
+    private String localNodeId;
 
     /**
      * The constructor.
      *
+     * @param clusterService Cluster service.
      * @param replicaService Replica service.
      * @param lockManager Lock manager.
      * @param clock A hybrid logical clock.
      * @param transactionIdGenerator Used to generate transaction IDs.
+     * @param placementDriver Placement driver.
      * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe 
time propagation period in ms.
      */
     public TxManagerImpl(
+            ClusterService clusterService,
             ReplicaService replicaService,
             LockManager lockManager,
             HybridClock clock,
             TransactionIdGenerator transactionIdGenerator,
-            Supplier<String> localNodeIdSupplier,
             PlacementDriver placementDriver,
             LongSupplier idleSafeTimePropagationPeriodMsSupplier
     ) {
@@ -207,7 +188,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         this.lockManager = lockManager;
         this.clock = clock;
         this.transactionIdGenerator = transactionIdGenerator;
-        this.localNodeId = new Lazy<>(localNodeIdSupplier);
+        this.clusterService = clusterService;
         this.placementDriver = placementDriver;
         this.idleSafeTimePropagationPeriodMsSupplier = 
idleSafeTimePropagationPeriodMsSupplier;
 
@@ -220,6 +201,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                 TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(),
                 new NamedThreadFactory("tx-async-cleanup", LOG));
+
+        orphanDetector = new OrphanDetector(clusterService.topologyService(), 
replicaService, placementDriver, /*lockManager,*/ clock);
     }
 
     @Override
@@ -231,7 +214,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     public InternalTransaction begin(HybridTimestampTracker timestampTracker, 
boolean readOnly) {
         HybridTimestamp beginTimestamp = clock.now();
         UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp);
-        updateTxMeta(txId, old -> new TxStateMeta(PENDING, coordinatorId(), 
null));
+        updateTxMeta(txId, old -> new TxStateMeta(PENDING, localNodeId, null, 
null));
 
         if (!readOnly) {
             return new ReadWriteTransactionImpl(this, timestampTracker, txId);
@@ -289,8 +272,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     }
 
     @Override
-    public void updateTxMeta(UUID txId, Function<TxStateMeta, TxStateMeta> 
updater) {
-        txStateMap.compute(txId, (k, oldMeta) -> {
+    public TxStateMeta updateTxMeta(UUID txId, Function<TxStateMeta, 
TxStateMeta> updater) {
+        return txStateMap.compute(txId, (k, oldMeta) -> {
             TxStateMeta newMeta = updater.apply(oldMeta);
 
             if (newMeta == null) {
@@ -315,17 +298,13 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             finalState = ABORTED;
         }
 
-        updateTxMeta(txId, old -> new TxStateMeta(finalState, 
old.txCoordinatorId(), old.commitTimestamp()));
+        updateTxMeta(txId, old -> new TxStateMeta(finalState, 
old.txCoordinatorId(), old.commitPartitionId(), old.commitTimestamp()));
     }
 
     private @Nullable HybridTimestamp commitTimestamp(boolean commit) {
         return commit ? clock.now() : null;
     }
 
-    private String coordinatorId() {
-        return localNodeId.get();
-    }
-
     @Override
     public CompletableFuture<Void> finish(
             HybridTimestampTracker observableTimestampTracker,
@@ -338,7 +317,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
         if (enlistedGroups.isEmpty()) {
             // If there are no enlisted groups, just update local state - we 
already marked the tx as finished.
-            updateTxMeta(txId, old -> coordinatorFinalTxStateMeta(commit, 
commitTimestamp(commit)));
+            updateTxMeta(txId, old -> coordinatorFinalTxStateMeta(commit, 
commitPartition, commitTimestamp(commit)));
 
             return completedFuture(null);
         }
@@ -349,9 +328,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         // than all the read timestamps processed before.
         // Every concurrent operation will now use a finish future from the 
finishing state meta and get only final transaction
         // state after the transaction is finished.
-        TxStateMetaFinishing finishingStateMeta = new 
TxStateMetaFinishing(coordinatorId());
-
-        updateTxMeta(txId, old -> finishingStateMeta);
+        TxStateMetaFinishing finishingStateMeta = (TxStateMetaFinishing) 
updateTxMeta(txId, old ->
+                new TxStateMetaFinishing(localNodeId, old == null ? null : 
old.commitPartitionId()));
 
         AtomicBoolean performingFinish = new AtomicBoolean();
         TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
@@ -455,7 +433,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
                             if (transactionException.code() == 
TX_WAS_ABORTED_ERR) {
                                 updateTxMeta(txId, old -> {
-                                    TxStateMeta txStateMeta = new 
TxStateMeta(ABORTED, old.txCoordinatorId(), null);
+                                    TxStateMeta txStateMeta = new 
TxStateMeta(ABORTED, old.txCoordinatorId(), commitPartition, null);
 
                                     txFinishFuture.complete(txStateMeta);
 
@@ -526,7 +504,11 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
                         assert old instanceof TxStateMetaFinishing;
 
-                        TxStateMeta finalTxStateMeta = 
coordinatorFinalTxStateMeta(commit, commitTimestamp);
+                        TxStateMeta finalTxStateMeta = 
coordinatorFinalTxStateMeta(
+                                commit,
+                                old.commitPartitionId(),
+                                commitTimestamp
+                        );
 
                         txFinishFuture.complete(finalTxStateMeta);
 
@@ -590,7 +572,14 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
     @Override
     public void start() {
-        
replicaService.messagingService().addMessageHandler(ReplicaMessageGroup.class, 
this);
+        localNodeId = clusterService.topologyService().localMember().id();
+        
clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class, 
this);
+        orphanDetector.start(txStateMap::get);
+    }
+
+    @Override
+    public void beforeNodeStop() {
+        orphanDetector.stop();
     }
 
     @Override
@@ -718,11 +707,16 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
      * Creates final {@link TxStateMeta} for coordinator node.
      *
      * @param commit Commit flag.
+     * @param commitPartitionId Commit partition id.
      * @param commitTimestamp Commit timestamp.
      * @return Transaction meta.
      */
-    private TxStateMeta coordinatorFinalTxStateMeta(boolean commit, @Nullable 
HybridTimestamp commitTimestamp) {
-        return new TxStateMeta(commit ? COMMITED : ABORTED, coordinatorId(), 
commitTimestamp);
+    private TxStateMeta coordinatorFinalTxStateMeta(
+            boolean commit,
+            TablePartitionId commitPartitionId,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        return new TxStateMeta(commit ? COMMITED : ABORTED, localNodeId, 
commitPartitionId, commitTimestamp);
     }
 
     /**
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
index 00abd58a1b..2aed7d0c01 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
@@ -53,4 +53,9 @@ public class TxMessageGroup {
      * Message type for {@link TxStateResponse}.
      */
     public static final short TX_STATE_RESPONSE = 5;
+
+    /**
+     * Message type for {@link TxRecoveryMessage}.
+     */
+    public static final short TX_RECOVERY_MSG = 6;
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxRecoveryMessage.java
similarity index 57%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
copy to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxRecoveryMessage.java
index 9d163ca11c..6f36fd8d61 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxRecoveryMessage.java
@@ -15,19 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.replication.request;
+package org.apache.ignite.internal.tx.message;
 
 import java.util.UUID;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Transaction request that can contain full transaction (transaction that 
contains full set of keys).
+ * Transaction recovery message.
  */
-public interface CommittableTxRequest extends ReplicaRequest {
-    UUID transactionId();
+@Transferable(TxMessageGroup.TX_RECOVERY_MSG)
+public interface TxRecoveryMessage extends PrimaryReplicaRequest, 
ReplicaRequest {
+    /**
+     * Gets a transaction id to resolve.
+     *
+     * @return Transaction id.
+     */
+    UUID txId();
 
     /**
-     * Return {@code true} if this is a full transaction.
+     * Gets an enlistment consistency token.
+     * The token is used to check that the lease is still actual while the 
message goes to the replica.
+     *
+     * @return Enlistment consistency token.
      */
-    boolean full();
+    long enlistmentConsistencyToken();
 }
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 1b2dde012a..8f7b4ce5e1 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -114,11 +114,11 @@ public class TxManagerTest extends IgniteAbstractTest {
         when(replicaService.invoke(anyString(), 
any())).thenReturn(CompletableFuture.completedFuture(null));
 
         txManager = new TxManagerImpl(
+                clusterService,
                 replicaService,
                 new HeapLockManager(),
                 clock,
                 new TransactionIdGenerator(0xdeadbeef),
-                LOCAL_NODE::id,
                 placementDriver,
                 idleSafeTimePropagationPeriodMsSupplier
         );

Reply via email to