This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f034933e63 IGNITE-28030 Fix massive exceptions "Failed to abort on coordinator a transaction that lost its primary replica's volatile state" on server nodes (#7691)
6f034933e63 is described below

commit 6f034933e6357c8f76661e836794d16df899ea63
Author: Denis Chudov <[email protected]>
AuthorDate: Mon Mar 9 13:46:08 2026 +0200

    IGNITE-28030 Fix massive exceptions "Failed to abort on coordinator a transaction that lost its primary replica's volatile state" on server nodes (#7691)
---
 .../apache/ignite/client/fakes/FakeTxManager.java  |  12 --
 .../rebalance/ItRebalanceDistributedTest.java      |   1 +
 .../internal/index/IndexBuildingManager.java       |   7 +-
 .../partition/replicator/fixtures/Node.java        |   1 +
 .../PartitionReplicaLifecycleManager.java          |  18 +-
 .../partition/replicator/TxRecoveryEngine.java     |  93 -----------
 .../replicator/ZonePartitionReplicaListener.java   |  34 ++--
 .../handlers/TxRecoveryMessageHandler.java         |  18 +-
 ...xStateCommitPartitionReplicaRequestHandler.java |  36 ++--
 .../PartitionReplicaLifecycleManagerTest.java      |   1 +
 .../runner/app/ItIgniteNodeRestartTest.java        |   1 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   1 +
 .../replicator/PartitionReplicaListener.java       |  23 +--
 .../distributed/TableManagerRecoveryTest.java      |   1 +
 .../replication/PartitionReplicaListenerTest.java  |   6 +-
 .../ZonePartitionReplicaListenerTest.java          |  14 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |  28 +++-
 .../ignite/internal/table/TxAbstractTest.java      |  25 +--
 .../table/impl/DummyInternalTableImpl.java         |   9 +-
 ...riteIntentResolutionWhenPrimaryExpiredTest.java |  26 ++-
 .../org/apache/ignite/internal/tx/TxManager.java   |   6 -
 .../org/apache/ignite/internal/tx/TxState.java     |   7 +-
 .../org/apache/ignite/internal/tx/TxStateMeta.java |  22 ++-
 .../ignite/internal/tx/TxStateMetaUnknown.java     |   1 +
 .../internal/tx/impl/PlacementDriverHelper.java    |  14 ++
 .../internal/tx/impl/TransactionInflights.java     |   2 +-
 .../internal/tx/impl/TransactionStateResolver.java | 113 ++++++++++---
 .../internal/tx/impl/TxCleanupRequestSender.java   |  23 ++-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  38 -----
 .../ignite/internal/tx/impl/TxMessageSender.java   |   2 +
 .../ignite/internal/tx/impl/TxRecoveryEngine.java  | 183 +++++++++++++++++++++
 .../tx/message/TxStateCoordinatorRequest.java      |   7 +
 .../tx/impl/TransactionStateResolverTest.java      |  12 +-
 33 files changed, 516 insertions(+), 269 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 0895d2a54a3..e9834d33218 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.client.fakes;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.Collection;
@@ -39,7 +38,6 @@ import org.apache.ignite.internal.tx.InternalTxOptions;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.PartitionEnlistment;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
-import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
@@ -212,16 +210,6 @@ public class FakeTxManager implements TxManager {
         return null;
     }
 
-    @Override
-    public CompletableFuture<@Nullable TransactionMeta> 
checkEnlistedPartitionsAndAbortIfNeeded(
-            TxStateMeta txMeta,
-            InternalTransaction tx,
-            long currentEnlistmentConsistencyToken,
-            ZonePartitionId senderGroupId
-    ) {
-        return completedFuture(stateMeta(tx.id()));
-    }
-
     @Override
     public <T extends TxStateMeta> T updateTxMeta(UUID txId, 
Function<TxStateMeta, TxStateMeta> updater) {
         return null;
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index c9907b58303..a2f4a1d4537 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1549,6 +1549,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     threadPoolsManager.tableIoExecutor(),
                     threadPoolsManager.rebalanceScheduler(),
                     threadPoolsManager.partitionOperationsExecutor(),
+                    threadPoolsManager.commonScheduler(),
                     clockService,
                     placementDriver,
                     schemaSyncService,
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
index 6c640ae740a..61f26216e67 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
@@ -54,7 +54,9 @@ import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.Lazy;
 
 /**
  * Component is responsible for building indexes and making them {@link 
CatalogIndexStatus#AVAILABLE available}. Both in a running cluster
@@ -120,7 +122,10 @@ public class IndexBuildingManager implements 
IgniteComponent {
                 clusterService.topologyService(),
                 clusterService.messagingService(),
                 new ExecutorInclinedPlacementDriver(placementDriver, executor),
-                new TxMessageSender(clusterService.messagingService(), 
replicaService, clockService)
+                new TxMessageSender(clusterService.messagingService(), 
replicaService, clockService),
+                new TxRecoveryEngine(txManager, 
clusterService.topologyService()),
+                new Lazy<>(() -> 
clusterService.topologyService().localMember()),
+                executor
         );
 
         indexBuilder = new IndexBuilder(
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index e5293fd5b42..9339fd161a3 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -742,6 +742,7 @@ public class Node {
                 threadPoolsManager.tableIoExecutor(),
                 threadPoolsManager.rebalanceScheduler(),
                 threadPoolsManager.partitionOperationsExecutor(),
+                threadPoolsManager.commonScheduler(),
                 clockService,
                 placementDriverManager.placementDriver(),
                 schemaSyncService,
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index e66bdea7289..d4e366fb5f0 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -180,12 +180,14 @@ import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
 import 
org.apache.ignite.internal.tx.storage.state.TxStateStorageRebalanceException;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.LongPriorityQueue;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.util.SafeTimeValuesTracker;
@@ -287,6 +289,8 @@ public class PartitionReplicaLifecycleManager extends
 
     private final TxMessageSender txMessageSender;
 
+    private final TxRecoveryEngine txRecoveryEngine;
+
     private final EventListener<CreateZoneEventParameters> 
onCreateZoneListener = this::onCreateZone;
     private final EventListener<PrimaryReplicaEventParameters> 
onPrimaryReplicaExpiredListener = this::onPrimaryReplicaExpired;
     private final EventListener<DropZoneEventParameters> onZoneDropListener = 
fromConsumer(this::onZoneDrop);
@@ -312,6 +316,7 @@ public class PartitionReplicaLifecycleManager extends
      * @param rebalanceScheduler Executor for scheduling rebalance routine.
      * @param partitionOperationsExecutor Striped executor on which partition 
operations (potentially requiring I/O with storages)
      *         will be executed.
+     * @param commonExecutor Common executor.
      * @param clockService Clock service.
      * @param placementDriver Placement driver.
      * @param schemaSyncService Schema synchronization service.
@@ -336,6 +341,7 @@ public class PartitionReplicaLifecycleManager extends
             ExecutorService ioExecutor,
             ScheduledExecutorService rebalanceScheduler,
             Executor partitionOperationsExecutor,
+            Executor commonExecutor,
             ClockService clockService,
             PlacementDriver placementDriver,
             SchemaSyncService schemaSyncService,
@@ -360,6 +366,7 @@ public class PartitionReplicaLifecycleManager extends
                 ioExecutor,
                 rebalanceScheduler,
                 partitionOperationsExecutor,
+                commonExecutor,
                 clockService,
                 placementDriver,
                 schemaSyncService,
@@ -395,6 +402,7 @@ public class PartitionReplicaLifecycleManager extends
             ExecutorService ioExecutor,
             ScheduledExecutorService rebalanceScheduler,
             Executor partitionOperationsExecutor,
+            Executor commonExecutor,
             ClockService clockService,
             PlacementDriver placementDriver,
             SchemaSyncService schemaSyncService,
@@ -434,6 +442,8 @@ public class PartitionReplicaLifecycleManager extends
                 Integer::parseInt
         );
 
+        txRecoveryEngine = new TxRecoveryEngine(txManager, topologyService);
+
         txMessageSender = new TxMessageSender(
                 messagingService,
                 replicaService,
@@ -446,7 +456,10 @@ public class PartitionReplicaLifecycleManager extends
                 topologyService,
                 messagingService,
                 executorInclinedPlacementDriver,
-                txMessageSender
+                txMessageSender,
+                txRecoveryEngine,
+                new Lazy<>(topologyService::localMember),
+                commonExecutor
         );
 
         pendingAssignmentsRebalanceListener = 
createPendingAssignmentsRebalanceListener();
@@ -838,7 +851,8 @@ public class PartitionReplicaLifecycleManager extends
                                                 topologyService.localMember(),
                                                 zonePartitionId,
                                                 transactionStateResolver,
-                                                txMessageSender
+                                                txMessageSender,
+                                                txRecoveryEngine
                                         );
 
                                         
zoneResources.replicaListenerFuture().complete(replicaListener);
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
deleted file mode 100644
index 87706ef4cc1..00000000000
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.partition.replicator;
-
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import org.apache.ignite.internal.hlc.HybridTimestampTracker;
-import org.apache.ignite.internal.network.ClusterNodeResolver;
-import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
-import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
-import org.apache.ignite.internal.tx.TxManager;
-
-/**
- * Transaction recovery logic.
- */
-public class TxRecoveryEngine {
-    private final TxManager txManager;
-    private final ClusterNodeResolver clusterNodeResolver;
-
-    private final ZonePartitionId replicationGroupId;
-    private final Function<InternalClusterNode, PendingTxPartitionEnlistment> 
abandonedTxRecoveryEnlistmentFactory;
-
-    /** Constructor. */
-    public TxRecoveryEngine(
-            TxManager txManager,
-            ClusterNodeResolver clusterNodeResolver,
-            ZonePartitionId replicationGroupId,
-            Function<InternalClusterNode, PendingTxPartitionEnlistment> 
abandonedTxRecoveryEnlistmentFactory
-    ) {
-        this.txManager = txManager;
-        this.clusterNodeResolver = clusterNodeResolver;
-        this.replicationGroupId = replicationGroupId;
-        this.abandonedTxRecoveryEnlistmentFactory = 
abandonedTxRecoveryEnlistmentFactory;
-    }
-
-    /**
-     * Abort the abandoned transaction.
-     *
-     * @param txId Transaction id.
-     * @param senderId Sender inconsistent id.
-     */
-    public CompletableFuture<Void> triggerTxRecovery(UUID txId, UUID senderId) 
{
-        // If the transaction state is pending, then the transaction should be 
rolled back,
-        // meaning that the state is changed to aborted and a corresponding 
cleanup request
-        // is sent in a common durable manner to a partition that has 
initiated recovery.
-        return txManager.finish(
-                        HybridTimestampTracker.emptyTracker(),
-                        // Tx recovery is executed on the commit partition.
-                        replicationGroupId,
-                        false,
-                        false,
-                        true,
-                        false,
-                        Map.of(replicationGroupId, 
abandonedTxRecoveryEnlistmentFactory.apply(clusterNodeResolver.getById(senderId))),
-                        txId
-                )
-                .whenComplete((v, ex) -> runCleanupOnNode(replicationGroupId, 
txId, senderId));
-    }
-
-    /**
-     * Run cleanup on a node.
-     *
-     * @param commitPartitionId Commit partition id.
-     * @param txId Transaction id.
-     * @param nodeId Node id (inconsistent).
-     */
-    public CompletableFuture<Void> runCleanupOnNode(ZonePartitionId 
commitPartitionId, UUID txId, UUID nodeId) {
-        // Get node id of the sender to send back cleanup requests.
-        String nodeConsistentId = 
clusterNodeResolver.getConsistentIdById(nodeId);
-
-        return nodeConsistentId == null ? nullCompletedFuture() : 
txManager.cleanup(commitPartitionId, nodeConsistentId, txId);
-    }
-}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index 845a974e472..39fc89a5fde 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -54,10 +54,10 @@ import 
org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
 import org.apache.ignite.internal.replicator.message.TableAware;
 import org.apache.ignite.internal.schema.SchemaSyncService;
-import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
 import org.apache.ignite.internal.tx.message.TxCleanupRecoveryRequest;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
@@ -120,7 +120,8 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
             InternalClusterNode localNode,
             ZonePartitionId replicationGroupId,
             TransactionStateResolver transactionStateResolver,
-            TxMessageSender txMessageSender
+            TxMessageSender txMessageSender,
+            TxRecoveryEngine txRecoveryEngine
     ) {
         this.raftClient = raftClient;
         this.failureProcessor = failureProcessor;
@@ -143,13 +144,6 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
 
         ReplicationRaftCommandApplicator raftCommandApplicator = new 
ReplicationRaftCommandApplicator(raftClient, replicationGroupId);
 
-        TxRecoveryEngine txRecoveryEngine = new TxRecoveryEngine(
-                txManager,
-                clusterNodeResolver,
-                replicationGroupId,
-                
ZonePartitionReplicaListener::createAbandonedTxRecoveryEnlistment
-        );
-
         // Request handlers initialization.
 
         txFinishReplicaRequestHandler = new TxFinishReplicaRequestHandler(
@@ -177,12 +171,19 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
                 txStatePartitionStorage,
                 txManager,
                 clusterNodeResolver,
-                localNode,
                 txRecoveryEngine,
-                txMessageSender
+                txMessageSender,
+                replicationGroupId,
+                localNode
         );
 
-        txRecoveryMessageHandler = new 
TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, 
txRecoveryEngine, txManager);
+        txRecoveryMessageHandler = new TxRecoveryMessageHandler(
+                txStatePartitionStorage,
+                replicationGroupId,
+                txRecoveryEngine,
+                txManager,
+                localNode
+        );
 
         txCleanupRecoveryRequestHandler = new TxCleanupRecoveryRequestHandler(
                 txStatePartitionStorage,
@@ -201,13 +202,6 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
         replicaSafeTimeSyncRequestHandler = new 
ReplicaSafeTimeSyncRequestHandler(clockService, raftCommandApplicator);
     }
 
-    private static PendingTxPartitionEnlistment 
createAbandonedTxRecoveryEnlistment(InternalClusterNode node) {
-        // Enlistment consistency token is not required for the rollback, so 
it is 0L.
-        // Passing an empty set of table IDs as we don't know which tables 
were enlisted; this is ok as the corresponding write intents
-        // can still be resolved later when reads stumble upon them.
-        return new PendingTxPartitionEnlistment(node.name(), 0L);
-    }
-
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
UUID senderId) {
         return replicaPrimacyEngine.validatePrimacy(request)
@@ -237,7 +231,7 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
         } else if (request instanceof WriteIntentSwitchReplicaRequest) {
             return 
writeIntentSwitchRequestHandler.handle((WriteIntentSwitchReplicaRequest) 
request, senderId);
         } else if (request instanceof TxStateCommitPartitionRequest) {
-            return 
txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest)
 request);
+            return 
txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest)
 request, senderId);
         } else if (request instanceof TxRecoveryMessage) {
             return txRecoveryMessageHandler.handle((TxRecoveryMessage) 
request, senderId);
         } else if (request instanceof TxCleanupRecoveryRequest) {
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
index 68662c72f32..fce8c788686 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
@@ -23,11 +23,12 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine;
+import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.tx.TransactionLogUtils;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
 import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
 
@@ -41,18 +42,21 @@ public class TxRecoveryMessageHandler {
     private final ZonePartitionId replicationGroupId;
     private final TxRecoveryEngine txRecoveryEngine;
     private final TxManager txManager;
+    private final InternalClusterNode localNode;
 
     /** Constructor. */
     public TxRecoveryMessageHandler(
             TxStatePartitionStorage txStatePartitionStorage,
             ZonePartitionId replicationGroupId,
             TxRecoveryEngine txRecoveryEngine,
-            TxManager txManager
+            TxManager txManager,
+            InternalClusterNode localNode
     ) {
         this.txStatePartitionStorage = txStatePartitionStorage;
         this.replicationGroupId = replicationGroupId;
         this.txRecoveryEngine = txRecoveryEngine;
         this.txManager = txManager;
+        this.localNode = localNode;
     }
 
     /**
@@ -61,7 +65,7 @@ public class TxRecoveryMessageHandler {
      * @param request Tx recovery request.
      * @return The future is complete when the transaction state is finalized.
      */
-    public CompletableFuture<Void> handle(TxRecoveryMessage request, UUID 
senderId) {
+    public CompletableFuture<?> handle(TxRecoveryMessage request, UUID 
senderId) {
         UUID txId = request.txId();
 
         TxMeta txMeta = txStatePartitionStorage.get(txId);
@@ -78,6 +82,12 @@ public class TxRecoveryMessageHandler {
                 txMeta
         );
 
-        return txRecoveryEngine.triggerTxRecovery(txId, senderId);
+        return txRecoveryEngine.triggerTxRecovery(
+                txId,
+                replicationGroupId,
+                localNode.name(),
+                null,
+                null
+        );
     }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
index f64c86af78e..4c9284aa457 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.RecipientLeftException;
-import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxManager;
@@ -38,6 +37,7 @@ import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.TxStateMetaFinishing;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
 import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
 import org.jetbrains.annotations.Nullable;
@@ -50,36 +50,41 @@ public class TxStateCommitPartitionReplicaRequestHandler {
     private final TxManager txManager;
     private final ClusterNodeResolver clusterNodeResolver;
 
-    private final InternalClusterNode localNode;
-
     private final TxRecoveryEngine txRecoveryEngine;
 
     private final TxMessageSender txMessageSender;
 
+    private final ZonePartitionId groupId;
+
+    private final InternalClusterNode localNode;
+
     /** Constructor. */
     public TxStateCommitPartitionReplicaRequestHandler(
             TxStatePartitionStorage txStatePartitionStorage,
             TxManager txManager,
             ClusterNodeResolver clusterNodeResolver,
-            InternalClusterNode localNode,
             TxRecoveryEngine txRecoveryEngine,
-            TxMessageSender txMessageSender
+            TxMessageSender txMessageSender,
+            ZonePartitionId groupId,
+            InternalClusterNode localNode
     ) {
         this.txStatePartitionStorage = txStatePartitionStorage;
         this.txManager = txManager;
         this.clusterNodeResolver = clusterNodeResolver;
-        this.localNode = localNode;
         this.txRecoveryEngine = txRecoveryEngine;
         this.txMessageSender = txMessageSender;
+        this.groupId = groupId;
+        this.localNode = localNode;
     }
 
     /**
      * Handles a {@link TxStateCommitPartitionRequest}.
      *
      * @param request Transaction state request.
+     * @param senderId Sender ephemeral id.
      * @return Result future.
      */
-    public CompletableFuture<TransactionMeta> 
handle(TxStateCommitPartitionRequest request) {
+    public CompletableFuture<TransactionMeta> 
handle(TxStateCommitPartitionRequest request, UUID senderId) {
         UUID txId = request.txId();
 
         TxStateMeta txMeta = txManager.stateMeta(txId);
@@ -98,7 +103,8 @@ public class TxStateCommitPartitionReplicaRequestHandler {
                     txMeta,
                     request.readTimestamp(),
                     request.senderCurrentConsistencyToken(),
-                    zonePartitionId
+                    zonePartitionId,
+                    senderId
             );
         } else {
             return completedFuture(txMeta);
@@ -121,7 +127,8 @@ public class TxStateCommitPartitionReplicaRequestHandler {
             @Nullable TxStateMeta txStateMeta,
             @Nullable HybridTimestamp readTimestamp,
             @Nullable Long senderCurrentConsistencyToken,
-            @Nullable ZonePartitionId senderGroupId
+            @Nullable ZonePartitionId senderGroupId,
+            UUID senderId
     ) {
         // The state is either null or PENDING or ABANDONED, other states have 
been filtered out previously.
         assert txStateMeta == null || txStateMeta.txState() == PENDING || 
txStateMeta.txState() == ABANDONED
@@ -143,10 +150,7 @@ public class TxStateCommitPartitionReplicaRequestHandler {
                 // state; and there is no final tx state in txStateStorage, or 
the tx coordinator left the cluster. But we can assume
                 // that as the coordinator (or information about it) is 
missing, there is no need to wait a finish request from
                 // tx coordinator, the transaction can't be committed at all.
-                return txRecoveryEngine.triggerTxRecovery(txId, localNode.id())
-                        .handle((v, ex) ->
-                                
CompletableFuture.<TransactionMeta>completedFuture(txManager.stateMeta(txId)))
-                        .thenCompose(Function.identity());
+                return txRecoveryEngine.triggerTxRecovery(txId, groupId, 
localNode.name(), senderGroupId, senderId);
             } else if (coordinator != null) {
                 // If there is coordinator in the cluster we should fallback 
to coordinator request. It's possible that coordinator
                 // was not seen in topology on another node which requested 
the state from commit partition, but can be seen here.
@@ -154,6 +158,7 @@ public class TxStateCommitPartitionReplicaRequestHandler {
 
                 return txMessageSender.resolveTxStateFromCoordinator(
                                 coordinator,
+                                groupId,
                                 txId,
                                 timestamp,
                                 senderCurrentConsistencyToken,
@@ -177,10 +182,7 @@ public class TxStateCommitPartitionReplicaRequestHandler {
                                     markAbandoned(txId);
                                 }
 
-                                return 
txRecoveryEngine.triggerTxRecovery(txId, localNode.id())
-                                        .handle((v, ex) ->
-                                                
CompletableFuture.<TransactionMeta>completedFuture(txManager.stateMeta(txId)))
-                                        .thenCompose(Function.identity());
+                                return 
txRecoveryEngine.triggerTxRecovery(txId, groupId, localNode.name(), 
senderGroupId, senderId);
                             }
                         })
                         .thenCompose(Function.identity());
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index b229f25c47c..ff9e319a599 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -295,6 +295,7 @@ class PartitionReplicaLifecycleManagerTest extends 
BaseIgniteAbstractTest {
                 executorService,
                 scheduledExecutorService,
                 executorService,
+                Runnable::run,
                 clockService,
                 placementDriver,
                 schemaSyncService,
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 765c25f61ac..d73fc0a2448 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -773,6 +773,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 threadPoolsManager.tableIoExecutor(),
                 threadPoolsManager.rebalanceScheduler(),
                 threadPoolsManager.partitionOperationsExecutor(),
+                threadPoolsManager.commonScheduler(),
                 clockService,
                 placementDriverManager.placementDriver(),
                 schemaSyncService,
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index bf1402cf11c..8cef74418ee 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1117,6 +1117,7 @@ public class IgniteImpl implements Ignite {
                 threadPoolsManager.tableIoExecutor(),
                 threadPoolsManager.rebalanceScheduler(),
                 threadPoolsManager.partitionOperationsExecutor(),
+                threadPoolsManager.commonScheduler(),
                 clockService,
                 placementDriverMgr.placementDriver(),
                 schemaSyncService,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 62e3d004276..a13431c93f1 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -464,11 +464,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
 
             // Saving state is not needed for full transactions.
             if (!req.full()) {
-                txManager.updateTxMeta(req.transactionId(), old -> 
builder(old, PENDING)
-                        .txCoordinatorId(req.coordinatorId())
-                        
.commitPartitionId(req.commitPartitionId().asZonePartitionId())
-                        .txLabel(req.txLabel())
-                        .build());
+                replicaTouch(req.transactionId(), req.coordinatorId(), 
req.commitPartitionId().asZonePartitionId(), req.txLabel());
             }
         }
 
@@ -486,6 +482,14 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         return completedFuture(mvDataStorage.estimatedSize());
     }
 
+    private void replicaTouch(UUID txId, UUID coordinatorId, ZonePartitionId 
commitPartitionId, @Nullable String txLabel) {
+        txManager.updateTxMeta(txId, old -> builder(old, PENDING)
+                .txCoordinatorId(coordinatorId)
+                .commitPartitionId(commitPartitionId)
+                .txLabel(txLabel)
+                .build());
+    }
+
     private static void setDelayedAckProcessor(@Nullable ReplicaResult result, 
@Nullable BiConsumer<Object, Throwable> proc) {
         if (result != null) {
             result.delayedAckProcessor = proc;
@@ -563,11 +567,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             // We treat SCAN as 2pc and only switch to a 1pc mode if all table 
rows fit in the bucket and the transaction is implicit.
             // See `req.full() && (err != null || rows.size() < 
req.batchSize())` condition.
             // If they don't fit the bucket, the transaction is treated as 2pc.
-            txManager.updateTxMeta(req.transactionId(), old -> builder(old, 
PENDING)
-                    .txCoordinatorId(req.coordinatorId())
-                    
.commitPartitionId(req.commitPartitionId().asZonePartitionId())
-                    .txLabel(req.txLabel())
-                    .build());
+            replicaTouch(req.transactionId(), req.coordinatorId(), 
req.commitPartitionId().asZonePartitionId(), req.txLabel());
 
             // Implicit RW scan can be committed locally on a last batch or 
error.
             return appendTxCommand(req.transactionId(), RW_SCAN, false, () -> 
processScanRetrieveBatchAction(req))
@@ -3497,7 +3497,8 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
 
                     if (isFinalState(transactionMeta.txState())) {
                         scheduleAsyncWriteIntentSwitch(txId, 
writeIntent.rowId(), transactionMeta);
-                    } else {
+                    } else if (timestamp == null) {
+                        // If it's resolution by RW txn.
                         LOG.info(
                                 "Received non-final transaction state after tx 
state resolution [txId={}, groupId={}, txMeta={}, "
                                     + "timestamp={}, commitPartId={}, 
currentConsistencyToken={}, writeIntentReadable={}].",
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index c5f6b362428..a1c60506c1b 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -545,6 +545,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 ForkJoinPool.commonPool(),
                 mock(ScheduledExecutorService.class),
                 partitionOperationsExecutor,
+                ForkJoinPool.commonPool(),
                 clockService,
                 placementDriver,
                 schemaSyncService,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index d96a5d45554..f639261c707 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -239,6 +239,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
 import org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy;
 import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
@@ -646,7 +647,10 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         messagingService,
                         mock(ReplicaService.class),
                         clockService
-                )
+                ),
+                new TxRecoveryEngine(txManager, 
mock(ClusterNodeResolver.class)),
+                new Lazy<>(() -> mock(InternalClusterNode.class)),
+                Runnable::run
         );
 
         transactionStateResolver.start();
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index ecc3139b0ce..f8b3cb96802 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -206,6 +206,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
 import org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy;
 import org.apache.ignite.internal.tx.message.PartitionEnlistmentMessage;
 import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
@@ -614,7 +615,10 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         messagingService,
                         mock(ReplicaService.class),
                         clockService
-                )
+                ),
+                new TxRecoveryEngine(txManager, 
mock(ClusterNodeResolver.class)),
+                new Lazy<>(() -> mock(InternalClusterNode.class)),
+                Runnable::run
         );
 
         transactionStateResolver.start();
@@ -633,6 +637,11 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 clockService
         );
 
+        var txRecoveryEngine = new TxRecoveryEngine(
+                txManager,
+                topologySrv
+        );
+
         zonePartitionReplicaListener = new ZonePartitionReplicaListener(
                 txStateStorage,
                 clockService,
@@ -647,7 +656,8 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 localNode,
                 zonePartitionId,
                 transactionStateResolver,
-                txMessageSender
+                txMessageSender,
+                txRecoveryEngine
         );
 
         tableReplicaProcessor = new PartitionReplicaListener(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index f35084f8c62..816dd602018 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -179,6 +179,7 @@ import 
org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
 import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
@@ -733,13 +734,21 @@ public class ItTxTestCluster {
                                 clockServices.get(assignment)
                         );
 
+                TxRecoveryEngine txRecoveryEngine = new TxRecoveryEngine(
+                        txManagers.get(assignment),
+                        clusterServices.get(assignment).topologyService()
+                );
+
                 var transactionStateResolver = new TransactionStateResolver(
                         txManagers.get(assignment),
                         clockServices.get(assignment),
                         nodeResolver,
                         clusterServices.get(assignment).messagingService(),
                         placementDriver,
-                        txMessageSender
+                        txMessageSender,
+                        txRecoveryEngine,
+                        new Lazy<>(() -> mock(InternalClusterNode.class)),
+                        Runnable::run
                 );
                 transactionStateResolver.start();
 
@@ -967,6 +976,11 @@ public class ItTxTestCluster {
                 clockService
         );
 
+        var txRecoveryEngine = new TxRecoveryEngine(
+                txManager,
+                clusterServices.get(assignment).topologyService()
+        );
+
         ZonePartitionReplicaListener zonePartitionReplicaListener = 
nodeSpecificZonePartitionReplicaListeners.computeIfAbsent(
                 zonePartitionId,
                 partitionId -> new ZonePartitionReplicaListener(
@@ -983,7 +997,8 @@ public class ItTxTestCluster {
                         localNode,
                         partitionId,
                         transactionStateResolver,
-                        txMessageSender
+                        txMessageSender,
+                        txRecoveryEngine
                 )
         );
 
@@ -1158,6 +1173,8 @@ public class ItTxTestCluster {
      * Shutdowns all cluster nodes after each test.
      */
     public void shutdownCluster() {
+        LOG.info("Cluster shutdown begin");
+
         assertThat(stopAsync(new ComponentContext(), cluster), 
willCompleteSuccessfully());
         assertThat(stopAsync(new ComponentContext(), client), 
willCompleteSuccessfully());
 
@@ -1233,6 +1250,8 @@ public class ItTxTestCluster {
         if (partitionOperationsExecutor != null) {
             
IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 10, 
TimeUnit.SECONDS);
         }
+
+        LOG.info("Cluster shutdown end");
     }
 
     /**
@@ -1327,7 +1346,10 @@ public class ItTxTestCluster {
                         client.messagingService(),
                         clientReplicaSvc,
                         clientClockService
-                )
+                ),
+                new TxRecoveryEngine(clientTxManager, 
client.topologyService()),
+                new Lazy<>(() -> mock(InternalClusterNode.class)),
+                Runnable::run
         );
 
         clientTxStateResolver.start();
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index a8eda99eb62..9b6ebdbbf2d 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.table;
 
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
@@ -56,7 +58,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Flow;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
@@ -554,13 +555,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
                 new TransactionOptions().timeoutMillis(1000)
         );
 
-        var err = assertThrows(CompletionException.class, fut0::join);
-
-        try {
-            assertInstanceOf(IllegalArgumentException.class, err.getCause());
-        } catch (AssertionError e) {
-            throw new AssertionError("Unexpected exception type", err);
-        }
+        assertThat(fut0, willThrow(IllegalArgumentException.class));
 
         assertEquals(balance, view.get(null, 
makeKey(1)).doubleValue("balance"));
     }
@@ -582,13 +577,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
                         new TransactionOptions().timeoutMillis(1000)
                 );
 
-        var err = assertThrows(CompletionException.class, fut0::join);
-
-        try {
-            assertInstanceOf(NullPointerException.class, err.getCause());
-        } catch (AssertionError e) {
-            throw new AssertionError("Unexpected exception type", err);
-        }
+        assertThat(fut0, willThrow(NullPointerException.class));
     }
 
     @Test
@@ -1921,7 +1910,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
 
         CompletableFuture<List<Tuple>> roBeforeCommitTxFut = 
scan(accounts.internalTable(), readOnlyTx);
 
-        var roBeforeCommitTxRows = roBeforeCommitTxFut.get(10, 
TimeUnit.SECONDS);
+        var roBeforeCommitTxRows = roBeforeCommitTxFut.get(10, SECONDS);
 
         assertEquals(2, roBeforeCommitTxRows.size());
 
@@ -1940,7 +1929,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         // Same read-only transaction.
         roBeforeCommitTxFut = scan(accounts.internalTable(), readOnlyTx);
 
-        roBeforeCommitTxRows = roBeforeCommitTxFut.get(10, TimeUnit.SECONDS);
+        roBeforeCommitTxRows = roBeforeCommitTxFut.get(10, SECONDS);
 
         assertEquals(2, roBeforeCommitTxRows.size());
 
@@ -1958,7 +1947,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
 
         CompletableFuture<List<Tuple>> roAfterCommitTxFut = 
scan(accounts.internalTable(), readOnlyTx2);
 
-        var roAfterCommitTxRows = roAfterCommitTxFut.get(10, TimeUnit.SECONDS);
+        var roAfterCommitTxRows = roAfterCommitTxFut.get(10, SECONDS);
 
         assertEquals(1, roAfterCommitTxRows.size());
 
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 8e4a2b512d6..dfe814ccef1 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -139,6 +139,7 @@ import 
org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
 import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
@@ -509,6 +510,11 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 CLOCK_SERVICE
         );
 
+        var txRecoveryEngine = new TxRecoveryEngine(
+                txManager,
+                mock(ClusterNodeResolver.class)
+        );
+
         ZonePartitionReplicaListener zoneReplicaListener = new 
ZonePartitionReplicaListener(
                 txStateStorage.getOrCreatePartitionStorage(PART_ID),
                 CLOCK_SERVICE,
@@ -523,7 +529,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 LOCAL_NODE,
                 zonePartitionId,
                 transactionStateResolver,
-                txMessageSender
+                txMessageSender,
+                txRecoveryEngine
         );
 
         zoneReplicaListener.addTableReplicaProcessor(tableId, (raftClient, 
txStateResolver) -> tableReplicaListener);
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest.java
index a05243ff932..48f3f569a4a 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest.java
@@ -46,6 +46,7 @@ import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.TestWrappers;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -73,7 +74,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 /**
  * Test for transaction abort on coordinator when write-intent resolution 
happens after primary replica expiration.
@@ -119,8 +120,13 @@ public class 
ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testCoordinatorAbortsTransaction(boolean withThinClient) 
throws Exception {
+    @CsvSource({
+            "true, true",
+            "true, false",
+            "false, true",
+            "false, false"
+    })
+    public void testCoordinatorAbortsTransaction(boolean withThinClient, 
boolean limitedDataOnCoordinator) throws Exception {
         IgniteClient client = IgniteClient.builder()
                 
.addresses(getClientAddresses(runningNodes().collect(toList())).toArray(new 
String[0]))
                 .operationTimeout(15_000)
@@ -188,6 +194,11 @@ public class 
ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest
 
         waitAndGetPrimaryReplica(coordinatorNode, groupId);
 
+        if (limitedDataOnCoordinator) {
+            coordinatorNode.txManager()
+                    .updateTxMeta(clientTxId(tx0), old -> 
TxStateMeta.builder(PENDING).txCoordinatorId(coordinatorNode.id()).build());
+        }
+
         Transaction tx = coordinatorNode.transactions().begin();
         log.info("Test: new tx: " + txId(tx));
         log.info("Test: upsert");
@@ -335,4 +346,13 @@ public class 
ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest
                 .map(ignite -> unwrapIgniteImpl(ignite).clientAddress().port())
                 .collect(toList());
     }
+
+    private static UUID clientTxId(Transaction tx) {
+        if (tx instanceof ClientLazyTransaction) {
+            ClientLazyTransaction clientTx = (ClientLazyTransaction) tx;
+            return clientTx.startedTx().txId();
+        } else {
+            return txId(tx);
+        }
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 4d6ecf0f38a..578e60f04c9 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -135,12 +135,6 @@ public interface TxManager extends IgniteComponent {
      */
     @Nullable TxStateMeta stateMeta(UUID txId);
 
-    CompletableFuture<@Nullable TransactionMeta> 
checkEnlistedPartitionsAndAbortIfNeeded(
-            TxStateMeta txMeta,
-            InternalTransaction tx,
-            long currentEnlistmentConsistencyToken,
-            ZonePartitionId senderGroupId);
-
     /**
      * Atomically changes the state meta of a transaction.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
index 6ca2351d075..a446ebef1ea 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
@@ -57,9 +57,10 @@ public enum TxState {
 
     /**
      * Unknown transaction state. It may be used during transaction state 
resolution, shouldn't be used for either volatile
-     * or persistent transaction meta. It means that the state of the sought 
transaction is unrecoverable from current storage state,
-     * and it's possible if there is no corresponding committed version, so 
the write intent that triggered this resolution
-     * should be ignored. Any transitions from or to this state are forbidden.
+     * or persistent transaction meta. It means that the state of the sought 
transaction is not found on current node
+     * or unrecoverable from current storage state, and it's possible if there 
is no corresponding committed version,
+     * so the write intent that triggered this resolution should be ignored.
+     * Any transitions from or to this state are forbidden.
      */
     UNKNOWN(5);
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index ddff0619475..be229a32c23 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -340,13 +340,31 @@ public class TxStateMeta implements TransactionMeta {
             return this;
         }
 
+        /**
+         * Changes transaction label, if non-{@code null} value is provided.
+         *
+         * @param txLabel Transaction label.
+         * @return This.
+         */
         public TxStateMetaBuilder txLabel(@Nullable String txLabel) {
-            this.txLabel = txLabel;
+            if (txLabel != null) {
+                this.txLabel = txLabel;
+            }
+
             return this;
         }
 
+        /**
+         * Changes internal transaction, if non-{@code null} value is provided.
+         *
+         * @param tx Internal transaction.
+         * @return This.
+         */
         public TxStateMetaBuilder tx(@Nullable InternalTransaction tx) {
-            this.tx = tx;
+            if (tx != null) {
+                this.tx = tx;
+            }
+
             return this;
         }
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
index e1baa73159e..6c04d6e9a2f 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
@@ -44,6 +44,7 @@ public class TxStateMetaUnknown extends TxStateMeta {
     public TxStateMetaMessage toTransactionMetaMessage(ReplicaMessagesFactory 
replicaMessagesFactory, TxMessagesFactory txMessagesFactory) {
         return txMessagesFactory
                 .txStateMetaUnknownMessage()
+                .txState(TxState.UNKNOWN)
                 .build();
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
index d9cc060254a..724912584a9 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
@@ -38,10 +38,13 @@ import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * A helper class to retrieve primary replicas with exception handling.
@@ -119,6 +122,17 @@ public class PlacementDriverHelper {
                 });
     }
 
+    /**
+     * See {@link 
LeasePlacementDriver#getCurrentPrimaryReplica(ReplicationGroupId, 
HybridTimestamp)}. Gets primary replica for now.
+     *
+     * @param partitionId Partition id.
+     * @return Primary replica for the provided partition, or null if there is 
no primary at the moment.
+     */
+    @Nullable
+    public ReplicaMeta getCurrentPrimaryReplica(ZonePartitionId partitionId) {
+        return placementDriver.getCurrentPrimaryReplica(partitionId, 
clockService.current());
+    }
+
     /**
      * Get primary replicas for the provided partitions.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index 83934e5773a..4170318b2e1 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -47,7 +47,7 @@ import org.jetbrains.annotations.TestOnly;
 
 /**
  * Contains counters for in-flight requests of the transactions. Read-write 
transactions can't finish when some requests are in-flight.
- * Read-only transactions can't be included into {@link 
org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage} when
+ * Read-only transactions can't be included into {@link 
FinishedTransactionsBatchMessage} when
  * some requests are in-flight.
  */
 public class TransactionInflights {
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
index 847d3b6d250..de483d3f9a8 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
@@ -18,21 +18,29 @@
 package org.apache.ignite.internal.tx.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.tx.TxState.ABANDONED;
 import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
+import static org.apache.ignite.internal.tx.TxState.UNKNOWN;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
 import static 
org.apache.ignite.internal.tx.impl.PlacementDriverHelper.AWAIT_PRIMARY_REPLICA_TIMEOUT;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.IgniteThrottledLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
 import org.apache.ignite.internal.network.InternalClusterNode;
@@ -40,10 +48,10 @@ import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.RecipientLeftException;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
-import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxState;
@@ -54,6 +62,7 @@ import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
 import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
+import org.apache.ignite.internal.util.Lazy;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -62,6 +71,8 @@ import org.jetbrains.annotations.Nullable;
 public class TransactionStateResolver {
     private static final IgniteLogger LOG = 
Loggers.forClass(TransactionStateResolver.class);
 
+    private final IgniteThrottledLogger throttledLogger;
+
     /** Tx messages factory. */
     private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
 
@@ -88,6 +99,10 @@ public class TransactionStateResolver {
      */
     private final TxMessageSender txMessageSender;
 
+    private final TxRecoveryEngine txRecoveryEngine;
+
+    private final Lazy<InternalClusterNode> localNode;
+
     /**
      * The constructor.
      *
@@ -96,6 +111,10 @@ public class TransactionStateResolver {
      * @param clusterNodeResolver Cluster node resolver.
      * @param messagingService Messaging service.
      * @param placementDriver Placement driver.
+     * @param txMessageSender Tx message sender.
+     * @param txRecoveryEngine Transaction recovery engine.
+     * @param localNode Local cluster node.
+     * @param throttledLogExecutor Executor for cleaning up the throttled 
logger's cache.
      */
     public TransactionStateResolver(
             TxManager txManager,
@@ -103,7 +122,10 @@ public class TransactionStateResolver {
             ClusterNodeResolver clusterNodeResolver,
             MessagingService messagingService,
             PlacementDriver placementDriver,
-            TxMessageSender txMessageSender
+            TxMessageSender txMessageSender,
+            TxRecoveryEngine txRecoveryEngine,
+            Lazy<InternalClusterNode> localNode,
+            Executor throttledLogExecutor
     ) {
         this.txManager = txManager;
         this.clockService = clockService;
@@ -111,6 +133,9 @@ public class TransactionStateResolver {
         this.messagingService = messagingService;
         this.placementDriverHelper = new 
PlacementDriverHelper(placementDriver, clockService);
         this.txMessageSender = txMessageSender;
+        this.txRecoveryEngine = txRecoveryEngine;
+        this.localNode = localNode;
+        this.throttledLogger = Loggers.toThrottledLogger(LOG, 
throttledLogExecutor);
     }
 
     /**
@@ -121,8 +146,16 @@ public class TransactionStateResolver {
             if (msg instanceof TxStateCoordinatorRequest) {
                 TxStateCoordinatorRequest req = (TxStateCoordinatorRequest) 
msg;
 
-                processTxStateRequest(req)
-                        .thenAccept(txStateMeta -> {
+                processTxStateRequest(req, sender)
+                        .whenComplete((txStateMeta, e) -> {
+                            if (e != null) {
+                                Throwable cause = unwrapCause(e);
+                                throttledLogger.info(cause.getMessage());
+
+                                // Will cause fallback to commit partition 
path.
+                                txStateMeta = 
TxStateMeta.builder(UNKNOWN).build();
+                            }
+
                             NetworkMessage response = 
TX_MESSAGES_FACTORY.txStateResponse()
                                     
.txStateMeta(toTransactionMetaMessage(txStateMeta))
                                     .timestamp(clockService.now())
@@ -304,9 +337,16 @@ public class TransactionStateResolver {
 
             resolveTxStateFromCommitPartition(txId, commitGrpId, 
senderCurrentConsistencyToken, senderGroupId, txMetaFuture);
         } else {
-            txMessageSender.resolveTxStateFromCoordinator(coordinator, txId, 
timestamp, senderCurrentConsistencyToken, senderGroupId)
+            txMessageSender.resolveTxStateFromCoordinator(
+                            coordinator,
+                            commitGrpId,
+                            txId,
+                            timestamp,
+                            senderCurrentConsistencyToken,
+                            senderGroupId
+                    )
                     .whenComplete((response, e) -> {
-                        if (e == null && response.txStateMeta() != null) {
+                        if (e == null && response.txStateMeta() != null && 
response.txStateMeta().txState() != UNKNOWN) {
                             
txMetaFuture.complete(response.txStateMeta().asTransactionMeta());
                         } else {
                             if (e != null && e.getCause() instanceof 
RecipientLeftException) {
@@ -440,9 +480,13 @@ public class TransactionStateResolver {
      * {@link TxState#FINISHING}, it waits for actual completion instead.
      *
      * @param request Request.
+     * @param sender Sender node.
      * @return Future that should be completed with transaction state meta.
      */
-    private CompletableFuture<@Nullable TransactionMeta> 
processTxStateRequest(TxStateCoordinatorRequest request) {
+    private CompletableFuture<@Nullable TransactionMeta> processTxStateRequest(
+            TxStateCoordinatorRequest request,
+            InternalClusterNode sender
+    ) {
         clockService.updateClock(request.readTimestamp());
 
         UUID txId = request.txId();
@@ -458,31 +502,58 @@ public class TransactionStateResolver {
                 TxStateMetaFinishing txStateMetaFinishing = 
(TxStateMetaFinishing) txStateMeta;
 
                 return txStateMetaFinishing.txFinishFuture();
-            } else {
-                InternalTransaction tx = txStateMeta.tx();
+            } else if (request.readTimestamp() == null || 
request.readTimestamp().equals(HybridTimestamp.MIN_VALUE)) {
+                // If txn is in non-final state and resolution is requested by 
RW txn.
                 Long currentConsistencyToken = 
request.senderCurrentConsistencyToken();
                 ZonePartitionId groupId = request.senderGroupId() == null
                         ? null
                         : request.senderGroupId().asZonePartitionId();
 
-                if (tx != null && !tx.isReadOnly() && currentConsistencyToken 
!= null && groupId != null) {
-                    return 
txManager.checkEnlistedPartitionsAndAbortIfNeeded(txStateMeta, tx, 
currentConsistencyToken, groupId);
+                if (currentConsistencyToken != null && groupId != null) {
+                    ZonePartitionId commitPartitionId = 
txStateMeta.commitPartitionId();
+
+                    if (commitPartitionId == null) {
+                        commitPartitionId = 
request.commitPartitionId().asZonePartitionId();
+
+                        if (commitPartitionId == null) {
+                            return failedFuture(new IgniteInternalException(
+                                    INTERNAL_ERR,
+                                    format("Commit partition id is absent in 
transaction meta "
+                                            + "on coordinator [txId={}, 
txMeta={}, req={}].", txId, txStateMeta)
+                            ));
+                        }
+                    }
+
+                    String commitPartitionNode = 
commitPartitionNode(commitPartitionId);
+
+                    return txRecoveryEngine.triggerTxRecovery(txId, 
commitPartitionId, commitPartitionNode, groupId, sender.id());
                 } else {
-                    LOG.info("Failed to abort on coordinator a transaction 
that lost its primary replica's volatile state "
-                            + "[txId={}, internalTx={}, readOnly={}, 
senderCurrentConsistencyToken={}, senderGroupId={}].",
-                            txId,
-                            tx,
-                            (tx == null ? null : tx.isReadOnly()),
-                            currentConsistencyToken,
-                            groupId
-                    );
+                    return failedFuture(new IgniteInternalException(
+                            INTERNAL_ERR,
+                            format("Failed to abort on coordinator a 
transaction that lost its primary replica's volatile state "
+                                    + "[txId={}, 
senderCurrentConsistencyToken={}, senderGroupId={}].",
+                                txId,
+                                currentConsistencyToken,
+                                groupId
+                            )
+                    ));
                 }
+            } else {
+                return completedFuture(txStateMeta);
             }
+        } else {
+            return failedFuture(
+                    new IgniteInternalException(INTERNAL_ERR, 
format("Transaction meta is absent on coordinator [txId={}].", txId))
+            );
         }
+    }
 
-        LOG.info("Transaction meta is absent on coordinator [txId={}].", txId);
+    private String commitPartitionNode(ZonePartitionId commitPartitionId) {
+        ReplicaMeta replicaMeta = 
placementDriverHelper.getCurrentPrimaryReplica(commitPartitionId);
 
-        return completedFuture(txStateMeta);
+        return replicaMeta == null
+                ? localNode.get().name() // Will be resolved correctly by tx 
cleanup sender.
+                : replicaMeta.getLeaseholder();
     }
 
     private static @Nullable TransactionMetaMessage 
toTransactionMetaMessage(@Nullable TransactionMeta transactionMeta) {
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index a8e41b8e6d5..6256dd5d31f 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -410,15 +410,20 @@ public class TxCleanupRequestSender {
                             if (partitions == null) {
                                 // If we don't have any partition, which is 
the recovery or "unlock only" case,
                                 // just try again with the same node.
-                                return sendCleanupMessageWithRetries(
-                                        commitPartitionId,
-                                        commit,
-                                        commitTimestamp,
-                                        txId,
-                                        node,
-                                        partitions,
-                                        incrementTimeout(timeout),
-                                        attemptsMade + 1
+                                return scheduleRetry(
+                                        () -> sendCleanupMessageWithRetries(
+                                                commitPartitionId,
+                                                commit,
+                                                commitTimestamp,
+                                                txId,
+                                                node,
+                                                partitions,
+                                                incrementTimeout(timeout),
+                                                attemptsMade + 1
+                                        ),
+                                        timeout,
+                                        TimeUnit.MILLISECONDS,
+                                        retryExecutor
                                 );
                             }
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 0bd927e95cc..7007fb31085 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.tx.impl;
 
 import static java.lang.Math.toIntExact;
 import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -583,43 +582,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
         return txStateVolatileStorage.state(txId);
     }
 
-    @Override
-    public CompletableFuture<@Nullable TransactionMeta> 
checkEnlistedPartitionsAndAbortIfNeeded(
-            TxStateMeta txMeta,
-            InternalTransaction tx,
-            long currentEnlistmentConsistencyToken,
-            ZonePartitionId senderGroupId
-    ) {
-        PendingTxPartitionEnlistment enlistment = 
tx.enlistedPartition(senderGroupId);
-
-        // Enlistment for thin client direct request may be absent on 
coordinator.
-        if (enlistment == null || enlistment.consistencyToken() != 
currentEnlistmentConsistencyToken) {
-            // Remote partition already has different consistency token, so we 
can't commit this transaction anyway.
-            // Even when graceful primary replica switch is done, we can get 
here only if the write intent that requires resolution
-            // is not under lock.
-            // TODO https://issues.apache.org/jira/browse/IGNITE-27386 the 
reason of rollback needs to be explained.
-            return tx.rollbackAsync()
-                    .thenApply(unused -> {
-                        TxStateMeta newMeta = stateMeta(tx.id());
-
-                        assert isFinalState(newMeta.txState());
-
-                        return newMeta;
-                    });
-        }
-
-        LOG.info("Skipped aborting on coordinator a transaction that lost its 
primary replica's volatile state "
-                        + "[txId={}, internalTx={}, enlistment={}, 
senderCurrentConsistencyToken={}, txMeta={}].",
-                tx.id(),
-                tx,
-                enlistment,
-                currentEnlistmentConsistencyToken,
-                txMeta
-        );
-
-        return completedFuture(txMeta);
-    }
-
     @TestOnly
     public Collection<TxStateMeta> states() {
         return txStateVolatileStorage.states();
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index 12f4b93a6f9..b36af61a5b5 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -225,6 +225,7 @@ public class TxMessageSender {
      */
     public CompletableFuture<TxStateResponse> resolveTxStateFromCoordinator(
             InternalClusterNode coordinatorClusterNode,
+            ZonePartitionId commitGrpId,
             UUID txId,
             HybridTimestamp timestamp,
             @Nullable Long senderCurrentConsistencyToken,
@@ -237,6 +238,7 @@ public class TxMessageSender {
                                 .txId(txId)
                                 
.senderCurrentConsistencyToken(senderCurrentConsistencyToken)
                                 
.senderGroupId(toZonePartitionIdMessageNullable(REPLICA_MESSAGES_FACTORY, 
senderGroupId))
+                                
.commitPartitionId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, 
commitGrpId))
                                 .build(),
                         RPC_TIMEOUT_MILLIS)
                 .thenApply(resp -> {
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
new file mode 100644
index 00000000000..a5f2ff0f362
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.FINISHING;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
+import org.apache.ignite.internal.network.ClusterNodeResolver;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TransactionInternalException;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.TxStateMetaFinishing;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Transaction recovery logic.
+ */
+public class TxRecoveryEngine {
+    private final TxManager txManager;
+    private final ClusterNodeResolver clusterNodeResolver;
+
+    /** Constructor. */
+    public TxRecoveryEngine(
+            TxManager txManager,
+            ClusterNodeResolver clusterNodeResolver
+    ) {
+        this.txManager = txManager;
+        this.clusterNodeResolver = clusterNodeResolver;
+    }
+
+    /**
+     * Abort the abandoned transaction.
+     *
+     * @param txId Transaction id.
+     * @param commitPartitionId Commit partition id.
+     * @param commitPartitionNode Commit partition node consistent id.
+     * @param senderGroupId Sender group id.
+     * @param senderId Sender ephemeral id.
+     */
+    public CompletableFuture<TransactionMeta> triggerTxRecovery(
+            UUID txId,
+            ZonePartitionId commitPartitionId,
+            String commitPartitionNode,
+            @Nullable ZonePartitionId senderGroupId,
+            @Nullable UUID senderId
+    ) {
+        // Should not be empty.
+        Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedGroupsMap = 
enlistedGroupsMap(
+                commitPartitionId,
+                commitPartitionNode,
+                senderGroupId,
+                senderId == null ? null : clusterNodeResolver.getById(senderId)
+        );
+
+        // If the transaction state is pending, then the transaction should be 
rolled back,
+        // meaning that the state is changed to aborted and a corresponding 
cleanup request
+        // is sent in a common durable manner to a partition that has 
initiated recovery.
+        // TODO https://issues.apache.org/jira/browse/IGNITE-27386 the reason 
of rollback needs to be explained.
+        return txManager.finish(
+                        HybridTimestampTracker.emptyTracker(),
+                        // Tx recovery is executed on the commit partition.
+                        commitPartitionId,
+                        false,
+                        false,
+                        true,
+                        false,
+                        enlistedGroupsMap,
+                        txId
+                )
+                .handle((v, ex) -> {
+                    TransactionMeta txStateMeta = txManager.stateMeta(txId);
+                    CompletableFuture<TransactionMeta> res;
+
+                    if (txStateMeta != null) {
+                        if (isFinalState(txStateMeta.txState())) {
+                            res = completedFuture(txStateMeta);
+                        } else if (txStateMeta.txState() == FINISHING) {
+                            res = ((TxStateMetaFinishing) 
txStateMeta).txFinishFuture();
+                        } else {
+                            res = failedFuture(new 
TransactionInternalException(TX_ROLLBACK_ERR, format("Unexpected transaction "
+                                    + "state after recovery [txId={}, 
txMeta={}].", txId, txStateMeta)));
+                        }
+                    } else {
+                        if (ex == null) {
+                            res = 
completedFuture(TxStateMeta.builder(ABORTED).build());
+                        } else {
+                            sneakyThrow(ex);
+
+                            res = nullCompletedFuture();
+                        }
+                    }
+
+                    return res;
+                })
+                .thenCompose(Function.identity())
+                .whenComplete((v, ex) -> {
+                    runCleanupOnNode(commitPartitionId, txId, 
commitPartitionNode);
+
+                    if (senderGroupId != null && senderId != null) {
+                        runCleanupOnNode(senderGroupId, txId, senderId);
+                    }
+                });
+    }
+
+    private static Map<ZonePartitionId, PendingTxPartitionEnlistment> 
enlistedGroupsMap(
+            ZonePartitionId commitPartitionId,
+            String commitPartitionNode,
+            @Nullable ZonePartitionId senderGroupId,
+            @Nullable InternalClusterNode senderNode
+    ) {
+        if (senderGroupId == null || senderNode == null || 
commitPartitionId.equals(senderGroupId)) {
+            return Map.of(commitPartitionId, 
createAbandonedTxRecoveryEnlistment(commitPartitionNode));
+        } else {
+            return Map.of(
+                    commitPartitionId, 
createAbandonedTxRecoveryEnlistment(commitPartitionNode),
+                    senderGroupId, 
createAbandonedTxRecoveryEnlistment(senderNode.name())
+            );
+        }
+    }
+
+    private static PendingTxPartitionEnlistment 
createAbandonedTxRecoveryEnlistment(String nodeName) {
+        // Enlistment consistency token is not required for the rollback, so 
it is 0L.
+        // Passing an empty set of table IDs as we don't know which tables 
were enlisted; this is ok as the corresponding write intents
+        // can still be resolved later when reads stumble upon them.
+        return new PendingTxPartitionEnlistment(nodeName, 0L);
+    }
+
+    /**
+     * Run cleanup on a node.
+     *
+     * @param groupId Group id.
+     * @param txId Transaction id.
+     * @param nodeId Node id (inconsistent).
+     */
+    public CompletableFuture<Void> runCleanupOnNode(ZonePartitionId groupId, 
UUID txId, UUID nodeId) {
+        // Get node id of the sender to send back cleanup requests.
+        String nodeConsistentId = 
clusterNodeResolver.getConsistentIdById(nodeId);
+
+        return nodeConsistentId == null ? nullCompletedFuture() : 
runCleanupOnNode(groupId, txId, nodeConsistentId);
+    }
+
+    /**
+     * Run cleanup on a node.
+     *
+     * @param commitPartitionId Commit partition id.
+     * @param txId Transaction id.
+     * @param nodeName Node consistent id.
+     */
+    private CompletableFuture<Void> runCleanupOnNode(ZonePartitionId 
commitPartitionId, UUID txId, String nodeName) {
+        return txManager.cleanup(commitPartitionId, nodeName, txId);
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
index b13589a4c29..bba902bed2d 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
@@ -48,4 +48,11 @@ public interface TxStateCoordinatorRequest extends 
NetworkMessage {
      * @return Group id of the sender partition, or {@code null} if the 
request is not caused by write intent resolution.
      */
     @Nullable ZonePartitionIdMessage senderGroupId();
+
+    /**
+     * It's needed to be able to abort transaction in case when commit 
partition id is lost on coordinator.
+     *
+     * @return Commit group id.
+     */
+    @Nullable ZonePartitionIdMessage commitPartitionId();
 }
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionStateResolverTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionStateResolverTest.java
index 6cdbd9324d3..f1632b1241a 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionStateResolverTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionStateResolverTest.java
@@ -27,6 +27,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -56,6 +57,7 @@ import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.message.TxStateResponse;
+import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -103,7 +105,10 @@ public class TransactionStateResolverTest extends 
BaseIgniteAbstractTest {
                 clusterNodeResolver,
                 messagingService,
                 placementDriver,
-                txMessageSender
+                txMessageSender,
+                new TxRecoveryEngine(txManager, 
mock(ClusterNodeResolver.class)),
+                new Lazy<>(() -> mock(InternalClusterNode.class)),
+                Runnable::run
         );
 
         // Setup default mock for PlacementDriver to avoid timeouts.
@@ -135,6 +140,7 @@ public class TransactionStateResolverTest extends 
BaseIgniteAbstractTest {
 
         when(txMessageSender.resolveTxStateFromCoordinator(
                     any(InternalClusterNode.class),
+                    eq(commitPartitionId),
                     eq(txId),
                     any(HybridTimestamp.class),
                     any(Long.class),
@@ -167,6 +173,7 @@ public class TransactionStateResolverTest extends 
BaseIgniteAbstractTest {
         ArgumentCaptor<InternalClusterNode> nodeCaptor = 
ArgumentCaptor.forClass(InternalClusterNode.class);
         verify(txMessageSender).resolveTxStateFromCoordinator(
                 nodeCaptor.capture(),
+                eq(commitPartitionId),
                 eq(txId),
                 any(HybridTimestamp.class),
                 any(Long.class),
@@ -201,6 +208,7 @@ public class TransactionStateResolverTest extends 
BaseIgniteAbstractTest {
 
         when(txMessageSender.resolveTxStateFromCoordinator(
                     any(InternalClusterNode.class),
+                    eq(commitPartitionId),
                     eq(txId),
                     any(HybridTimestamp.class),
                     any(Long.class),
@@ -271,6 +279,7 @@ public class TransactionStateResolverTest extends 
BaseIgniteAbstractTest {
 
         when(txMessageSender.resolveTxStateFromCoordinator(
                     any(InternalClusterNode.class),
+                    eq(commitPartitionId),
                     eq(txId),
                     any(HybridTimestamp.class),
                     any(Long.class),
@@ -292,6 +301,7 @@ public class TransactionStateResolverTest extends 
BaseIgniteAbstractTest {
         ArgumentCaptor<InternalClusterNode> nodeCaptor = 
ArgumentCaptor.forClass(InternalClusterNode.class);
         verify(txMessageSender).resolveTxStateFromCoordinator(
                 nodeCaptor.capture(),
+                eq(commitPartitionId),
                 eq(txId),
                 any(HybridTimestamp.class),
                 any(Long.class),


Reply via email to