This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 66fe197a9b2 IGNITE-24361 Implement TxRecoveryMessage processing for 
zone replica (#5366)
66fe197a9b2 is described below

commit 66fe197a9b2b80c58e22e504572573030ab37b23
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Fri Mar 7 20:29:24 2025 +0400

    IGNITE-24361 Implement TxRecoveryMessage processing for zone replica (#5366)
---
 .../replicator/ItAbstractColocationTest.java       |   3 +-
 .../replicator/ItColocationTxRecoveryTest.java     | 115 +++++++++++++++++++++
 .../partition/replicator/TxRecoveryEngine.java     |   1 -
 .../replicator/ZonePartitionReplicaListener.java   |  39 ++++---
 .../ReplicaSafeTimeSyncRequestHandler.java         |   9 +-
 .../handlers/TxRecoveryMessageHandler.java         |  74 +++++++++++++
 .../VacuumTxStateReplicaRequestHandler.java        |   1 -
 .../replicator/PartitionReplicaListener.java       |  52 ++++------
 8 files changed, 241 insertions(+), 53 deletions(-)

diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItAbstractColocationTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItAbstractColocationTest.java
index ffbcbc0add7..c28fdc1d588 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItAbstractColocationTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItAbstractColocationTest.java
@@ -173,8 +173,9 @@ abstract class ItAbstractColocationTest extends 
IgniteAbstractTest {
         cluster.parallelStream().forEach(Node::start);
 
         Node node0 = cluster.get(0);
+        List<String> allNodeNames = cluster.stream().map(n -> 
n.name).collect(toList());
 
-        node0.cmgManager.initCluster(List.of(node0.name), List.of(node0.name), 
"cluster");
+        node0.cmgManager.initCluster(allNodeNames, allNodeNames, "cluster");
 
         cluster.forEach(Node::waitWatches);
 
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java
new file mode 100644
index 00000000000..cee674d81ac
--- /dev/null
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.partition.replicator.fixtures.Node;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.Test;
+
+class ItColocationTxRecoveryTest extends ItAbstractColocationTest {
+    private static final long KEY = 1;
+
+    /**
+     * Tests that tx recovery works. Scenario:
+     *
+     * <ol>
+     *     <li>A transaction tx1 is started, it takes a shared lock on a key 
and never gets finished</li>
+     *     <li>Its coordinator (different from the node hosting the touched 
partition primary) is stopped, so the transaction becomes
+     *     abandoned</li>
+     *     <li>Transaction tx2 tries to write to the same key, finds an 
incompatible lock, realizes that it's held by an abandoned
+     *     transaction, and does tx recovery to remove the lock on the 
partition primary</li>
+     *     <li>tx2 should succeed</li>
+     * </ol>
+     */
+    @Test
+    void abandonedTransactionGetsAbortedOnTouch() throws Exception {
+        assertThat(txConfiguration.abandonedCheckTs().update(600_000L), 
willCompleteSuccessfully());
+
+        startCluster(3);
+
+        Node node0 = getNode(0);
+
+        // Create a zone with a single partition on every node.
+        int zoneId = createZone(node0, TEST_ZONE_NAME, 1, cluster.size());
+
+        createTable(node0, TEST_ZONE_NAME, TEST_TABLE_NAME1);
+
+        cluster.forEach(Node::waitForMetadataCompletenessAtNow);
+
+        putInitialValue(node0);
+
+        ReplicaMeta primaryReplica = getPrimaryReplica(zoneId);
+
+        Node coordinatorNodeToBeStopped = findAnyOtherNode(primaryReplica);
+        Transaction txToBeAbandoned = 
coordinatorNodeToBeStopped.transactions().begin();
+        // Trigger a shared lock to be taken on the key.
+        coordinatorNodeToBeStopped.tableManager.table(TEST_TABLE_NAME1)
+                .keyValueView(Long.class, Integer.class)
+                .get(txToBeAbandoned, KEY);
+
+        coordinatorNodeToBeStopped.stop();
+        cluster.remove(coordinatorNodeToBeStopped);
+
+        Node runningNode = cluster.get(0);
+
+        KeyValueView<Long, Integer> kvView = 
runningNode.tableManager.table(TEST_TABLE_NAME1).keyValueView(Long.class, 
Integer.class);
+
+        Transaction conflictingTx = runningNode.transactions().begin();
+        assertDoesNotThrow(() -> kvView.put(conflictingTx, KEY, 111));
+    }
+
+    private static void putInitialValue(Node node) {
+        node.tableManager
+                .table(TEST_TABLE_NAME1)
+                .keyValueView(Long.class, Integer.class)
+                .put(null, KEY, 42);
+    }
+
+    private ReplicaMeta getPrimaryReplica(int zoneId) {
+        Node node = cluster.get(0);
+
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
node.placementDriverManager.placementDriver().getPrimaryReplica(
+                new ZonePartitionId(zoneId, 0),
+                node.hybridClock.now()
+        );
+
+        assertThat(primaryReplicaFuture, willCompleteSuccessfully());
+
+        ReplicaMeta replicaMeta = primaryReplicaFuture.join();
+        assertThat(replicaMeta, is(notNullValue()));
+
+        return replicaMeta;
+    }
+
+    private Node findAnyOtherNode(ReplicaMeta primaryReplica) {
+        return cluster.stream()
+                .filter(node -> 
!node.name.equals(primaryReplica.getLeaseholder()))
+                .findAny()
+                .orElseThrow();
+    }
+}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
index 33345c5635d..d4930eb0c48 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
@@ -70,7 +70,6 @@ public class TxRecoveryEngine {
                         false,
                         Map.of(
                                 replicationGroupId,
-                                // Enlistment consistency token is not 
required for the rollback, so it is 0L.
                                 
abandonedTxRecoveryEnlistmentFactory.apply(clusterNodeResolver.getById(senderId))
                         ),
                         txId
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index b598cdb5f20..39826070580 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.network.ClusterNodeResolver;
 import 
org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.ReplicaSafeTimeSyncRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler;
+import 
org.apache.ignite.internal.partition.replicator.handlers.TxRecoveryMessageHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.TxStateCommitPartitionReplicaRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.VacuumTxStateReplicaRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.WriteIntentSwitchRequestHandler;
@@ -49,6 +50,7 @@ import org.apache.ignite.internal.schema.SchemaSyncService;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
 import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
 import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
 import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
@@ -78,9 +80,10 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
     // Replica request handlers.
     private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler;
     private final WriteIntentSwitchRequestHandler 
writeIntentSwitchRequestHandler;
+    private final TxStateCommitPartitionReplicaRequestHandler 
txStateCommitPartitionReplicaRequestHandler;
+    private final TxRecoveryMessageHandler txRecoveryMessageHandler;
     private final MinimumActiveTxTimeReplicaRequestHandler 
minimumActiveTxTimeReplicaRequestHandler;
     private final VacuumTxStateReplicaRequestHandler 
vacuumTxStateReplicaRequestHandler;
-    private final TxStateCommitPartitionReplicaRequestHandler 
txStateCommitPartitionReplicaRequestHandler;
     private final ReplicaSafeTimeSyncRequestHandler 
replicaSafeTimeSyncRequestHandler;
 
     /**
@@ -116,6 +119,13 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
 
         this.raftCommandApplicator = new 
ReplicationRaftCommandApplicator(raftClient, replicationGroupId);
 
+        TxRecoveryEngine txRecoveryEngine = new TxRecoveryEngine(
+                txManager,
+                clusterNodeResolver,
+                replicationGroupId,
+                
ZonePartitionReplicaListener::createAbandonedTxRecoveryEnlistment
+        );
+
         // Request handlers initialization.
 
         txFinishReplicaRequestHandler = new TxFinishReplicaRequestHandler(
@@ -138,26 +148,23 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
                 replicationGroupId
         );
 
-        minimumActiveTxTimeReplicaRequestHandler = new 
MinimumActiveTxTimeReplicaRequestHandler(
-                clockService,
-                raftCommandApplicator
-        );
-
-        vacuumTxStateReplicaRequestHandler = new 
VacuumTxStateReplicaRequestHandler(raftCommandApplicator);
-
         txStateCommitPartitionReplicaRequestHandler = new 
TxStateCommitPartitionReplicaRequestHandler(
                 txStatePartitionStorage,
                 txManager,
                 clusterNodeResolver,
                 localNode,
-                new TxRecoveryEngine(
-                        txManager,
-                        clusterNodeResolver,
-                        replicationGroupId,
-                        
ZonePartitionReplicaListener::createAbandonedTxRecoveryEnlistment
-                )
+                txRecoveryEngine
         );
 
+        txRecoveryMessageHandler = new 
TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, 
txRecoveryEngine);
+
+        minimumActiveTxTimeReplicaRequestHandler = new 
MinimumActiveTxTimeReplicaRequestHandler(
+                clockService,
+                raftCommandApplicator
+        );
+
+        vacuumTxStateReplicaRequestHandler = new 
VacuumTxStateReplicaRequestHandler(raftCommandApplicator);
+
         replicaSafeTimeSyncRequestHandler = new 
ReplicaSafeTimeSyncRequestHandler(clockService, raftCommandApplicator);
     }
 
@@ -198,6 +205,8 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
             return 
writeIntentSwitchRequestHandler.handle((WriteIntentSwitchReplicaRequest) 
request, senderId);
         } else if (request instanceof TxStateCommitPartitionRequest) {
             return 
txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest)
 request);
+        } else if (request instanceof TxRecoveryMessage) {
+            return txRecoveryMessageHandler.handle((TxRecoveryMessage) 
request, senderId);
         }
 
         return processZoneReplicaRequest(request, replicaPrimacy, senderId);
@@ -243,7 +252,7 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
         } else if (request instanceof 
UpdateMinimumActiveTxBeginTimeReplicaRequest) {
             return 
minimumActiveTxTimeReplicaRequestHandler.handle((UpdateMinimumActiveTxBeginTimeReplicaRequest)
 request);
         } else if (request instanceof ReplicaSafeTimeSyncRequest) {
-            return 
replicaSafeTimeSyncRequestHandler.handle((ReplicaSafeTimeSyncRequest) request);
+            return 
replicaSafeTimeSyncRequestHandler.handle((ReplicaSafeTimeSyncRequest) request, 
replicaPrimacy.isPrimary());
         } else {
             LOG.warn("Non table request is not supported by the zone partition 
yet " + request);
         }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/ReplicaSafeTimeSyncRequestHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/ReplicaSafeTimeSyncRequestHandler.java
index 3a58c4a4557..f4ac6aee9bf 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/ReplicaSafeTimeSyncRequestHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/ReplicaSafeTimeSyncRequestHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.partition.replicator.handlers;
 
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.ClockService;
 import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
@@ -54,9 +56,14 @@ public class ReplicaSafeTimeSyncRequestHandler {
      * Handles {@link ReplicaSafeTimeSyncRequest}.
      *
      * @param request Request to handle.
+     * @param isPrimary Whether current node is a primary replica.
      * @return Future that will be completed when the request is handled.
      */
-    public CompletableFuture<?> handle(ReplicaSafeTimeSyncRequest request) {
+    public CompletableFuture<?> handle(ReplicaSafeTimeSyncRequest request, 
boolean isPrimary) {
+        if (!isPrimary) {
+            return nullCompletedFuture();
+        }
+
         return commandApplicator.applyCommandWithExceptionHandling(
                 
REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().initiatorTime(clockService.now()).build()
         );
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
new file mode 100644
index 00000000000..b69b276d7ea
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.handlers;
+
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+
+/**
+ * Handler for processing {@link TxRecoveryMessage}s.
+ */
+public class TxRecoveryMessageHandler {
+    private static final IgniteLogger LOG = 
Loggers.forClass(TxRecoveryMessageHandler.class);
+
+    private final TxStatePartitionStorage txStatePartitionStorage;
+    private final ReplicationGroupId replicationGroupId;
+    private final TxRecoveryEngine txRecoveryEngine;
+
+    /** Constructor. */
+    public TxRecoveryMessageHandler(
+            TxStatePartitionStorage txStatePartitionStorage,
+            ReplicationGroupId replicationGroupId,
+            TxRecoveryEngine txRecoveryEngine
+    ) {
+        this.txStatePartitionStorage = txStatePartitionStorage;
+        this.replicationGroupId = replicationGroupId;
+        this.txRecoveryEngine = txRecoveryEngine;
+    }
+
+    /**
+     * Processes transaction recovery request on a commit partition.
+     *
+     * @param request Tx recovery request.
+     * @return The future is complete when the transaction state is finalized.
+     */
+    public CompletableFuture<Void> handle(TxRecoveryMessage request, UUID 
senderId) {
+        UUID txId = request.txId();
+
+        TxMeta txMeta = txStatePartitionStorage.get(txId);
+
+        // Check whether a transaction has already been finished.
+        if (txMeta != null && isFinalState(txMeta.txState())) {
+            // Tx recovery message is processed on the commit partition.
+            return txRecoveryEngine.runCleanupOnNode(replicationGroupId, txId, 
senderId);
+        }
+
+        LOG.info("Orphan transaction has to be aborted [tx={}, meta={}].", 
txId, txMeta);
+
+        return txRecoveryEngine.triggerTxRecovery(txId, senderId);
+    }
+}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/VacuumTxStateReplicaRequestHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/VacuumTxStateReplicaRequestHandler.java
index 1fd44869045..c7f37d66650 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/VacuumTxStateReplicaRequestHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/VacuumTxStateReplicaRequestHandler.java
@@ -53,7 +53,6 @@ public class VacuumTxStateReplicaRequestHandler {
                 .txIds(request.transactionIds())
                 .build();
 
-
         return commandApplicator.applyCommand(cmd);
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 455ca521444..869b6323462 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -112,6 +112,7 @@ import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApp
 import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine;
 import 
org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler;
+import 
org.apache.ignite.internal.partition.replicator.handlers.TxRecoveryMessageHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.TxStateCommitPartitionReplicaRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.handlers.VacuumTxStateReplicaRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
@@ -356,13 +357,13 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
     private final ReliableCatalogVersions reliableCatalogVersions;
     private final ReplicationRaftCommandApplicator raftCommandApplicator;
     private final ReplicaTxFinishMarker replicaTxFinishMarker;
-    private final TxRecoveryEngine txRecoveryEngine;
 
     // Replica request handlers.
     private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler;
+    private final TxStateCommitPartitionReplicaRequestHandler 
txStateCommitPartitionReplicaRequestHandler;
+    private final TxRecoveryMessageHandler txRecoveryMessageHandler;
     private final MinimumActiveTxTimeReplicaRequestHandler 
minimumActiveTxTimeReplicaRequestHandler;
     private final VacuumTxStateReplicaRequestHandler 
vacuumTxStateReplicaRequestHandler;
-    private final TxStateCommitPartitionReplicaRequestHandler 
txStateCommitPartitionReplicaRequestHandler;
     private final BuildIndexReplicaRequestHandler 
buildIndexReplicaRequestHandler;
 
     /**
@@ -445,7 +446,7 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         reliableCatalogVersions = new 
ReliableCatalogVersions(schemaSyncService, catalogService);
         raftCommandApplicator = new 
ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
         replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager);
-        txRecoveryEngine = new TxRecoveryEngine(
+        TxRecoveryEngine txRecoveryEngine = new TxRecoveryEngine(
                 txManager,
                 clusterNodeResolver,
                 replicationGroupId,
@@ -463,18 +464,21 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                 replicationGroupId
         );
 
-        minimumActiveTxTimeReplicaRequestHandler = new 
MinimumActiveTxTimeReplicaRequestHandler(
-                clockService,
-                raftCommandApplicator);
-
-        vacuumTxStateReplicaRequestHandler = new 
VacuumTxStateReplicaRequestHandler(raftCommandApplicator);
-
         txStateCommitPartitionReplicaRequestHandler = new 
TxStateCommitPartitionReplicaRequestHandler(
                 txStatePartitionStorage,
                 txManager,
                 clusterNodeResolver,
                 localNode,
-                txRecoveryEngine);
+                txRecoveryEngine
+        );
+
+        txRecoveryMessageHandler = new 
TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, 
txRecoveryEngine);
+
+        minimumActiveTxTimeReplicaRequestHandler = new 
MinimumActiveTxTimeReplicaRequestHandler(
+                clockService,
+                raftCommandApplicator);
+
+        vacuumTxStateReplicaRequestHandler = new 
VacuumTxStateReplicaRequestHandler(raftCommandApplicator);
 
         buildIndexReplicaRequestHandler = new BuildIndexReplicaRequestHandler(
                 indexMetaStorage,
@@ -489,8 +493,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
 
     // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove.
     private PendingTxPartitionEnlistment 
createAbandonedTxRecoveryEnlistment(ClusterNode node) {
-        // Enlistment consistency token is not required for the rollback, so 
it is 0L.
         assert !enabledColocation() : "Unexpected method call within 
colocation enabled.";
+        // Enlistment consistency token is not required for the rollback, so 
it is 0L.
         // This method is not called in a colocation context, thus it's valid 
to cast replicationGroupId to TablePartitionId.
         return new PendingTxPartitionEnlistment(node.name(), 0L, 
((TablePartitionId) replicationGroupId).tableId());
     }
@@ -599,7 +603,9 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         }
 
         if (request instanceof TxRecoveryMessage) {
-            return processTxRecoveryMessage((TxRecoveryMessage) request, 
senderId);
+            assert !enabledColocation() : "Unexpected method call within 
colocation enabled.";
+
+            return txRecoveryMessageHandler.handle((TxRecoveryMessage) 
request, senderId);
         }
 
         if (request instanceof TxCleanupRecoveryRequest) {
@@ -661,28 +667,6 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         return nullCompletedFuture();
     }
 
-    /**
-     * Processes transaction recovery request on a commit partition.
-     *
-     * @param request Tx recovery request.
-     * @return The future is complete when the transaction state is finalized.
-     */
-    private CompletableFuture<Void> processTxRecoveryMessage(TxRecoveryMessage 
request, UUID senderId) {
-        UUID txId = request.txId();
-
-        TxMeta txMeta = txStatePartitionStorage.get(txId);
-
-        // Check whether a transaction has already been finished.
-        if (txMeta != null && isFinalState(txMeta.txState())) {
-            // Tx recovery message is processed on the commit partition.
-            return txRecoveryEngine.runCleanupOnNode(replicationGroupId, txId, 
senderId);
-        }
-
-        LOG.info("Orphan transaction has to be aborted [tx={}, meta={}].", 
txId, txMeta);
-
-        return txRecoveryEngine.triggerTxRecovery(txId, senderId);
-    }
-
     private CompletableFuture<Void> 
processChangePeersAndLearnersReplicaRequest(ChangePeersAndLearnersAsyncReplicaRequest
 request) {
         TablePartitionId replicaGrpId = (TablePartitionId) 
request.groupId().asReplicationGroupId();
 

Reply via email to