This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6acdc63bde IGNITE-20034 Implemented writeIntentResolution coordinator 
path (#2585)
6acdc63bde is described below

commit 6acdc63bde9ba55d0b20c17cfc7c6d9ab995bf73
Author: Denis Chudov <moongll...@gmail.com>
AuthorDate: Mon Sep 18 20:24:39 2023 +0400

    IGNITE-20034 Implemented writeIntentResolution coordinator path (#2585)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   3 +
 .../org/apache/ignite/network/TopologyService.java |   8 +
 .../internal/network/file/TestTopologyService.java |   5 +
 .../scalecube/ScaleCubeTopologyService.java        |   6 +
 .../engine/framework/ClusterServiceFactory.java    |   5 +
 .../distributed/ItTxDistributedTestSingleNode.java |   6 +
 .../ignite/distributed/ItTxStateLocalMapTest.java  |   6 +-
 .../internal/table/distributed/TableManager.java   |  11 +-
 .../distributed/replicator/LeaderOrTxState.java    |  14 +-
 .../replicator/PartitionReplicaListener.java       | 118 +++------
 .../replicator/TransactionStateResolver.java       | 268 ++++++++++++++++++++-
 .../apache/ignite/internal/table/TxLocalTest.java  |  52 +---
 .../replication/PartitionReplicaListenerTest.java  |  55 +++--
 .../apache/ignite/distributed/ItTxTestCluster.java |  96 ++++++--
 .../ignite/internal/table/TxAbstractTest.java      |  38 ++-
 .../internal/tx/TransactionAbandonedException.java |  54 +++++
 ...ateReplicaRequest.java => TransactionMeta.java} |  24 +-
 .../java/org/apache/ignite/internal/tx/TxMeta.java |   5 +-
 .../org/apache/ignite/internal/tx/TxState.java     |  18 +-
 .../org/apache/ignite/internal/tx/TxStateMeta.java |  12 +-
 .../ignite/internal/tx/TxStateMetaFinishing.java   |  56 +++++
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  54 ++++-
 .../ignite/internal/tx/message/TxMessageGroup.java |  14 +-
 ...est.java => TxStateCommitPartitionRequest.java} |  13 +-
 ...Request.java => TxStateCoordinatorRequest.java} |   6 +-
 ...ateReplicaRequest.java => TxStateResponse.java} |  25 +-
 26 files changed, 713 insertions(+), 259 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 97ce2554e5..bf28673f7e 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -332,6 +332,9 @@ public class ErrorGroups {
 
         /** Failure due to an incompatible schema change. */
         public static final int TX_INCOMPATIBLE_SCHEMA_ERR = 
TX_ERR_GROUP.registerErrorCode((short) 12);
+
+        /** Failure due to an abandoned transaction. */
+        public static final int TX_ABANDONED_ERR = 
TX_ERR_GROUP.registerErrorCode((short) 13);
     }
 
     /** Replicator error group. */
diff --git 
a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java 
b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
index da1c3f7ca3..13b647e92d 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
@@ -61,4 +61,12 @@ public interface TopologyService {
      * @return The node object; {@code null} if the node has not been 
discovered or is offline.
      */
     @Nullable ClusterNode getByConsistentId(String consistentId);
+
+    /**
+     * Returns a cluster node specified by its ID.
+     *
+     * @param id Node ID.
+     * @return The node object; {@code null} if the node has not been 
discovered or is offline.
+     */
+    @Nullable ClusterNode getById(String id);
 }
diff --git 
a/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
 
b/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
index a98a849a7e..07e4b95ff7 100644
--- 
a/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
+++ 
b/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
@@ -48,6 +48,11 @@ public class TestTopologyService extends 
AbstractTopologyService {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public @Nullable ClusterNode getById(String id) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Calls {@link TopologyEventHandler#onAppeared(ClusterNode)} on all 
registered event handlers.
      *
diff --git 
a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
 
b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index 8d11ac34d2..9c23f993c5 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -182,6 +182,12 @@ final class ScaleCubeTopologyService extends 
AbstractTopologyService {
         return consistentIdToMemberMap.get(consistentId);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable ClusterNode getById(String id) {
+        return consistentIdToMemberMap.values().stream().filter(member -> 
member.id().equals(id)).findFirst().orElse(null);
+    }
+
     /**
      * Converts the given {@link Member} to a {@link ClusterNode}.
      *
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
index 7cd29156c7..9e04a26b0c 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
@@ -160,6 +160,11 @@ public class ClusterServiceFactory {
         public @Nullable ClusterNode getByConsistentId(String consistentId) {
             return allMembers.get(consistentId);
         }
+
+        @Override
+        public @Nullable ClusterNode getById(String id) {
+            return allMembers.values().stream().filter(member -> 
member.id().equals(id)).findFirst().orElse(null);
+        }
     }
 
     private class LocalMessagingService extends AbstractMessagingService {
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 05afbc7918..557867356e 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Collection;
 import java.util.Map;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -224,6 +225,11 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
         return true;
     }
 
+    @Override
+    protected Collection<TxManager> txManagers() {
+        return txTestCluster.txManagers.values();
+    }
+
     @Test
     public void testIgniteTransactionsAndReadTimestamp() {
         Transaction readWriteTx = igniteTransactions.begin();
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
index 702401d28e..3d8591000f 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
@@ -151,7 +151,11 @@ public class ItTxStateLocalMapTest extends 
IgniteAbstractTest {
 
         checkLocalTxStateOnNodes(
                 tx.id(),
-                new TxStateMeta(commit ? COMMITED : ABORTED, coordinatorId, 
commit ? testCluster.clocks.get(coord.name()).now() : null)
+                new TxStateMeta(
+                        commit ? COMMITED : ABORTED,
+                        coordinatorId,
+                        commit ? testCluster.clocks.get(coord.name()).now() : 
null
+                )
         );
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index aee43081ab..8635c3cd73 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -432,7 +432,14 @@ public class TableManager extends 
AbstractEventProducer<TableEvent, TableEventPa
 
         clusterNodeResolver = topologyService::getByConsistentId;
 
-        transactionStateResolver = new TransactionStateResolver(replicaSvc, 
clusterNodeResolver);
+        transactionStateResolver = new TransactionStateResolver(
+                replicaSvc,
+                txManager,
+                clock,
+                clusterNodeResolver,
+                topologyService::getById,
+                clusterService.messagingService()
+        );
 
         tablesByIdVv = new IncrementalVersionedValue<>(registry, HashMap::new);
 
@@ -495,6 +502,8 @@ public class TableManager extends 
AbstractEventProducer<TableEvent, TableEventPa
 
             lowWatermark.start();
 
+            transactionStateResolver.start();
+
             CompletableFuture<Long> recoveryFinishFuture = 
metaStorageMgr.recoveryFinishedFuture();
 
             assert recoveryFinishFuture.isDone();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
index a37a7b512f..b838156230 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/LeaderOrTxState.java
@@ -18,13 +18,13 @@
 package org.apache.ignite.internal.table.distributed.replicator;
 
 import java.io.Serializable;
-import org.apache.ignite.internal.tx.TxMeta;
-import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Response for the {@link TxStateReplicaRequest}. Can contain either the 
consistent ID of the Partition Group leader, which should be
- * queried for the TX Meta, or the TX Meta itself.
+ * Response for the {@link TxStateCommitPartitionRequest}. Can contain either 
the consistent ID of the Partition Group leader, which should
+ * be queried for the TX Meta, or the TX Meta itself.
  */
 public class LeaderOrTxState implements Serializable {
     private static final long serialVersionUID = -3555591755828355117L;
@@ -33,7 +33,7 @@ public class LeaderOrTxState implements Serializable {
     private final String leaderName;
 
     @Nullable
-    private final TxMeta txMeta;
+    private final TransactionMeta txMeta;
 
     /**
      * Creates a response.
@@ -41,7 +41,7 @@ public class LeaderOrTxState implements Serializable {
      * @param leaderName Leader consistent ID.
      * @param txMeta TX meta.
      */
-    public LeaderOrTxState(@Nullable String leaderName, @Nullable TxMeta 
txMeta) {
+    public LeaderOrTxState(@Nullable String leaderName, @Nullable 
TransactionMeta txMeta) {
         this.leaderName = leaderName;
         this.txMeta = txMeta;
     }
@@ -50,7 +50,7 @@ public class LeaderOrTxState implements Serializable {
         return leaderName;
     }
 
-    public @Nullable TxMeta txMeta() {
+    public @Nullable TransactionMeta txMeta() {
         return txMeta;
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 42e3e3d4c7..0197aa61cf 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -25,6 +25,7 @@ import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_DROP;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static org.apache.ignite.internal.tx.TxState.ABANDONED;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITED;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
@@ -132,14 +133,14 @@ import org.apache.ignite.internal.tx.Lock;
 import org.apache.ignite.internal.tx.LockKey;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.LockMode;
+import org.apache.ignite.internal.tx.TransactionAbandonedException;
+import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
-import org.apache.ignite.internal.tx.message.TxMessagesFactory;
-import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.CursorUtils;
@@ -168,9 +169,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /** Factory for creating replica command messages. */
     private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
 
-    /** Tx messages factory. */
-    private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
-
     /** Replication group id. */
     private final TablePartitionId replicationGroupId;
 
@@ -216,12 +214,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /** Runs async scan tasks for effective tail recursion execution (avoid 
deep recursive calls). */
     private final Executor scanRequestExecutor;
 
-    /**
-     * Map to control clock's update in the read only transactions 
concurrently with a commit timestamp.
-     * TODO: IGNITE-20034 review this after the commit timestamp will be 
provided from a commit request (request.commitTimestamp()).
-     */
-    private final ConcurrentHashMap<UUID, CompletableFuture<TxMeta>> 
txTimestampUpdateMap = new ConcurrentHashMap<>();
-
     private final Supplier<Map<Integer, IndexLocker>> indexesLockers;
 
     private final ConcurrentMap<UUID, TxCleanupReadyFutureList> 
txCleanupReadyFutures = new ConcurrentHashMap<>();
@@ -336,8 +328,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     @Override
     public CompletableFuture<?> invoke(ReplicaRequest request, String 
senderId) {
-        if (request instanceof TxStateReplicaRequest) {
-            return processTxStateReplicaRequest((TxStateReplicaRequest) 
request);
+        if (request instanceof TxStateCommitPartitionRequest) {
+            return 
processTxStateCommitPartitionRequest((TxStateCommitPartitionRequest) request);
         }
 
         return ensureReplicaIsPrimary(request).thenCompose(isPrimary -> 
processRequest(request, isPrimary, senderId));
@@ -424,49 +416,23 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param request Transaction state request.
      * @return Result future.
      */
-    private CompletableFuture<LeaderOrTxState> 
processTxStateReplicaRequest(TxStateReplicaRequest request) {
+    private CompletableFuture<LeaderOrTxState> 
processTxStateCommitPartitionRequest(TxStateCommitPartitionRequest request) {
         return placementDriver.getPrimaryReplica(replicationGroupId, 
hybridClock.now())
                 .thenCompose(primaryReplica -> {
                     if (isLocalPeer(primaryReplica.getLeaseholder())) {
-                        CompletableFuture<TxMeta> txStateFut = 
getTxStateConcurrently(request);
+                        TransactionMeta txMeta = 
txManager.stateMeta(request.txId());
+
+                        if (txMeta == null) {
+                            txMeta = txStateStorage.get(request.txId());
+                        }
 
-                        return txStateFut.thenApply(txMeta -> new 
LeaderOrTxState(null, txMeta));
+                        return completedFuture(new LeaderOrTxState(null, 
txMeta));
                     } else {
                         return completedFuture(new 
LeaderOrTxState(primaryReplica.getLeaseholder(), null));
                     }
                 });
     }
 
-    /**
-     * Gets a transaction state or {@code null}, if the transaction is not 
completed.
-     *
-     * @param txStateReq Transaction state request.
-     * @return Future to transaction state meta or {@code null}.
-     */
-    private CompletableFuture<TxMeta> 
getTxStateConcurrently(TxStateReplicaRequest txStateReq) {
-        //TODO: IGNITE-20034 review this after the commit timestamp will be 
provided from a commit request (request.commitTimestamp()).
-        CompletableFuture<TxMeta> txStateFut = new CompletableFuture<>();
-
-        txTimestampUpdateMap.compute(txStateReq.txId(), (uuid, fut) -> {
-            if (fut != null) {
-                fut.thenAccept(txStateFut::complete);
-            } else {
-                TxMeta txMeta = txStateStorage.get(txStateReq.txId());
-
-                if (txMeta == null) {
-                    // All future transactions will be committed after the 
resolution processed.
-                    hybridClock.update(txStateReq.readTimestamp());
-                }
-
-                txStateFut.complete(txMeta);
-            }
-
-            return null;
-        });
-
-        return txStateFut;
-    }
-
     /**
      * Processes retrieve batch for read only transaction.
      *
@@ -1197,9 +1163,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             UUID txId,
             String txCoordinatorId
     ) {
-        CompletableFuture<?> changeStateFuture = 
finishTransaction(aggregatedGroupIds, txId, commit, txCoordinatorId);
+        HybridTimestamp commitTimestamp = request.commitTimestamp();
+
+        CompletableFuture<?> changeStateFuture = 
finishTransaction(aggregatedGroupIds, txId, commit, commitTimestamp, 
txCoordinatorId);
 
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17578 Cleanup 
process should be asynchronous.
         CompletableFuture<?>[] cleanupFutures = new 
CompletableFuture[request.groups().size()];
         AtomicInteger cleanupFuturesCnt = new AtomicInteger(0);
 
@@ -1225,6 +1192,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param aggregatedGroupIds Partition identifies which are enlisted in 
the transaction.
      * @param txId Transaction id.
      * @param commit True is the transaction is committed, false otherwise.
+     * @param commitTimestamp Commit timestamp, if applicable.
      * @param txCoordinatorId Transaction coordinator id.
      * @return Future to wait of the finish.
      */
@@ -1232,15 +1200,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             List<TablePartitionId> aggregatedGroupIds,
             UUID txId,
             boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
             String txCoordinatorId
     ) {
-        // TODO: IGNITE-20034 Timestamp from request is not using until the 
issue has not been fixed (request.commitTimestamp())
-        var fut = new CompletableFuture<TxMeta>();
-
-        txTimestampUpdateMap.put(txId, fut);
-
         HybridTimestamp currentTimestamp = hybridClock.now();
-        HybridTimestamp commitTimestamp = commit ? currentTimestamp : null;
 
         return reliableCatalogVersionFor(currentTimestamp)
                 .thenApply(catalogVersion -> {
@@ -1266,11 +1229,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 .whenComplete((o, throwable) -> {
                     TxState txState = commit ? COMMITED : ABORTED;
 
-                    fut.complete(new TxMeta(txState, aggregatedGroupIds, 
commitTimestamp));
-
                     markFinished(txId, txState, commitTimestamp);
-
-                    txTimestampUpdateMap.remove(txId);
                 });
     }
 
@@ -1537,8 +1496,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 checkWriteIntentsBelongSameTx(writeIntents);
 
                 return resolveTxState(
-                        new TablePartitionId(writeIntent.commitTableId(), 
writeIntent.commitPartitionId()),
                         writeIntent.transactionId(),
+                        new TablePartitionId(writeIntent.commitTableId(), 
writeIntent.commitPartitionId()),
                         ts)
                         .thenApply(readLastCommitted -> {
                             if (readLastCommitted) {
@@ -2508,8 +2467,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             Supplier<BinaryRow> lastCommitted
     ) {
         return resolveTxState(
-                new TablePartitionId(readResult.commitTableId(), 
readResult.commitPartitionId()),
                 readResult.transactionId(),
+                new TablePartitionId(readResult.commitTableId(), 
readResult.commitPartitionId()),
                 timestamp
         ).thenApply(readLastCommitted -> {
             if (readLastCommitted) {
@@ -2523,34 +2482,33 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /**
      * Resolve the actual tx state.
      *
-     * @param commitGrpId Commit partition id.
      * @param txId Transaction id.
+     * @param commitGrpId Commit partition id.
      * @param timestamp Timestamp.
      * @return The future completes with true when the transaction is not 
completed yet and false otherwise.
      */
     private CompletableFuture<Boolean> resolveTxState(
-            TablePartitionId commitGrpId,
             UUID txId,
+            TablePartitionId commitGrpId,
             HybridTimestamp timestamp
     ) {
-        boolean readLatest = timestamp == null;
-
-        return transactionStateResolver.sendMetaRequest(commitGrpId, 
FACTORY.txStateReplicaRequest()
-                        .groupId(commitGrpId)
-                        .readTimestampLong((readLatest ? 
HybridTimestamp.MIN_VALUE : timestamp).longValue())
-                        .txId(txId)
-                        .build())
-                .thenApply(txMeta -> {
-                    if (txMeta == null) {
-                        return true;
-                    } else if (txMeta.txState() == COMMITED) {
-                        return !readLatest && 
txMeta.commitTimestamp().compareTo(timestamp) > 0;
-                    } else {
-                        assert txMeta.txState() == ABORTED : "Unexpected 
transaction state [state=" + txMeta.txState() + ']';
+        return transactionStateResolver.resolveTxState(txId, commitGrpId, 
timestamp).thenApply(txMeta -> {
+            if (txMeta == null || txMeta.txState() == ABANDONED) {
+                // TODO https://issues.apache.org/jira/browse/IGNITE-20427 
make the null value returned from commit partition
+                // TODO more determined
+                throw new TransactionAbandonedException(txId, txMeta);
+            } else if (txMeta.txState() == COMMITED) {
+                boolean readLatest = timestamp == null;
+
+                return !readLatest && 
txMeta.commitTimestamp().compareTo(timestamp) > 0;
+            } else if (txMeta.txState() == ABORTED) {
+                return true;
+            } else {
+                assert txMeta.txState() == PENDING : "Unexpected transaction 
state [state=" + txMeta.txState() + ']';
 
-                        return true;
-                    }
-                });
+                return true;
+            }
+        });
     }
 
     private CompletableFuture<Void> validateAtTimestamp(UUID txId) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
index 288be7b281..2b95a7e3fa 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
@@ -17,24 +17,49 @@
 
 package org.apache.ignite.internal.table.distributed.replicator;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.tx.TxState.ABANDONED;
+import static org.apache.ignite.internal.tx.TxState.FINISHING;
+import static org.apache.ignite.internal.tx.TxState.PENDING;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+
 import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.tx.TxMeta;
-import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.TxStateMetaFinishing;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
+import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
+import org.apache.ignite.internal.tx.message.TxStateResponse;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkMessage;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Helper class that allows to resolve transaction state.
+ * Placement driver.
  */
 public class TransactionStateResolver {
+    /** Tx messages factory. */
+    private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+
+    /** Network timeout. */
+    private static final long RPC_TIMEOUT = 3000;
+
     /** Assignment node names per replication group. */
     private final Map<ReplicationGroupId, LinkedHashSet<String>> 
primaryReplicaMapping = new ConcurrentHashMap<>();
 
@@ -44,29 +69,200 @@ public class TransactionStateResolver {
     /** Function that resolves a node consistent ID to a cluster node. */
     private final Function<String, ClusterNode> clusterNodeResolver;
 
+    // TODO https://issues.apache.org/jira/browse/IGNITE-20408 after this 
ticket this resolver will be no longer needed, as
+    // TODO we will store coordinator as ClusterNode in local tx state map.
+    /** Function that resolves a node non-consistent ID to a cluster node. */
+    private final Function<String, ClusterNode> clusterNodeResolverById;
+
+    private final Map<UUID, CompletableFuture<TransactionMeta>> txStateFutures 
= new ConcurrentHashMap<>();
+
+    private final TxManager txManager;
+
+    private final HybridClock clock;
+
+    private final MessagingService messagingService;
+
     /**
      * The constructor.
      *
      * @param replicaService Replication service.
+     * @param txManager Transaction manager.
+     * @param clock Node clock.
+     * @param clusterNodeResolver Cluster node resolver.
+     * @param clusterNodeResolverById Cluster node resolver using 
non-consistent id.
+     * @param messagingService Messaging service.
      */
-    public TransactionStateResolver(ReplicaService replicaService, 
Function<String, ClusterNode> clusterNodeResolver) {
+    public TransactionStateResolver(
+            ReplicaService replicaService,
+            TxManager txManager,
+            HybridClock clock,
+            Function<String, ClusterNode> clusterNodeResolver,
+            Function<String, ClusterNode> clusterNodeResolverById,
+            MessagingService messagingService
+    ) {
         this.replicaService = replicaService;
+        this.txManager = txManager;
+        this.clock = clock;
         this.clusterNodeResolver = clusterNodeResolver;
+        this.clusterNodeResolverById = clusterNodeResolverById;
+        this.messagingService = messagingService;
+    }
+
+    /**
+     * This should be called in order to allow the transaction state resolver 
to listen to {@link TxStateCoordinatorRequest} messages.
+     */
+    public void start() {
+        messagingService.addMessageHandler(TxMessageGroup.class, (msg, sender, 
correlationId) -> {
+            if (msg instanceof TxStateCoordinatorRequest) {
+                TxStateCoordinatorRequest req = (TxStateCoordinatorRequest) 
msg;
+
+                processTxStateRequest(req)
+                        .thenAccept(txStateMeta -> {
+                            NetworkMessage response = FACTORY.txStateResponse()
+                                    .txStateMeta(txStateMeta)
+                                    .timestampLong(clock.nowLong())
+                                    .build();
+
+                            messagingService.respond(sender, response, 
correlationId);
+                        });
+            }
+        });
     }
 
     /**
-     * Sends a transaction state request to the primary replica.
+     * Resolves transaction state locally, if possible, or distributively, if 
needed.
      *
-     * @param replicaGrp Replication group id.
-     * @param request Status request.
-     * @return Result future.
+     * @param txId Transaction id.
+     * @param commitGrpId Commit partition group id.
+     * @param timestamp Timestamp.
+     * @return Future with the transaction state meta as a result.
      */
-    public CompletableFuture<TxMeta> sendMetaRequest(ReplicationGroupId 
replicaGrp, TxStateReplicaRequest request) {
-        CompletableFuture<TxMeta> resFut = new CompletableFuture<>();
+    public CompletableFuture<TransactionMeta> resolveTxState(
+            UUID txId,
+            ReplicationGroupId commitGrpId,
+            HybridTimestamp timestamp
+    ) {
+        TxStateMeta localMeta = txManager.stateMeta(txId);
+
+        if (localMeta != null && isFinalState(localMeta.txState())) {
+            return completedFuture(localMeta);
+        }
 
-        sendAndRetry(resFut, replicaGrp, request);
+        CompletableFuture<TransactionMeta> future = 
txStateFutures.compute(txId, (k, v) -> {
+            if (v == null) {
+                v = new CompletableFuture<>();
 
-        return resFut;
+                resolveDistributiveTxState(txId, localMeta, commitGrpId, 
timestamp, v);
+            }
+
+            return v;
+        });
+
+        future.whenComplete((v, e) -> txStateFutures.remove(txId));
+
+        return future;
+    }
+
+    /**
+     * Resolve the transaction state distributively. This method doesn't 
process final tx states.
+     *
+     * @param txId Transaction id.
+     * @param localMeta Local tx meta.
+     * @param commitGrpId Commit partition group id.
+     * @param timestamp Timestamp to pass to target node.
+     * @param txMetaFuture Tx meta future to complete with the result.
+     */
+    private void resolveDistributiveTxState(
+            UUID txId,
+            @Nullable TxStateMeta localMeta,
+            ReplicationGroupId commitGrpId,
+            HybridTimestamp timestamp,
+            CompletableFuture<TransactionMeta> txMetaFuture
+    ) {
+        assert localMeta == null || !isFinalState(localMeta.txState()) : 
"Unexpected tx meta [txId" + txId + ", meta=" + localMeta + ']';
+
+        HybridTimestamp timestamp0 = timestamp == null ? 
HybridTimestamp.MIN_VALUE : timestamp;
+
+        if (localMeta == null) {
+            // Fallback to commit partition path, because we don't have 
coordinator id.
+            resolveTxStateFromCommitPartition(txId, commitGrpId, txMetaFuture);
+        } else if (localMeta.txState() == PENDING) {
+            resolveTxStateFromTxCoordinator(txId, localMeta.txCoordinatorId(), 
commitGrpId, timestamp0, txMetaFuture);
+        } else if (localMeta.txState() == FINISHING) {
+            assert localMeta instanceof TxStateMetaFinishing;
+
+            ((TxStateMetaFinishing) 
localMeta).txFinishFuture().whenComplete((v, e) -> {
+                if (e == null) {
+                    txMetaFuture.complete(v);
+                } else {
+                    txMetaFuture.completeExceptionally(e);
+                }
+            });
+        } else {
+            assert localMeta.txState() == ABANDONED : "Unexpected transaction 
state [txId=" + txId + ", txStateMeta=" + localMeta + ']';
+
+            txMetaFuture.complete(localMeta);
+        }
+    }
+
+    private void resolveTxStateFromTxCoordinator(
+            UUID txId,
+            String coordinatorId,
+            ReplicationGroupId commitGrpId,
+            HybridTimestamp timestamp,
+            CompletableFuture<TransactionMeta> txMetaFuture
+    ) {
+        updateLocalTxMapAfterDistributedStateResolved(txId, txMetaFuture);
+
+        ClusterNode coordinator = clusterNodeResolverById.apply(coordinatorId);
+
+        if (coordinator == null) {
+            // This means the coordinator node have either left the cluster or 
restarted.
+            resolveTxStateFromCommitPartition(txId, commitGrpId, txMetaFuture);
+        } else {
+            CompletableFuture<TransactionMeta> coordinatorTxMetaFuture = new 
CompletableFuture<>();
+
+            coordinatorTxMetaFuture.whenComplete((v, e) -> {
+                assert v != null : "Unexpected result from transaction 
coordinator: unknown transaction state [txId=" + txId
+                        + ", transactionCoordinator=" + coordinator + ']';
+
+                if (e == null) {
+                    txMetaFuture.complete(v);
+                } else {
+                    txMetaFuture.completeExceptionally(e);
+                }
+            });
+
+            TxStateCoordinatorRequest request = 
FACTORY.txStateCoordinatorRequest()
+                    .readTimestampLong(timestamp.longValue())
+                    .txId(txId)
+                    .build();
+
+            sendAndRetry(coordinatorTxMetaFuture, coordinator, request);
+        }
+    }
+
+    private void resolveTxStateFromCommitPartition(
+            UUID txId,
+            ReplicationGroupId commitGrpId,
+            CompletableFuture<TransactionMeta> txMetaFuture
+    ) {
+        TxStateCommitPartitionRequest request = 
FACTORY.txStateCommitPartitionRequest()
+                .groupId(commitGrpId)
+                .txId(txId)
+                .build();
+
+        updateLocalTxMapAfterDistributedStateResolved(txId, txMetaFuture);
+
+        sendAndRetry(txMetaFuture, commitGrpId, request);
+    }
+
+    private void updateLocalTxMapAfterDistributedStateResolved(UUID txId, 
CompletableFuture<TransactionMeta> future) {
+        future.thenAccept(txMeta -> {
+            if (txMeta != null && txMeta instanceof TxStateMeta) {
+                txManager.updateTxMeta(txId, old -> (TxStateMeta) txMeta);
+            }
+        });
     }
 
     /**
@@ -87,7 +283,11 @@ public class TransactionStateResolver {
      * @param replicaGrp Replication group id.
      * @param request Request.
      */
-    private void sendAndRetry(CompletableFuture<TxMeta> resFut, 
ReplicationGroupId replicaGrp, TxStateReplicaRequest request) {
+    private void sendAndRetry(
+            CompletableFuture<TransactionMeta> resFut,
+            ReplicationGroupId replicaGrp,
+            TxStateCommitPartitionRequest request
+    ) {
         ClusterNode nodeToSend = primaryReplicaMapping.get(replicaGrp).stream()
                 .map(clusterNodeResolver)
                 .filter(Objects::nonNull)
@@ -115,4 +315,46 @@ public class TransactionStateResolver {
             }
         });
     }
+
+    /**
+     * Tries to send a request to the given node.
+     *
+     * @param resFut Response future.
+     * @param node Node to send to.
+     * @param request Request.
+     */
+    private void sendAndRetry(CompletableFuture<TransactionMeta> resFut, 
ClusterNode node, TxStateCoordinatorRequest request) {
+        messagingService.invoke(node, request, RPC_TIMEOUT).thenAccept(resp -> 
{
+            assert resp instanceof TxStateResponse : "Unsupported response 
type [type=" + resp.getClass().getSimpleName() + ']';
+
+            TxStateResponse response = (TxStateResponse) resp;
+
+            resFut.complete(response.txStateMeta());
+        });
+    }
+
+    /**
+     * Processes the transaction state requests that are used for coordinator 
path based write intent resolution. Can't return
+     * {@link org.apache.ignite.internal.tx.TxState#FINISHING}, it waits for 
actual completion instead.
+     *
+     * @param request Request.
+     * @return Future that should be completed with transaction state meta.
+     */
+    private CompletableFuture<TransactionMeta> 
processTxStateRequest(TxStateCoordinatorRequest request) {
+        clock.update(request.readTimestamp());
+
+        UUID txId = request.txId();
+
+        TxStateMeta txStateMeta = txManager.stateMeta(txId);
+
+        if (txStateMeta != null && txStateMeta.txState() == FINISHING) {
+            assert txStateMeta instanceof TxStateMetaFinishing;
+
+            TxStateMetaFinishing txStateMetaFinishing = (TxStateMetaFinishing) 
txStateMeta;
+
+            return txStateMetaFinishing.txFinishFuture();
+        } else {
+            return completedFuture(txStateMeta);
+        }
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index 7277551497..951e77a0b6 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -25,9 +26,11 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
+import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -44,7 +47,6 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.table.Table;
@@ -110,14 +112,13 @@ public class TxLocalTest extends TxAbstractTest {
         TransactionStateResolver transactionStateResolver = 
mock(TransactionStateResolver.class, RETURNS_DEEP_STUBS);
 
         doAnswer(invocationOnMock -> {
-            TxStateReplicaRequest request = invocationOnMock.getArgument(1);
+            UUID txId = invocationOnMock.getArgument(0);
 
-            return CompletableFuture.completedFuture(
-                    
tables.get(request.groupId()).txStateStorage().getTxStateStorage(0).get(request.txId()));
-        }).when(transactionStateResolver).sendMetaRequest(any(), any());
+            return completedFuture(txManager.stateMeta(txId));
+        }).when(transactionStateResolver).resolveTxState(any(), any(), any());
 
-        txManager = new TxManagerImpl(replicaSvc, lockManager, 
DummyInternalTableImpl.CLOCK,
-                new TransactionIdGenerator(0xdeadbeef), () -> localNodeName);
+        txManager = new TxManagerImpl(replicaSvc, lockManager, 
DummyInternalTableImpl.CLOCK, new TransactionIdGenerator(0xdeadbeef),
+                () -> localNodeName);
 
         igniteTransactions = new IgniteTransactionsImpl(txManager, 
timestampTracker);
 
@@ -173,39 +174,8 @@ public class TxLocalTest extends TxAbstractTest {
         return true;
     }
 
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
     @Override
-    public void testReadOnlyGet() {
-        // No-op
-    }
-
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
-    @Override
-    public void testReadOnlyScan() throws Exception {
-        // No-op
-    }
-
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
-    @Override
-    public void testReadOnlyGetWriteIntentResolutionUpdate() {
-        // No-op
-    }
-
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
-    @Override
-    public void testReadOnlyGetWriteIntentResolutionRemove() {
-        // No-op
-    }
-
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
-    @Override
-    public void testReadOnlyGetAll() {
-        // No-op
-    }
-
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
-    @Override
-    public void testReadOnlyPendingWriteIntentSkippedCombined() {
-        super.testReadOnlyPendingWriteIntentSkippedCombined();
+    protected Collection<TxManager> txManagers() {
+        return List.of(txManager);
     }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index be9c6890fa..3452330b4b 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
@@ -153,6 +154,7 @@ import 
org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TransactionIds;
+import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
@@ -170,6 +172,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.sql.ColumnType;
@@ -270,7 +273,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     /** Another (not local) cluster node. */
     private final ClusterNode anotherNode = new ClusterNodeImpl("node2", 
"node2", NetworkAddress.from("127.0.0.2:127"));
 
-    private final TransactionStateResolver transactionStateResolver = 
mock(TransactionStateResolver.class);
+    private TransactionStateResolver transactionStateResolver;
 
     private final PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(testMvPartitionStorage);
 
@@ -297,6 +300,9 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     @Mock
     private CatalogTables catalogTables;
 
+    @Mock
+    private MessagingService messagingService;
+
     /** Schema descriptor for tests. */
     private SchemaDescriptor schemaDescriptor;
 
@@ -381,23 +387,6 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         when(topologySrv.localMember()).thenReturn(localNode);
 
-        HybridTimestamp txFixedTimestamp = clock.now();
-
-        when(transactionStateResolver.sendMetaRequest(any(), 
any())).thenAnswer(invocationOnMock -> {
-            TxMeta txMeta;
-
-            if (txState == null) {
-                txMeta = null;
-            } else if (txState == TxState.COMMITED) {
-                txMeta = new TxMeta(TxState.COMMITED, singletonList(grpId), 
txFixedTimestamp);
-            } else {
-                assert txState == TxState.ABORTED : "Sate is " + txState;
-
-                txMeta = new TxMeta(TxState.ABORTED, singletonList(grpId), 
txFixedTimestamp);
-            }
-            return completedFuture(txMeta);
-        });
-
         when(safeTimeClock.waitFor(any())).thenReturn(completedFuture(null));
 
         
when(schemas.waitForSchemasAvailability(any())).thenReturn(completedFuture(null));
@@ -453,6 +442,22 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         doAnswer(invocation -> txStateMeta).when(txManager).stateMeta(any());
 
+        doAnswer(invocation -> {
+            var resp = new 
TxMessagesFactory().txStateResponse().txStateMeta(txStateMeta).build();
+            return completedFuture(resp);
+        }).when(messagingService).invoke(any(ClusterNode.class), any(), 
anyLong());
+
+        transactionStateResolver = new TransactionStateResolver(
+                mock(ReplicaService.class),
+                txManager,
+                clock,
+                consistentId -> consistentId.equals(localNode.name()) ? 
localNode : anotherNode,
+                id -> id.equals(localNode.id()) ? localNode : anotherNode,
+                messagingService
+        );
+
+        transactionStateResolver.start();
+
         partitionReplicaListener = new PartitionReplicaListener(
                 testMvPartitionStorage,
                 mockRaftClient,
@@ -525,9 +530,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
     @Test
     public void testTxStateReplicaRequestEmptyState() throws Exception {
-        CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
+        CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
                 .groupId(grpId)
-                .readTimestampLong(clock.nowLong())
                 .txId(newTxId())
                 .build(), "senderId");
 
@@ -545,15 +549,14 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         HybridTimestamp readTimestamp = clock.now();
 
-        CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
+        CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
                 .groupId(grpId)
-                .readTimestampLong(readTimestamp.longValue())
                 .txId(txId)
                 .build(), localNode.id());
 
         LeaderOrTxState tuple = (LeaderOrTxState) fut.get(1, TimeUnit.SECONDS);
 
-        TxMeta txMeta = tuple.txMeta();
+        TransactionMeta txMeta = tuple.txMeta();
         assertNotNull(txMeta);
         assertEquals(TxState.COMMITED, txMeta.txState());
         assertNotNull(txMeta.commitTimestamp());
@@ -566,9 +569,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     public void testTxStateReplicaRequestMissLeaderMiss() throws Exception {
         localLeader = false;
 
-        CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
+        CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
                 .groupId(grpId)
-                .readTimestampLong(clock.nowLong())
                 .txId(newTxId())
                 .build(), localNode.id());
 
@@ -627,6 +629,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         pkStorage().put(testBinaryRow, rowId);
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
+        txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED, 
localNode.id(), clock.now()));
 
         CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
                 .groupId(grpId)
@@ -649,6 +652,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         pkStorage().put(testBinaryRow, rowId);
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
+        txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, 
localNode.id(), null));
 
         CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
                 .groupId(grpId)
@@ -672,6 +676,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         pkStorage().put(testBinaryRow, rowId);
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, 
PART_ID);
+        txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.ABORTED, 
localNode.id(), null));
 
         CompletableFuture<?> fut = 
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
                 .groupId(grpId)
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 1b3516092f..67ceb5d659 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -47,6 +47,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.affinity.Assignment;
@@ -137,6 +138,10 @@ import org.junit.jupiter.api.TestInfo;
  * Class that allows to mock a cluster for transaction tests' purposes.
  */
 public class ItTxTestCluster {
+    private final List<NetworkAddress> localAddresses;
+
+    private final NodeFinder nodeFinder;
+
     private final RaftConfiguration raftConfig;
 
     private final GcConfiguration gcConfig;
@@ -173,6 +178,8 @@ public class ItTxTestCluster {
 
     protected TxManager clientTxManager;
 
+    protected TransactionStateResolver clientTxStateResolver;
+
     protected Map<String, List<RaftGroupService>> raftClients = new 
HashMap<>();
 
     protected Map<String, TxStateStorage> txStateStorages;
@@ -201,6 +208,22 @@ public class ItTxTestCluster {
         return null;
     };
 
+    private final Function<String, ClusterNode> idToNode = id -> {
+        for (ClusterService service : cluster) {
+            ClusterNode clusterNode = service.topologyService().localMember();
+
+            if (clusterNode.id().equals(id)) {
+                return clusterNode;
+            }
+        }
+
+        if (client != null && 
client.topologyService().localMember().id().equals(id)) {
+            return client.topologyService().localMember();
+        }
+
+        return null;
+    };
+
     private final TestInfo testInfo;
 
     /** Observable timestamp tracker. */
@@ -227,6 +250,9 @@ public class ItTxTestCluster {
         this.startClient = startClient;
         this.testInfo = testInfo;
         this.timestampTracker = timestampTracker;
+
+        localAddresses = findLocalAddresses(NODE_PORT_BASE, NODE_PORT_BASE + 
nodes);
+        nodeFinder = new StaticNodeFinder(localAddresses);
     }
 
     /**
@@ -236,10 +262,6 @@ public class ItTxTestCluster {
         assertTrue(nodes > 0);
         assertTrue(replicas > 0);
 
-        List<NetworkAddress> localAddresses = 
findLocalAddresses(NODE_PORT_BASE, NODE_PORT_BASE + nodes);
-
-        var nodeFinder = new StaticNodeFinder(localAddresses);
-
         clusterServices = new ConcurrentHashMap<>(nodes);
 
         nodeFinder.findNodes().parallelStream()
@@ -258,20 +280,7 @@ public class ItTxTestCluster {
         LOG.info("The cluster has been started");
 
         if (startClient) {
-            client = startNode(testInfo, "client", NODE_PORT_BASE - 1, 
nodeFinder);
-
-            assertTrue(waitForTopology(client, nodes + 1, 1000));
-
-            clientClock = new HybridClockImpl();
-
-            LOG.info("Replica manager has been started, node=[" + 
client.topologyService().localMember() + ']');
-
-            clientReplicaSvc = new ReplicaService(
-                    client.messagingService(),
-                    clientClock
-            );
-
-            LOG.info("The client has been started");
+            startClient();
         }
 
         // Start raft servers. Each raft server can hold multiple groups.
@@ -345,16 +354,15 @@ public class ItTxTestCluster {
         localNodeName = cluster.get(0).topologyService().localMember().name();
 
         if (startClient) {
-            clientTxManager = new TxManagerImpl(clientReplicaSvc, new 
HeapLockManager(), clientClock, new TransactionIdGenerator(-1),
-                    () -> cluster.get(0).topologyService().localMember().id());
+            initializeClientTxComponents();
         } else {
             // Collocated mode.
             clientTxManager = txManagers.get(localNodeName);
         }
 
-        assertNotNull(clientTxManager);
-
         igniteTransactions = new IgniteTransactionsImpl(clientTxManager, 
timestampTracker);
+
+        assertNotNull(clientTxManager);
     }
 
     public IgniteTransactions igniteTransactions() {
@@ -401,7 +409,15 @@ public class ItTxTestCluster {
                 var mvTableStorage = new TestMvTableStorage(tableId, 
DEFAULT_PARTITION_COUNT);
                 var mvPartStorage = new TestMvPartitionStorage(partId);
                 var txStateStorage = txStateStorages.get(assignment);
-                var transactionStateResolver = new 
TransactionStateResolver(replicaServices.get(assignment), consistentIdToNode);
+                var transactionStateResolver = new TransactionStateResolver(
+                        replicaServices.get(assignment),
+                        txManagers.get(assignment),
+                        clocks.get(assignment),
+                        consistentIdToNode,
+                        idToNode,
+                        clusterServices.get(assignment).messagingService()
+                );
+                transactionStateResolver.start();
 
                 for (int part = 0; part < assignments.size(); part++) {
                     
transactionStateResolver.updateAssignment(grpIds.get(part), 
assignments.get(part));
@@ -680,4 +696,38 @@ public class ItTxTestCluster {
 
         return network;
     }
+
+    private void startClient() throws InterruptedException {
+        client = startNode(testInfo, "client", NODE_PORT_BASE - 1, nodeFinder);
+
+        assertTrue(waitForTopology(client, nodes + 1, 1000));
+
+        clientClock = new HybridClockImpl();
+
+        LOG.info("Replica manager has been started, node=[" + 
client.topologyService().localMember() + ']');
+
+        clientReplicaSvc = new ReplicaService(
+                client.messagingService(),
+                clientClock
+        );
+
+        LOG.info("The client has been started");
+    }
+
+    private void initializeClientTxComponents() {
+        Supplier<String> localNodeIdSupplier = () -> 
client.topologyService().localMember().id();
+
+        clientTxManager = new TxManagerImpl(clientReplicaSvc, new 
HeapLockManager(), clientClock, new TransactionIdGenerator(-1),
+                localNodeIdSupplier);
+
+        clientTxStateResolver = new TransactionStateResolver(
+                clientReplicaSvc,
+                clientTxManager,
+                clientClock,
+                consistentIdToNode,
+                idToNode,
+                client.messagingService()
+        );
+        clientTxStateResolver.start();
+    }
 }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index fdabc84947..69cc7409ef 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -41,6 +41,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CyclicBarrier;
@@ -65,6 +66,7 @@ import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.LockMode;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.Pair;
@@ -119,8 +121,6 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
 
     protected IgniteTransactions igniteTransactions;
 
-    protected TxManager clientTxManager;
-
     @Test
     public void testCommitRollbackSameTxDoesNotThrow() throws 
TransactionException {
         InternalTransaction tx = (InternalTransaction) 
igniteTransactions.begin();
@@ -1633,7 +1633,7 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
      * @param id The id.
      * @return The key tuple.
      */
-    private Tuple makeKey(long id) {
+    protected Tuple makeKey(long id) {
         return Tuple.create().set("accountNumber", id);
     }
 
@@ -1766,7 +1766,6 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         });
     }
 
-
     @Test
     public void testReadOnlyGet() {
         accounts.recordView().upsert(null, makeValue(1, 100.));
@@ -1935,6 +1934,30 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         assertEquals(BALANCE_1, accounts.recordView().get(null, 
makeKey(1)).doubleValue("balance"));
     }
 
+    @Test
+    public void testWriteIntentResolutionFallbackToCommitPartitionPath() {
+        accounts.recordView().upsert(null, makeValue(1, 100.));
+
+        // Pending tx
+        Transaction tx = igniteTransactions.begin();
+        accounts.recordView().delete(tx, makeKey(1));
+
+        // Imitate the restart of the client node, which is a tx coordinator, 
in order to make its volatile state of unavailable.
+        // Now coordinator path of the write intent resolution has no effect, 
and we should fallback to commit partition path.
+        UUID txId = ((ReadWriteTransactionImpl) tx).id();
+
+        for (TxManager txManager : txManagers()) {
+            txManager.updateTxMeta(txId, old -> old == null ? null : new 
TxStateMeta(old.txState(), "restarted", old.commitTimestamp()));
+        }
+
+        // Read-only.
+        Transaction readOnlyTx = igniteTransactions.begin(new 
TransactionOptions().readOnly(true));
+        assertEquals(100., accounts.recordView().get(readOnlyTx, 
makeKey(1)).doubleValue("balance"));
+
+        // Commit pending tx.
+        tx.commit();
+    }
+
     /**
      * Checks operations that act after a transaction is committed, are 
finished with exception.
      *
@@ -1992,4 +2015,11 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
             assertThat(res, contains(null, null));
         }
     }
+
+    /**
+     * Returns server nodes' tx managers.
+     *
+     * @return Server nodes' tx managers.
+     */
+    protected abstract Collection<TxManager> txManagers();
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAbandonedException.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAbandonedException.java
new file mode 100644
index 0000000000..c35e1ada47
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionAbandonedException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ABANDONED_ERR;
+
+import java.util.UUID;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Exception that is thrown when write intent can't be resolved because it has 
been created by a transaction that is now abandoned
+ * (has a state that currently can't be known).
+ */
+public class TransactionAbandonedException extends TransactionException {
+    private final UUID abandonedTxId;
+
+    @Nullable
+    private final TransactionMeta transactionMeta;
+
+    /**
+     * Constructor.
+     *
+     * @param abandonedTxId ID of the transaction that is abandoned.
+     * @param transactionMeta Transaction meta that was able to be retrieved.
+     */
+    public TransactionAbandonedException(UUID abandonedTxId, @Nullable 
TransactionMeta transactionMeta) {
+        super(UUID.randomUUID(), TX_ABANDONED_ERR, null);
+
+        this.abandonedTxId = abandonedTxId;
+        this.transactionMeta = transactionMeta;
+    }
+
+    @Override
+    public String getMessage() {
+        return "Operation failed due to abandoned transaction [abandonedTxId=" 
+ abandonedTxId
+                + ", transactionMeta=" + transactionMeta + ']';
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
similarity index 60%
copy from 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
copy to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
index 26ae27b38f..975a818479 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
@@ -15,25 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.tx.message;
+package org.apache.ignite.internal.tx;
 
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
-import java.util.UUID;
+import java.io.Serializable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.network.annotations.Transferable;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Transaction state request.
+ * Transaction metadata interface.
  */
-@Transferable(TxMessageGroup.TX_STATE_REQUEST)
-public interface TxStateReplicaRequest extends ReplicaRequest {
-    UUID txId();
-
-    long readTimestampLong();
+public interface TransactionMeta extends Serializable {
+    /** Tx state. */
+    TxState txState();
 
-    default HybridTimestamp readTimestamp() {
-        return hybridTimestamp(readTimestampLong());
-    }
+    /** Commit timestamp. */
+    @Nullable HybridTimestamp commitTimestamp();
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
index ea312e6f46..8eab25e38a 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.tx;
 
 import static java.util.Collections.unmodifiableList;
 
-import java.io.Serializable;
 import java.util.List;
 import java.util.Objects;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -28,7 +27,7 @@ import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
 
 /** Transaction meta. */
-public class TxMeta implements Serializable {
+public class TxMeta implements TransactionMeta {
     /** Serial version UID. */
     private static final long serialVersionUID = -172513482743911860L;
 
@@ -55,6 +54,7 @@ public class TxMeta implements Serializable {
         this.commitTimestamp = commitTimestamp;
     }
 
+    @Override
     public TxState txState() {
         return txState;
     }
@@ -63,6 +63,7 @@ public class TxMeta implements Serializable {
         return unmodifiableList(enlistedPartitions);
     }
 
+    @Override
     public @Nullable HybridTimestamp commitTimestamp() {
         return commitTimestamp;
     }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
index f0a866238a..c5a897b647 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
@@ -26,16 +26,22 @@ public enum TxState {
     PENDING,
     FINISHING,
     ABORTED,
-    COMMITED;
+    COMMITED,
+    ABANDONED;
 
     private static final boolean[][] TRANSITION_MATRIX = {
-            { false, true,  false, true,  true },
-            { false, true,  true,  true,  true },
-            { false, false, false, true,  true },
-            { false, false, false, true,  false},
-            { false, false, false, false, true },
+            { false, true,  false, true,  true,  true },
+            { false, true,  true,  true,  true,  true },
+            { false, false, false, true,  true,  true },
+            { false, false, false, true,  false, true },
+            { false, false, false, false, true,  true },
+            { true,  true,  true,  true,  true,  true }
     };
 
+    public static boolean isFinalState(TxState state) {
+        return state == COMMITED || state == ABORTED;
+    }
+
     /**
      * Checks the correctness of the transition between transaction states.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index dbb3da6133..df1e5f79ce 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -23,7 +23,9 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Transaction state meta.
  */
-public class TxStateMeta {
+public class TxStateMeta implements TransactionMeta {
+    private static final long serialVersionUID = 8521181896862227127L;
+
     private final TxState txState;
 
     private final String txCoordinatorId;
@@ -37,12 +39,17 @@ public class TxStateMeta {
      * @param txCoordinatorId Transaction coordinator id.
      * @param commitTimestamp Commit timestamp.
      */
-    public TxStateMeta(TxState txState, String txCoordinatorId, @Nullable 
HybridTimestamp commitTimestamp) {
+    public TxStateMeta(
+            TxState txState,
+            String txCoordinatorId,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
         this.txState = txState;
         this.txCoordinatorId = txCoordinatorId;
         this.commitTimestamp = commitTimestamp;
     }
 
+    @Override
     public TxState txState() {
         return txState;
     }
@@ -51,6 +58,7 @@ public class TxStateMeta {
         return txCoordinatorId;
     }
 
+    @Override
     public @Nullable HybridTimestamp commitTimestamp() {
         return commitTimestamp;
     }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
new file mode 100644
index 0000000000..9b85925d7c
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link TxStateMeta} implementation for {@link TxState#FINISHING} state. 
Contains future that is is completed after the state of
+ * corresponding transaction changes to final state.
+ */
+public class TxStateMetaFinishing extends TxStateMeta {
+    private static final long serialVersionUID = 9122953981654023665L;
+
+    /** Future that is completed after the state of corresponding transaction 
changes to final state. */
+    private final CompletableFuture<TransactionMeta> txFinishFuture = new 
CompletableFuture<>();
+
+    /**
+     * Constructor.
+     *
+     * @param txCoordinatorId Transaction coordinator id.
+     */
+    public TxStateMetaFinishing(String txCoordinatorId) {
+        super(TxState.FINISHING, txCoordinatorId, null);
+    }
+
+    /**
+     * Future that is completed after the state of corresponding transaction 
changes to final state.
+     *
+     * @return Future that is completed after the state of corresponding 
transaction changes to final state.
+     */
+    public CompletableFuture<TransactionMeta> txFinishFuture() {
+        return txFinishFuture;
+    }
+
+    @Override
+    public @Nullable HybridTimestamp commitTimestamp() {
+        throw new UnsupportedOperationException("Can't get commit timestamp 
from FINISHING transaction state meta.");
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 8216b05059..c0a6979335 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -22,9 +22,9 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITED;
-import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
 import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
 
 import java.util.Comparator;
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.TxStateMetaFinishing;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.util.Lazy;
@@ -230,14 +231,28 @@ public class TxManagerImpl implements TxManager {
     ) {
         assert groups != null;
 
+        // Here we put finishing state meta into the local map, so that all 
concurrent operations trying to read tx state
+        // with using read timestamp could see that this transaction is 
finishing, see #transactionMetaReadTimestampAware(txId, timestamp).
+        // None of them now are able to update node's clock with read 
timestamp and we can create the commit timestamp that is greater
+        // than all the read timestamps processed before.
+        // Every concurrent operation will now use a finish future from the 
finishing state meta and get only final transaction
+        // state after the transaction is finished.
+        TxStateMetaFinishing finishingStateMeta = new 
TxStateMetaFinishing(localNodeId.get());
+        updateTxMeta(txId, old -> finishingStateMeta);
         HybridTimestamp commitTimestamp = commit ? clock.now() : null;
 
         // If there are no enlisted groups, just return - we already marked 
the tx as finished.
         boolean finishRequestNeeded = !groups.isEmpty();
 
-        updateTxMeta(txId, old -> new TxStateMeta(finishRequestNeeded ? 
FINISHING : ABORTED, old.txCoordinatorId(), commitTimestamp));
-
         if (!finishRequestNeeded) {
+            updateTxMeta(txId, old -> {
+                TxStateMeta finalStateMeta = 
coordinatorFinalTxStateMeta(commit, commitTimestamp);
+
+                finishingStateMeta.txFinishFuture().complete(finalStateMeta);
+
+                return finalStateMeta;
+            });
+
             return completedFuture(null);
         }
 
@@ -254,11 +269,23 @@ public class TxManagerImpl implements TxManager {
                 .build();
 
         return replicaService.invoke(recipientNode, req)
-                .thenRun(() -> updateTxMeta(txId, old -> new TxStateMeta(
-                        commit ? COMMITED : ABORTED,
-                        old.txCoordinatorId(),
-                        old.commitTimestamp()
-                )));
+                .thenRun(() -> {
+                    updateTxMeta(txId, old -> {
+                        if (isFinalState(old.txState())) {
+                            finishingStateMeta.txFinishFuture().complete(old);
+
+                            return old;
+                        }
+
+                        assert old instanceof TxStateMetaFinishing;
+
+                        TxStateMeta finalTxStateMeta = 
coordinatorFinalTxStateMeta(commit, commitTimestamp);
+
+                        
finishingStateMeta.txFinishFuture().complete(finalTxStateMeta);
+
+                        return finalTxStateMeta;
+                    });
+                });
     }
 
     @Override
@@ -353,4 +380,15 @@ public class TxManagerImpl implements TxManager {
             lowWatermarkReadWriteLock.writeLock().unlock();
         }
     }
+
+    /**
+     * Creates final {@link TxStateMeta} for coordinator node.
+     *
+     * @param commit Commit flag.
+     * @param commitTimestamp Commit timestamp.
+     * @return Transaction meta.
+     */
+    private TxStateMeta coordinatorFinalTxStateMeta(boolean commit, 
HybridTimestamp commitTimestamp) {
+        return new TxStateMeta(commit ? COMMITED : ABORTED, localNodeId.get(), 
commitTimestamp);
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
index c9ab6bedec..00abd58a1b 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
@@ -40,7 +40,17 @@ public class TxMessageGroup {
     public static final short TX_CLEANUP_REQUEST = 2;
 
     /**
-     * Message type for {@link TxStateReplicaRequest}.
+     * Message type for {@link TxStateCommitPartitionRequest}.
      */
-    public static final short TX_STATE_REQUEST = 3;
+    public static final short TX_STATE_COMMIT_PARTITION_REQUEST = 3;
+
+    /**
+     * Message type for {@link TxStateCoordinatorRequest}.
+     */
+    public static final short TX_STATE_COORDINATOR_REQUEST = 4;
+
+    /**
+     * Message type for {@link TxStateResponse}.
+     */
+    public static final short TX_STATE_RESPONSE = 5;
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCommitPartitionRequest.java
similarity index 73%
copy from 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
copy to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCommitPartitionRequest.java
index 26ae27b38f..76ed5803ad 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCommitPartitionRequest.java
@@ -17,23 +17,14 @@
 
 package org.apache.ignite.internal.tx.message;
 
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
 import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * Transaction state request.
  */
-@Transferable(TxMessageGroup.TX_STATE_REQUEST)
-public interface TxStateReplicaRequest extends ReplicaRequest {
+@Transferable(TxMessageGroup.TX_STATE_COMMIT_PARTITION_REQUEST)
+public interface TxStateCommitPartitionRequest extends ReplicaRequest {
     UUID txId();
-
-    long readTimestampLong();
-
-    default HybridTimestamp readTimestamp() {
-        return hybridTimestamp(readTimestampLong());
-    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
similarity index 87%
copy from 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
copy to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
index 26ae27b38f..80b2495f2a 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
@@ -21,14 +21,14 @@ import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * Transaction state request.
  */
-@Transferable(TxMessageGroup.TX_STATE_REQUEST)
-public interface TxStateReplicaRequest extends ReplicaRequest {
+@Transferable(TxMessageGroup.TX_STATE_COORDINATOR_REQUEST)
+public interface TxStateCoordinatorRequest extends NetworkMessage {
     UUID txId();
 
     long readTimestampLong();
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
similarity index 63%
rename from 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
rename to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
index 26ae27b38f..9918a171eb 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
@@ -17,23 +17,18 @@
 
 package org.apache.ignite.internal.tx.message;
 
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
-import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.annotations.Transferable;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Transaction state request.
+ * Transaction state response.
  */
-@Transferable(TxMessageGroup.TX_STATE_REQUEST)
-public interface TxStateReplicaRequest extends ReplicaRequest {
-    UUID txId();
-
-    long readTimestampLong();
-
-    default HybridTimestamp readTimestamp() {
-        return hybridTimestamp(readTimestampLong());
-    }
+@Transferable(TxMessageGroup.TX_STATE_RESPONSE)
+public interface TxStateResponse extends TimestampAware {
+    @Marshallable
+    @Nullable
+    TransactionMeta txStateMeta();
 }

Reply via email to