This is an automated email from the ASF dual-hosted git repository. rpuch pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 66fe197a9b2 IGNITE-24361 Implement TxRecoveryMessage processing for zone replica (#5366) 66fe197a9b2 is described below commit 66fe197a9b2b80c58e22e504572573030ab37b23 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Fri Mar 7 20:29:24 2025 +0400 IGNITE-24361 Implement TxRecoveryMessage processing for zone replica (#5366) --- .../replicator/ItAbstractColocationTest.java | 3 +- .../replicator/ItColocationTxRecoveryTest.java | 115 +++++++++++++++++++++ .../partition/replicator/TxRecoveryEngine.java | 1 - .../replicator/ZonePartitionReplicaListener.java | 39 ++++--- .../ReplicaSafeTimeSyncRequestHandler.java | 9 +- .../handlers/TxRecoveryMessageHandler.java | 74 +++++++++++++ .../VacuumTxStateReplicaRequestHandler.java | 1 - .../replicator/PartitionReplicaListener.java | 52 ++++------ 8 files changed, 241 insertions(+), 53 deletions(-) diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItAbstractColocationTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItAbstractColocationTest.java index ffbcbc0add7..c28fdc1d588 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItAbstractColocationTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItAbstractColocationTest.java @@ -173,8 +173,9 @@ abstract class ItAbstractColocationTest extends IgniteAbstractTest { cluster.parallelStream().forEach(Node::start); Node node0 = cluster.get(0); + List<String> allNodeNames = cluster.stream().map(n -> n.name).collect(toList()); - node0.cmgManager.initCluster(List.of(node0.name), List.of(node0.name), "cluster"); + node0.cmgManager.initCluster(allNodeNames, allNodeNames, "cluster"); cluster.forEach(Node::waitWatches); diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java new file mode 100644 index 00000000000..cee674d81ac --- /dev/null +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItColocationTxRecoveryTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.partition.replicator.fixtures.Node; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.table.KeyValueView; +import org.apache.ignite.tx.Transaction; +import org.junit.jupiter.api.Test; + +class ItColocationTxRecoveryTest extends ItAbstractColocationTest { + private static final long KEY = 1; + + /** + * Tests that tx recovery works. Scenario: + * + * <ol> + * <li>A transaction tx1 is started, it takes a shared lock on a key and never gets finished</li> + * <li>Its coordinator (different from the node hosting the touched partition primary) is stopped, so the transaction becomes + * abandoned</li> + * <li>Transaction tx2 tries to write to the same key, finds an incompatible lock, realizes that it's held by an abandoned + * transaction, and does tx recovery to remove the lock on the partition primary</li> + * <li>tx2 should succeed</li> + * </ol> + */ + @Test + void abandonedTransactionGetsAbortedOnTouch() throws Exception { + assertThat(txConfiguration.abandonedCheckTs().update(600_000L), willCompleteSuccessfully()); + + startCluster(3); + + Node node0 = getNode(0); + + // Create a zone with a single partition on every node. 
+ int zoneId = createZone(node0, TEST_ZONE_NAME, 1, cluster.size()); + + createTable(node0, TEST_ZONE_NAME, TEST_TABLE_NAME1); + + cluster.forEach(Node::waitForMetadataCompletenessAtNow); + + putInitialValue(node0); + + ReplicaMeta primaryReplica = getPrimaryReplica(zoneId); + + Node coordinatorNodeToBeStopped = findAnyOtherNode(primaryReplica); + Transaction txToBeAbandoned = coordinatorNodeToBeStopped.transactions().begin(); + // Trigger a shared lock to be taken on the key. + coordinatorNodeToBeStopped.tableManager.table(TEST_TABLE_NAME1) + .keyValueView(Long.class, Integer.class) + .get(txToBeAbandoned, KEY); + + coordinatorNodeToBeStopped.stop(); + cluster.remove(coordinatorNodeToBeStopped); + + Node runningNode = cluster.get(0); + + KeyValueView<Long, Integer> kvView = runningNode.tableManager.table(TEST_TABLE_NAME1).keyValueView(Long.class, Integer.class); + + Transaction conflictingTx = runningNode.transactions().begin(); + assertDoesNotThrow(() -> kvView.put(conflictingTx, KEY, 111)); + } + + private static void putInitialValue(Node node) { + node.tableManager + .table(TEST_TABLE_NAME1) + .keyValueView(Long.class, Integer.class) + .put(null, KEY, 42); + } + + private ReplicaMeta getPrimaryReplica(int zoneId) { + Node node = cluster.get(0); + + CompletableFuture<ReplicaMeta> primaryReplicaFuture = node.placementDriverManager.placementDriver().getPrimaryReplica( + new ZonePartitionId(zoneId, 0), + node.hybridClock.now() + ); + + assertThat(primaryReplicaFuture, willCompleteSuccessfully()); + + ReplicaMeta replicaMeta = primaryReplicaFuture.join(); + assertThat(replicaMeta, is(notNullValue())); + + return replicaMeta; + } + + private Node findAnyOtherNode(ReplicaMeta primaryReplica) { + return cluster.stream() + .filter(node -> !node.name.equals(primaryReplica.getLeaseholder())) + .findAny() + .orElseThrow(); + } +} diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java index 33345c5635d..d4930eb0c48 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java @@ -70,7 +70,6 @@ public class TxRecoveryEngine { false, Map.of( replicationGroupId, - // Enlistment consistency token is not required for the rollback, so it is 0L. abandonedTxRecoveryEnlistmentFactory.apply(clusterNodeResolver.getById(senderId)) ), txId diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java index b598cdb5f20..39826070580 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.network.ClusterNodeResolver; import org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler; import org.apache.ignite.internal.partition.replicator.handlers.ReplicaSafeTimeSyncRequestHandler; import org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler; +import org.apache.ignite.internal.partition.replicator.handlers.TxRecoveryMessageHandler; import org.apache.ignite.internal.partition.replicator.handlers.TxStateCommitPartitionReplicaRequestHandler; import org.apache.ignite.internal.partition.replicator.handlers.VacuumTxStateReplicaRequestHandler; import org.apache.ignite.internal.partition.replicator.handlers.WriteIntentSwitchRequestHandler; @@ -49,6 
+50,7 @@ import org.apache.ignite.internal.schema.SchemaSyncService; import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; +import org.apache.ignite.internal.tx.message.TxRecoveryMessage; import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest; import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest; import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest; @@ -78,9 +80,10 @@ public class ZonePartitionReplicaListener implements ReplicaListener { // Replica request handlers. private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler; private final WriteIntentSwitchRequestHandler writeIntentSwitchRequestHandler; + private final TxStateCommitPartitionReplicaRequestHandler txStateCommitPartitionReplicaRequestHandler; + private final TxRecoveryMessageHandler txRecoveryMessageHandler; private final MinimumActiveTxTimeReplicaRequestHandler minimumActiveTxTimeReplicaRequestHandler; private final VacuumTxStateReplicaRequestHandler vacuumTxStateReplicaRequestHandler; - private final TxStateCommitPartitionReplicaRequestHandler txStateCommitPartitionReplicaRequestHandler; private final ReplicaSafeTimeSyncRequestHandler replicaSafeTimeSyncRequestHandler; /** @@ -116,6 +119,13 @@ public class ZonePartitionReplicaListener implements ReplicaListener { this.raftCommandApplicator = new ReplicationRaftCommandApplicator(raftClient, replicationGroupId); + TxRecoveryEngine txRecoveryEngine = new TxRecoveryEngine( + txManager, + clusterNodeResolver, + replicationGroupId, + ZonePartitionReplicaListener::createAbandonedTxRecoveryEnlistment + ); + // Request handlers initialization. 
txFinishReplicaRequestHandler = new TxFinishReplicaRequestHandler( @@ -138,26 +148,23 @@ public class ZonePartitionReplicaListener implements ReplicaListener { replicationGroupId ); - minimumActiveTxTimeReplicaRequestHandler = new MinimumActiveTxTimeReplicaRequestHandler( - clockService, - raftCommandApplicator - ); - - vacuumTxStateReplicaRequestHandler = new VacuumTxStateReplicaRequestHandler(raftCommandApplicator); - txStateCommitPartitionReplicaRequestHandler = new TxStateCommitPartitionReplicaRequestHandler( txStatePartitionStorage, txManager, clusterNodeResolver, localNode, - new TxRecoveryEngine( - txManager, - clusterNodeResolver, - replicationGroupId, - ZonePartitionReplicaListener::createAbandonedTxRecoveryEnlistment - ) + txRecoveryEngine ); + txRecoveryMessageHandler = new TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, txRecoveryEngine); + + minimumActiveTxTimeReplicaRequestHandler = new MinimumActiveTxTimeReplicaRequestHandler( + clockService, + raftCommandApplicator + ); + + vacuumTxStateReplicaRequestHandler = new VacuumTxStateReplicaRequestHandler(raftCommandApplicator); + replicaSafeTimeSyncRequestHandler = new ReplicaSafeTimeSyncRequestHandler(clockService, raftCommandApplicator); } @@ -198,6 +205,8 @@ public class ZonePartitionReplicaListener implements ReplicaListener { return writeIntentSwitchRequestHandler.handle((WriteIntentSwitchReplicaRequest) request, senderId); } else if (request instanceof TxStateCommitPartitionRequest) { return txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest) request); + } else if (request instanceof TxRecoveryMessage) { + return txRecoveryMessageHandler.handle((TxRecoveryMessage) request, senderId); } return processZoneReplicaRequest(request, replicaPrimacy, senderId); @@ -243,7 +252,7 @@ public class ZonePartitionReplicaListener implements ReplicaListener { } else if (request instanceof UpdateMinimumActiveTxBeginTimeReplicaRequest) { return 
minimumActiveTxTimeReplicaRequestHandler.handle((UpdateMinimumActiveTxBeginTimeReplicaRequest) request); } else if (request instanceof ReplicaSafeTimeSyncRequest) { - return replicaSafeTimeSyncRequestHandler.handle((ReplicaSafeTimeSyncRequest) request); + return replicaSafeTimeSyncRequestHandler.handle((ReplicaSafeTimeSyncRequest) request, replicaPrimacy.isPrimary()); } else { LOG.warn("Non table request is not supported by the zone partition yet " + request); } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/ReplicaSafeTimeSyncRequestHandler.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/ReplicaSafeTimeSyncRequestHandler.java index 3a58c4a4557..f4ac6aee9bf 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/ReplicaSafeTimeSyncRequestHandler.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/ReplicaSafeTimeSyncRequestHandler.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.partition.replicator.handlers; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator; @@ -54,9 +56,14 @@ public class ReplicaSafeTimeSyncRequestHandler { * Handles {@link ReplicaSafeTimeSyncRequest}. * * @param request Request to handle. + * @param isPrimary Whether current node is a primary replica. * @return Future that will be completed when the request is handled. 
*/ - public CompletableFuture<?> handle(ReplicaSafeTimeSyncRequest request) { + public CompletableFuture<?> handle(ReplicaSafeTimeSyncRequest request, boolean isPrimary) { + if (!isPrimary) { + return nullCompletedFuture(); + } + return commandApplicator.applyCommandWithExceptionHandling( REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().initiatorTime(clockService.now()).build() ); diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java new file mode 100644 index 00000000000..b69b276d7ea --- /dev/null +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator.handlers; + +import static org.apache.ignite.internal.tx.TxState.isFinalState; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.tx.TxMeta; +import org.apache.ignite.internal.tx.message.TxRecoveryMessage; +import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; + +/** + * Handler for processing {@link TxRecoveryMessage}s. + */ +public class TxRecoveryMessageHandler { + private static final IgniteLogger LOG = Loggers.forClass(TxRecoveryMessageHandler.class); + + private final TxStatePartitionStorage txStatePartitionStorage; + private final ReplicationGroupId replicationGroupId; + private final TxRecoveryEngine txRecoveryEngine; + + /** Constructor. */ + public TxRecoveryMessageHandler( + TxStatePartitionStorage txStatePartitionStorage, + ReplicationGroupId replicationGroupId, + TxRecoveryEngine txRecoveryEngine + ) { + this.txStatePartitionStorage = txStatePartitionStorage; + this.replicationGroupId = replicationGroupId; + this.txRecoveryEngine = txRecoveryEngine; + } + + /** + * Processes transaction recovery request on a commit partition. + * + * @param request Tx recovery request. + * @return The future is complete when the transaction state is finalized. + */ + public CompletableFuture<Void> handle(TxRecoveryMessage request, UUID senderId) { + UUID txId = request.txId(); + + TxMeta txMeta = txStatePartitionStorage.get(txId); + + // Check whether a transaction has already been finished. + if (txMeta != null && isFinalState(txMeta.txState())) { + // Tx recovery message is processed on the commit partition. 
+ return txRecoveryEngine.runCleanupOnNode(replicationGroupId, txId, senderId); + } + + LOG.info("Orphan transaction has to be aborted [tx={}, meta={}].", txId, txMeta); + + return txRecoveryEngine.triggerTxRecovery(txId, senderId); + } +} diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/VacuumTxStateReplicaRequestHandler.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/VacuumTxStateReplicaRequestHandler.java index 1fd44869045..c7f37d66650 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/VacuumTxStateReplicaRequestHandler.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/VacuumTxStateReplicaRequestHandler.java @@ -53,7 +53,6 @@ public class VacuumTxStateReplicaRequestHandler { .txIds(request.transactionIds()) .build(); - return commandApplicator.applyCommand(cmd); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 455ca521444..869b6323462 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -112,6 +112,7 @@ import org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApp import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine; import org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler; import org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler; +import 
org.apache.ignite.internal.partition.replicator.handlers.TxRecoveryMessageHandler; import org.apache.ignite.internal.partition.replicator.handlers.TxStateCommitPartitionReplicaRequestHandler; import org.apache.ignite.internal.partition.replicator.handlers.VacuumTxStateReplicaRequestHandler; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; @@ -356,13 +357,13 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr private final ReliableCatalogVersions reliableCatalogVersions; private final ReplicationRaftCommandApplicator raftCommandApplicator; private final ReplicaTxFinishMarker replicaTxFinishMarker; - private final TxRecoveryEngine txRecoveryEngine; // Replica request handlers. private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler; + private final TxStateCommitPartitionReplicaRequestHandler txStateCommitPartitionReplicaRequestHandler; + private final TxRecoveryMessageHandler txRecoveryMessageHandler; private final MinimumActiveTxTimeReplicaRequestHandler minimumActiveTxTimeReplicaRequestHandler; private final VacuumTxStateReplicaRequestHandler vacuumTxStateReplicaRequestHandler; - private final TxStateCommitPartitionReplicaRequestHandler txStateCommitPartitionReplicaRequestHandler; private final BuildIndexReplicaRequestHandler buildIndexReplicaRequestHandler; /** @@ -445,7 +446,7 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService); raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId); replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager); - txRecoveryEngine = new TxRecoveryEngine( + TxRecoveryEngine txRecoveryEngine = new TxRecoveryEngine( txManager, clusterNodeResolver, replicationGroupId, @@ -463,18 +464,21 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr 
replicationGroupId ); - minimumActiveTxTimeReplicaRequestHandler = new MinimumActiveTxTimeReplicaRequestHandler( - clockService, - raftCommandApplicator); - - vacuumTxStateReplicaRequestHandler = new VacuumTxStateReplicaRequestHandler(raftCommandApplicator); - txStateCommitPartitionReplicaRequestHandler = new TxStateCommitPartitionReplicaRequestHandler( txStatePartitionStorage, txManager, clusterNodeResolver, localNode, - txRecoveryEngine); + txRecoveryEngine + ); + + txRecoveryMessageHandler = new TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, txRecoveryEngine); + + minimumActiveTxTimeReplicaRequestHandler = new MinimumActiveTxTimeReplicaRequestHandler( + clockService, + raftCommandApplicator); + + vacuumTxStateReplicaRequestHandler = new VacuumTxStateReplicaRequestHandler(raftCommandApplicator); buildIndexReplicaRequestHandler = new BuildIndexReplicaRequestHandler( indexMetaStorage, @@ -489,8 +493,8 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove. private PendingTxPartitionEnlistment createAbandonedTxRecoveryEnlistment(ClusterNode node) { - // Enlistment consistency token is not required for the rollback, so it is 0L. assert !enabledColocation() : "Unexpected method call within colocation enabled."; + // Enlistment consistency token is not required for the rollback, so it is 0L. // This method is not called in a colocation context, thus it's valid to cast replicationGroupId to TablePartitionId. 
return new PendingTxPartitionEnlistment(node.name(), 0L, ((TablePartitionId) replicationGroupId).tableId()); } @@ -599,7 +603,9 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr } if (request instanceof TxRecoveryMessage) { - return processTxRecoveryMessage((TxRecoveryMessage) request, senderId); + assert !enabledColocation() : "Unexpected method call within colocation enabled."; + + return txRecoveryMessageHandler.handle((TxRecoveryMessage) request, senderId); } if (request instanceof TxCleanupRecoveryRequest) { @@ -661,28 +667,6 @@ public class PartitionReplicaListener implements ReplicaListener, ReplicaTablePr return nullCompletedFuture(); } - /** - * Processes transaction recovery request on a commit partition. - * - * @param request Tx recovery request. - * @return The future is complete when the transaction state is finalized. - */ - private CompletableFuture<Void> processTxRecoveryMessage(TxRecoveryMessage request, UUID senderId) { - UUID txId = request.txId(); - - TxMeta txMeta = txStatePartitionStorage.get(txId); - - // Check whether a transaction has already been finished. - if (txMeta != null && isFinalState(txMeta.txState())) { - // Tx recovery message is processed on the commit partition. - return txRecoveryEngine.runCleanupOnNode(replicationGroupId, txId, senderId); - } - - LOG.info("Orphan transaction has to be aborted [tx={}, meta={}].", txId, txMeta); - - return txRecoveryEngine.triggerTxRecovery(txId, senderId); - } - private CompletableFuture<Void> processChangePeersAndLearnersReplicaRequest(ChangePeersAndLearnersAsyncReplicaRequest request) { TablePartitionId replicaGrpId = (TablePartitionId) request.groupId().asReplicationGroupId();