This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6f034933e63 IGNITE-28030 Fix massive exceptions "Failed to abort on
coordinator a transaction that lost its primary replica's volatile state" on
server nodes (#7691)
6f034933e63 is described below
commit 6f034933e6357c8f76661e836794d16df899ea63
Author: Denis Chudov <[email protected]>
AuthorDate: Mon Mar 9 13:46:08 2026 +0200
IGNITE-28030 Fix massive exceptions "Failed to abort on coordinator a
transaction that lost its primary replica's volatile state" on server nodes
(#7691)
---
.../apache/ignite/client/fakes/FakeTxManager.java | 12 --
.../rebalance/ItRebalanceDistributedTest.java | 1 +
.../internal/index/IndexBuildingManager.java | 7 +-
.../partition/replicator/fixtures/Node.java | 1 +
.../PartitionReplicaLifecycleManager.java | 18 +-
.../partition/replicator/TxRecoveryEngine.java | 93 -----------
.../replicator/ZonePartitionReplicaListener.java | 34 ++--
.../handlers/TxRecoveryMessageHandler.java | 18 +-
...xStateCommitPartitionReplicaRequestHandler.java | 36 ++--
.../PartitionReplicaLifecycleManagerTest.java | 1 +
.../runner/app/ItIgniteNodeRestartTest.java | 1 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 +
.../replicator/PartitionReplicaListener.java | 23 +--
.../distributed/TableManagerRecoveryTest.java | 1 +
.../replication/PartitionReplicaListenerTest.java | 6 +-
.../ZonePartitionReplicaListenerTest.java | 14 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 28 +++-
.../ignite/internal/table/TxAbstractTest.java | 25 +--
.../table/impl/DummyInternalTableImpl.java | 9 +-
...riteIntentResolutionWhenPrimaryExpiredTest.java | 26 ++-
.../org/apache/ignite/internal/tx/TxManager.java | 6 -
.../org/apache/ignite/internal/tx/TxState.java | 7 +-
.../org/apache/ignite/internal/tx/TxStateMeta.java | 22 ++-
.../ignite/internal/tx/TxStateMetaUnknown.java | 1 +
.../internal/tx/impl/PlacementDriverHelper.java | 14 ++
.../internal/tx/impl/TransactionInflights.java | 2 +-
.../internal/tx/impl/TransactionStateResolver.java | 113 ++++++++++---
.../internal/tx/impl/TxCleanupRequestSender.java | 23 ++-
.../ignite/internal/tx/impl/TxManagerImpl.java | 38 -----
.../ignite/internal/tx/impl/TxMessageSender.java | 2 +
.../ignite/internal/tx/impl/TxRecoveryEngine.java | 183 +++++++++++++++++++++
.../tx/message/TxStateCoordinatorRequest.java | 7 +
.../tx/impl/TransactionStateResolverTest.java | 12 +-
33 files changed, 516 insertions(+), 269 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 0895d2a54a3..e9834d33218 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.fakes;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.Collection;
@@ -39,7 +38,6 @@ import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.PartitionEnlistment;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
-import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
@@ -212,16 +210,6 @@ public class FakeTxManager implements TxManager {
return null;
}
- @Override
- public CompletableFuture<@Nullable TransactionMeta>
checkEnlistedPartitionsAndAbortIfNeeded(
- TxStateMeta txMeta,
- InternalTransaction tx,
- long currentEnlistmentConsistencyToken,
- ZonePartitionId senderGroupId
- ) {
- return completedFuture(stateMeta(tx.id()));
- }
-
@Override
public <T extends TxStateMeta> T updateTxMeta(UUID txId,
Function<TxStateMeta, TxStateMeta> updater) {
return null;
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index c9907b58303..a2f4a1d4537 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1549,6 +1549,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.partitionOperationsExecutor(),
+ threadPoolsManager.commonScheduler(),
clockService,
placementDriver,
schemaSyncService,
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
index 6c640ae740a..61f26216e67 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
@@ -54,7 +54,9 @@ import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.Lazy;
/**
* Component is responsible for building indexes and making them {@link
CatalogIndexStatus#AVAILABLE available}. Both in a running cluster
@@ -120,7 +122,10 @@ public class IndexBuildingManager implements
IgniteComponent {
clusterService.topologyService(),
clusterService.messagingService(),
new ExecutorInclinedPlacementDriver(placementDriver, executor),
- new TxMessageSender(clusterService.messagingService(),
replicaService, clockService)
+ new TxMessageSender(clusterService.messagingService(),
replicaService, clockService),
+ new TxRecoveryEngine(txManager,
clusterService.topologyService()),
+ new Lazy<>(() ->
clusterService.topologyService().localMember()),
+ executor
);
indexBuilder = new IndexBuilder(
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index e5293fd5b42..9339fd161a3 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -742,6 +742,7 @@ public class Node {
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.partitionOperationsExecutor(),
+ threadPoolsManager.commonScheduler(),
clockService,
placementDriverManager.placementDriver(),
schemaSyncService,
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index e66bdea7289..d4e366fb5f0 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -180,12 +180,14 @@ import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import
org.apache.ignite.internal.tx.storage.state.TxStateStorageRebalanceException;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.LongPriorityQueue;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
@@ -287,6 +289,8 @@ public class PartitionReplicaLifecycleManager extends
private final TxMessageSender txMessageSender;
+ private final TxRecoveryEngine txRecoveryEngine;
+
private final EventListener<CreateZoneEventParameters>
onCreateZoneListener = this::onCreateZone;
private final EventListener<PrimaryReplicaEventParameters>
onPrimaryReplicaExpiredListener = this::onPrimaryReplicaExpired;
private final EventListener<DropZoneEventParameters> onZoneDropListener =
fromConsumer(this::onZoneDrop);
@@ -312,6 +316,7 @@ public class PartitionReplicaLifecycleManager extends
* @param rebalanceScheduler Executor for scheduling rebalance routine.
* @param partitionOperationsExecutor Striped executor on which partition
operations (potentially requiring I/O with storages)
* will be executed.
+ * @param commonExecutor Common executor.
* @param clockService Clock service.
* @param placementDriver Placement driver.
* @param schemaSyncService Schema synchronization service.
@@ -336,6 +341,7 @@ public class PartitionReplicaLifecycleManager extends
ExecutorService ioExecutor,
ScheduledExecutorService rebalanceScheduler,
Executor partitionOperationsExecutor,
+ Executor commonExecutor,
ClockService clockService,
PlacementDriver placementDriver,
SchemaSyncService schemaSyncService,
@@ -360,6 +366,7 @@ public class PartitionReplicaLifecycleManager extends
ioExecutor,
rebalanceScheduler,
partitionOperationsExecutor,
+ commonExecutor,
clockService,
placementDriver,
schemaSyncService,
@@ -395,6 +402,7 @@ public class PartitionReplicaLifecycleManager extends
ExecutorService ioExecutor,
ScheduledExecutorService rebalanceScheduler,
Executor partitionOperationsExecutor,
+ Executor commonExecutor,
ClockService clockService,
PlacementDriver placementDriver,
SchemaSyncService schemaSyncService,
@@ -434,6 +442,8 @@ public class PartitionReplicaLifecycleManager extends
Integer::parseInt
);
+ txRecoveryEngine = new TxRecoveryEngine(txManager, topologyService);
+
txMessageSender = new TxMessageSender(
messagingService,
replicaService,
@@ -446,7 +456,10 @@ public class PartitionReplicaLifecycleManager extends
topologyService,
messagingService,
executorInclinedPlacementDriver,
- txMessageSender
+ txMessageSender,
+ txRecoveryEngine,
+ new Lazy<>(topologyService::localMember),
+ commonExecutor
);
pendingAssignmentsRebalanceListener =
createPendingAssignmentsRebalanceListener();
@@ -838,7 +851,8 @@ public class PartitionReplicaLifecycleManager extends
topologyService.localMember(),
zonePartitionId,
transactionStateResolver,
- txMessageSender
+ txMessageSender,
+ txRecoveryEngine
);
zoneResources.replicaListenerFuture().complete(replicaListener);
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
deleted file mode 100644
index 87706ef4cc1..00000000000
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.partition.replicator;
-
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import org.apache.ignite.internal.hlc.HybridTimestampTracker;
-import org.apache.ignite.internal.network.ClusterNodeResolver;
-import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
-import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
-import org.apache.ignite.internal.tx.TxManager;
-
-/**
- * Transaction recovery logic.
- */
-public class TxRecoveryEngine {
- private final TxManager txManager;
- private final ClusterNodeResolver clusterNodeResolver;
-
- private final ZonePartitionId replicationGroupId;
- private final Function<InternalClusterNode, PendingTxPartitionEnlistment>
abandonedTxRecoveryEnlistmentFactory;
-
- /** Constructor. */
- public TxRecoveryEngine(
- TxManager txManager,
- ClusterNodeResolver clusterNodeResolver,
- ZonePartitionId replicationGroupId,
- Function<InternalClusterNode, PendingTxPartitionEnlistment>
abandonedTxRecoveryEnlistmentFactory
- ) {
- this.txManager = txManager;
- this.clusterNodeResolver = clusterNodeResolver;
- this.replicationGroupId = replicationGroupId;
- this.abandonedTxRecoveryEnlistmentFactory =
abandonedTxRecoveryEnlistmentFactory;
- }
-
- /**
- * Abort the abandoned transaction.
- *
- * @param txId Transaction id.
- * @param senderId Sender inconsistent id.
- */
- public CompletableFuture<Void> triggerTxRecovery(UUID txId, UUID senderId)
{
- // If the transaction state is pending, then the transaction should be
rolled back,
- // meaning that the state is changed to aborted and a corresponding
cleanup request
- // is sent in a common durable manner to a partition that has
initiated recovery.
- return txManager.finish(
- HybridTimestampTracker.emptyTracker(),
- // Tx recovery is executed on the commit partition.
- replicationGroupId,
- false,
- false,
- true,
- false,
- Map.of(replicationGroupId,
abandonedTxRecoveryEnlistmentFactory.apply(clusterNodeResolver.getById(senderId))),
- txId
- )
- .whenComplete((v, ex) -> runCleanupOnNode(replicationGroupId,
txId, senderId));
- }
-
- /**
- * Run cleanup on a node.
- *
- * @param commitPartitionId Commit partition id.
- * @param txId Transaction id.
- * @param nodeId Node id (inconsistent).
- */
- public CompletableFuture<Void> runCleanupOnNode(ZonePartitionId
commitPartitionId, UUID txId, UUID nodeId) {
- // Get node id of the sender to send back cleanup requests.
- String nodeConsistentId =
clusterNodeResolver.getConsistentIdById(nodeId);
-
- return nodeConsistentId == null ? nullCompletedFuture() :
txManager.cleanup(commitPartitionId, nodeConsistentId, txId);
- }
-}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index 845a974e472..39fc89a5fde 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -54,10 +54,10 @@ import
org.apache.ignite.internal.replicator.message.ReplicaRequest;
import
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.replicator.message.TableAware;
import org.apache.ignite.internal.schema.SchemaSyncService;
-import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
import org.apache.ignite.internal.tx.message.TxCleanupRecoveryRequest;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
@@ -120,7 +120,8 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
InternalClusterNode localNode,
ZonePartitionId replicationGroupId,
TransactionStateResolver transactionStateResolver,
- TxMessageSender txMessageSender
+ TxMessageSender txMessageSender,
+ TxRecoveryEngine txRecoveryEngine
) {
this.raftClient = raftClient;
this.failureProcessor = failureProcessor;
@@ -143,13 +144,6 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
ReplicationRaftCommandApplicator raftCommandApplicator = new
ReplicationRaftCommandApplicator(raftClient, replicationGroupId);
- TxRecoveryEngine txRecoveryEngine = new TxRecoveryEngine(
- txManager,
- clusterNodeResolver,
- replicationGroupId,
-
ZonePartitionReplicaListener::createAbandonedTxRecoveryEnlistment
- );
-
// Request handlers initialization.
txFinishReplicaRequestHandler = new TxFinishReplicaRequestHandler(
@@ -177,12 +171,19 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
txStatePartitionStorage,
txManager,
clusterNodeResolver,
- localNode,
txRecoveryEngine,
- txMessageSender
+ txMessageSender,
+ replicationGroupId,
+ localNode
);
- txRecoveryMessageHandler = new
TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId,
txRecoveryEngine, txManager);
+ txRecoveryMessageHandler = new TxRecoveryMessageHandler(
+ txStatePartitionStorage,
+ replicationGroupId,
+ txRecoveryEngine,
+ txManager,
+ localNode
+ );
txCleanupRecoveryRequestHandler = new TxCleanupRecoveryRequestHandler(
txStatePartitionStorage,
@@ -201,13 +202,6 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
replicaSafeTimeSyncRequestHandler = new
ReplicaSafeTimeSyncRequestHandler(clockService, raftCommandApplicator);
}
- private static PendingTxPartitionEnlistment
createAbandonedTxRecoveryEnlistment(InternalClusterNode node) {
- // Enlistment consistency token is not required for the rollback, so
it is 0L.
- // Passing an empty set of table IDs as we don't know which tables
were enlisted; this is ok as the corresponding write intents
- // can still be resolved later when reads stumble upon them.
- return new PendingTxPartitionEnlistment(node.name(), 0L);
- }
-
@Override
public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request,
UUID senderId) {
return replicaPrimacyEngine.validatePrimacy(request)
@@ -237,7 +231,7 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
} else if (request instanceof WriteIntentSwitchReplicaRequest) {
return
writeIntentSwitchRequestHandler.handle((WriteIntentSwitchReplicaRequest)
request, senderId);
} else if (request instanceof TxStateCommitPartitionRequest) {
- return
txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest)
request);
+ return
txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest)
request, senderId);
} else if (request instanceof TxRecoveryMessage) {
return txRecoveryMessageHandler.handle((TxRecoveryMessage)
request, senderId);
} else if (request instanceof TxCleanupRecoveryRequest) {
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
index 68662c72f32..fce8c788686 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java
@@ -23,11 +23,12 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine;
+import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.TransactionLogUtils;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
@@ -41,18 +42,21 @@ public class TxRecoveryMessageHandler {
private final ZonePartitionId replicationGroupId;
private final TxRecoveryEngine txRecoveryEngine;
private final TxManager txManager;
+ private final InternalClusterNode localNode;
/** Constructor. */
public TxRecoveryMessageHandler(
TxStatePartitionStorage txStatePartitionStorage,
ZonePartitionId replicationGroupId,
TxRecoveryEngine txRecoveryEngine,
- TxManager txManager
+ TxManager txManager,
+ InternalClusterNode localNode
) {
this.txStatePartitionStorage = txStatePartitionStorage;
this.replicationGroupId = replicationGroupId;
this.txRecoveryEngine = txRecoveryEngine;
this.txManager = txManager;
+ this.localNode = localNode;
}
/**
@@ -61,7 +65,7 @@ public class TxRecoveryMessageHandler {
* @param request Tx recovery request.
* @return The future is complete when the transaction state is finalized.
*/
- public CompletableFuture<Void> handle(TxRecoveryMessage request, UUID
senderId) {
+ public CompletableFuture<?> handle(TxRecoveryMessage request, UUID
senderId) {
UUID txId = request.txId();
TxMeta txMeta = txStatePartitionStorage.get(txId);
@@ -78,6 +82,12 @@ public class TxRecoveryMessageHandler {
txMeta
);
- return txRecoveryEngine.triggerTxRecovery(txId, senderId);
+ return txRecoveryEngine.triggerTxRecovery(
+ txId,
+ replicationGroupId,
+ localNode.name(),
+ null,
+ null
+ );
}
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
index f64c86af78e..4c9284aa457 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.RecipientLeftException;
-import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxManager;
@@ -38,6 +37,7 @@ import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.TxStateMetaFinishing;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.jetbrains.annotations.Nullable;
@@ -50,36 +50,41 @@ public class TxStateCommitPartitionReplicaRequestHandler {
private final TxManager txManager;
private final ClusterNodeResolver clusterNodeResolver;
- private final InternalClusterNode localNode;
-
private final TxRecoveryEngine txRecoveryEngine;
private final TxMessageSender txMessageSender;
+ private final ZonePartitionId groupId;
+
+ private final InternalClusterNode localNode;
+
/** Constructor. */
public TxStateCommitPartitionReplicaRequestHandler(
TxStatePartitionStorage txStatePartitionStorage,
TxManager txManager,
ClusterNodeResolver clusterNodeResolver,
- InternalClusterNode localNode,
TxRecoveryEngine txRecoveryEngine,
- TxMessageSender txMessageSender
+ TxMessageSender txMessageSender,
+ ZonePartitionId groupId,
+ InternalClusterNode localNode
) {
this.txStatePartitionStorage = txStatePartitionStorage;
this.txManager = txManager;
this.clusterNodeResolver = clusterNodeResolver;
- this.localNode = localNode;
this.txRecoveryEngine = txRecoveryEngine;
this.txMessageSender = txMessageSender;
+ this.groupId = groupId;
+ this.localNode = localNode;
}
/**
* Handles a {@link TxStateCommitPartitionRequest}.
*
* @param request Transaction state request.
+ * @param senderId Sender ephemeral id.
* @return Result future.
*/
- public CompletableFuture<TransactionMeta>
handle(TxStateCommitPartitionRequest request) {
+ public CompletableFuture<TransactionMeta>
handle(TxStateCommitPartitionRequest request, UUID senderId) {
UUID txId = request.txId();
TxStateMeta txMeta = txManager.stateMeta(txId);
@@ -98,7 +103,8 @@ public class TxStateCommitPartitionReplicaRequestHandler {
txMeta,
request.readTimestamp(),
request.senderCurrentConsistencyToken(),
- zonePartitionId
+ zonePartitionId,
+ senderId
);
} else {
return completedFuture(txMeta);
@@ -121,7 +127,8 @@ public class TxStateCommitPartitionReplicaRequestHandler {
@Nullable TxStateMeta txStateMeta,
@Nullable HybridTimestamp readTimestamp,
@Nullable Long senderCurrentConsistencyToken,
- @Nullable ZonePartitionId senderGroupId
+ @Nullable ZonePartitionId senderGroupId,
+ UUID senderId
) {
// The state is either null or PENDING or ABANDONED, other states have
been filtered out previously.
assert txStateMeta == null || txStateMeta.txState() == PENDING ||
txStateMeta.txState() == ABANDONED
@@ -143,10 +150,7 @@ public class TxStateCommitPartitionReplicaRequestHandler {
// state; and there is no final tx state in txStateStorage, or
the tx coordinator left the cluster. But we can assume
// that as the coordinator (or information about it) is
missing, there is no need to wait a finish request from
// tx coordinator, the transaction can't be committed at all.
- return txRecoveryEngine.triggerTxRecovery(txId, localNode.id())
- .handle((v, ex) ->
-
CompletableFuture.<TransactionMeta>completedFuture(txManager.stateMeta(txId)))
- .thenCompose(Function.identity());
+ return txRecoveryEngine.triggerTxRecovery(txId, groupId,
localNode.name(), senderGroupId, senderId);
} else if (coordinator != null) {
// If there is coordinator in the cluster we should fallback
to coordinator request. It's possible that coordinator
// was not seen in topology on another node which requested
the state from commit partition, but can be seen here.
@@ -154,6 +158,7 @@ public class TxStateCommitPartitionReplicaRequestHandler {
return txMessageSender.resolveTxStateFromCoordinator(
coordinator,
+ groupId,
txId,
timestamp,
senderCurrentConsistencyToken,
@@ -177,10 +182,7 @@ public class TxStateCommitPartitionReplicaRequestHandler {
markAbandoned(txId);
}
- return
txRecoveryEngine.triggerTxRecovery(txId, localNode.id())
- .handle((v, ex) ->
-
CompletableFuture.<TransactionMeta>completedFuture(txManager.stateMeta(txId)))
- .thenCompose(Function.identity());
+ return
txRecoveryEngine.triggerTxRecovery(txId, groupId, localNode.name(),
senderGroupId, senderId);
}
})
.thenCompose(Function.identity());
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index b229f25c47c..ff9e319a599 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -295,6 +295,7 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
executorService,
scheduledExecutorService,
executorService,
+ Runnable::run,
clockService,
placementDriver,
schemaSyncService,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 765c25f61ac..d73fc0a2448 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -773,6 +773,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.partitionOperationsExecutor(),
+ threadPoolsManager.commonScheduler(),
clockService,
placementDriverManager.placementDriver(),
schemaSyncService,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index bf1402cf11c..8cef74418ee 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1117,6 +1117,7 @@ public class IgniteImpl implements Ignite {
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.partitionOperationsExecutor(),
+ threadPoolsManager.commonScheduler(),
clockService,
placementDriverMgr.placementDriver(),
schemaSyncService,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 62e3d004276..a13431c93f1 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -464,11 +464,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
// Saving state is not needed for full transactions.
if (!req.full()) {
- txManager.updateTxMeta(req.transactionId(), old ->
builder(old, PENDING)
- .txCoordinatorId(req.coordinatorId())
-
.commitPartitionId(req.commitPartitionId().asZonePartitionId())
- .txLabel(req.txLabel())
- .build());
+ replicaTouch(req.transactionId(), req.coordinatorId(),
req.commitPartitionId().asZonePartitionId(), req.txLabel());
}
}
@@ -486,6 +482,14 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return completedFuture(mvDataStorage.estimatedSize());
}
+ private void replicaTouch(UUID txId, UUID coordinatorId, ZonePartitionId
commitPartitionId, @Nullable String txLabel) {
+ txManager.updateTxMeta(txId, old -> builder(old, PENDING)
+ .txCoordinatorId(coordinatorId)
+ .commitPartitionId(commitPartitionId)
+ .txLabel(txLabel)
+ .build());
+ }
+
private static void setDelayedAckProcessor(@Nullable ReplicaResult result,
@Nullable BiConsumer<Object, Throwable> proc) {
if (result != null) {
result.delayedAckProcessor = proc;
@@ -563,11 +567,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
// We treat SCAN as 2pc and only switch to a 1pc mode if all table
rows fit in the bucket and the transaction is implicit.
// See `req.full() && (err != null || rows.size() <
req.batchSize())` condition.
// If they don't fit the bucket, the transaction is treated as 2pc.
- txManager.updateTxMeta(req.transactionId(), old -> builder(old,
PENDING)
- .txCoordinatorId(req.coordinatorId())
-
.commitPartitionId(req.commitPartitionId().asZonePartitionId())
- .txLabel(req.txLabel())
- .build());
+ replicaTouch(req.transactionId(), req.coordinatorId(),
req.commitPartitionId().asZonePartitionId(), req.txLabel());
// Implicit RW scan can be committed locally on a last batch or
error.
return appendTxCommand(req.transactionId(), RW_SCAN, false, () ->
processScanRetrieveBatchAction(req))
@@ -3497,7 +3497,8 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
if (isFinalState(transactionMeta.txState())) {
scheduleAsyncWriteIntentSwitch(txId,
writeIntent.rowId(), transactionMeta);
- } else {
+ } else if (timestamp == null) {
+ // If it's resolution by RW txn.
LOG.info(
"Received non-final transaction state after tx
state resolution [txId={}, groupId={}, txMeta={}, "
+ "timestamp={}, commitPartId={},
currentConsistencyToken={}, writeIntentReadable={}].",
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index c5f6b362428..a1c60506c1b 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -545,6 +545,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
ForkJoinPool.commonPool(),
mock(ScheduledExecutorService.class),
partitionOperationsExecutor,
+ ForkJoinPool.commonPool(),
clockService,
placementDriver,
schemaSyncService,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index d96a5d45554..f639261c707 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -239,6 +239,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
import org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy;
import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
@@ -646,7 +647,10 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
messagingService,
mock(ReplicaService.class),
clockService
- )
+ ),
+ new TxRecoveryEngine(txManager,
mock(ClusterNodeResolver.class)),
+ new Lazy<>(() -> mock(InternalClusterNode.class)),
+ Runnable::run
);
transactionStateResolver.start();
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index ecc3139b0ce..f8b3cb96802 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -206,6 +206,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
import org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy;
import org.apache.ignite.internal.tx.message.PartitionEnlistmentMessage;
import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
@@ -614,7 +615,10 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
messagingService,
mock(ReplicaService.class),
clockService
- )
+ ),
+ new TxRecoveryEngine(txManager,
mock(ClusterNodeResolver.class)),
+ new Lazy<>(() -> mock(InternalClusterNode.class)),
+ Runnable::run
);
transactionStateResolver.start();
@@ -633,6 +637,11 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
clockService
);
+ var txRecoveryEngine = new TxRecoveryEngine(
+ txManager,
+ topologySrv
+ );
+
zonePartitionReplicaListener = new ZonePartitionReplicaListener(
txStateStorage,
clockService,
@@ -647,7 +656,8 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
localNode,
zonePartitionId,
transactionStateResolver,
- txMessageSender
+ txMessageSender,
+ txRecoveryEngine
);
tableReplicaProcessor = new PartitionReplicaListener(
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index f35084f8c62..816dd602018 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -179,6 +179,7 @@ import
org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
@@ -733,13 +734,21 @@ public class ItTxTestCluster {
clockServices.get(assignment)
);
+ TxRecoveryEngine txRecoveryEngine = new TxRecoveryEngine(
+ txManagers.get(assignment),
+ clusterServices.get(assignment).topologyService()
+ );
+
var transactionStateResolver = new TransactionStateResolver(
txManagers.get(assignment),
clockServices.get(assignment),
nodeResolver,
clusterServices.get(assignment).messagingService(),
placementDriver,
- txMessageSender
+ txMessageSender,
+ txRecoveryEngine,
+ new Lazy<>(() -> mock(InternalClusterNode.class)),
+ Runnable::run
);
transactionStateResolver.start();
@@ -967,6 +976,11 @@ public class ItTxTestCluster {
clockService
);
+ var txRecoveryEngine = new TxRecoveryEngine(
+ txManager,
+ clusterServices.get(assignment).topologyService()
+ );
+
ZonePartitionReplicaListener zonePartitionReplicaListener =
nodeSpecificZonePartitionReplicaListeners.computeIfAbsent(
zonePartitionId,
partitionId -> new ZonePartitionReplicaListener(
@@ -983,7 +997,8 @@ public class ItTxTestCluster {
localNode,
partitionId,
transactionStateResolver,
- txMessageSender
+ txMessageSender,
+ txRecoveryEngine
)
);
@@ -1158,6 +1173,8 @@ public class ItTxTestCluster {
* Shutdowns all cluster nodes after each test.
*/
public void shutdownCluster() {
+ LOG.info("Cluster shutdown begin");
+
assertThat(stopAsync(new ComponentContext(), cluster),
willCompleteSuccessfully());
assertThat(stopAsync(new ComponentContext(), client),
willCompleteSuccessfully());
@@ -1233,6 +1250,8 @@ public class ItTxTestCluster {
if (partitionOperationsExecutor != null) {
IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 10,
TimeUnit.SECONDS);
}
+
+ LOG.info("Cluster shutdown end");
}
/**
@@ -1327,7 +1346,10 @@ public class ItTxTestCluster {
client.messagingService(),
clientReplicaSvc,
clientClockService
- )
+ ),
+ new TxRecoveryEngine(clientTxManager,
client.topologyService()),
+ new Lazy<>(() -> mock(InternalClusterNode.class)),
+ Runnable::run
);
clientTxStateResolver.start();
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index a8eda99eb62..9b6ebdbbf2d 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.table;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
@@ -56,7 +58,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
@@ -554,13 +555,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
new TransactionOptions().timeoutMillis(1000)
);
- var err = assertThrows(CompletionException.class, fut0::join);
-
- try {
- assertInstanceOf(IllegalArgumentException.class, err.getCause());
- } catch (AssertionError e) {
- throw new AssertionError("Unexpected exception type", err);
- }
+ assertThat(fut0, willThrow(IllegalArgumentException.class));
assertEquals(balance, view.get(null,
makeKey(1)).doubleValue("balance"));
}
@@ -582,13 +577,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
new TransactionOptions().timeoutMillis(1000)
);
- var err = assertThrows(CompletionException.class, fut0::join);
-
- try {
- assertInstanceOf(NullPointerException.class, err.getCause());
- } catch (AssertionError e) {
- throw new AssertionError("Unexpected exception type", err);
- }
+ assertThat(fut0, willThrow(NullPointerException.class));
}
@Test
@@ -1921,7 +1910,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
CompletableFuture<List<Tuple>> roBeforeCommitTxFut =
scan(accounts.internalTable(), readOnlyTx);
- var roBeforeCommitTxRows = roBeforeCommitTxFut.get(10,
TimeUnit.SECONDS);
+ var roBeforeCommitTxRows = roBeforeCommitTxFut.get(10, SECONDS);
assertEquals(2, roBeforeCommitTxRows.size());
@@ -1940,7 +1929,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
// Same read-only transaction.
roBeforeCommitTxFut = scan(accounts.internalTable(), readOnlyTx);
- roBeforeCommitTxRows = roBeforeCommitTxFut.get(10, TimeUnit.SECONDS);
+ roBeforeCommitTxRows = roBeforeCommitTxFut.get(10, SECONDS);
assertEquals(2, roBeforeCommitTxRows.size());
@@ -1958,7 +1947,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
CompletableFuture<List<Tuple>> roAfterCommitTxFut =
scan(accounts.internalTable(), readOnlyTx2);
- var roAfterCommitTxRows = roAfterCommitTxFut.get(10, TimeUnit.SECONDS);
+ var roAfterCommitTxRows = roAfterCommitTxFut.get(10, SECONDS);
assertEquals(1, roAfterCommitTxRows.size());
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 8e4a2b512d6..dfe814ccef1 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -139,6 +139,7 @@ import
org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
@@ -509,6 +510,11 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
CLOCK_SERVICE
);
+ var txRecoveryEngine = new TxRecoveryEngine(
+ txManager,
+ mock(ClusterNodeResolver.class)
+ );
+
ZonePartitionReplicaListener zoneReplicaListener = new
ZonePartitionReplicaListener(
txStateStorage.getOrCreatePartitionStorage(PART_ID),
CLOCK_SERVICE,
@@ -523,7 +529,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
LOCAL_NODE,
zonePartitionId,
transactionStateResolver,
- txMessageSender
+ txMessageSender,
+ txRecoveryEngine
);
zoneReplicaListener.addTableReplicaProcessor(tableId, (raftClient,
txStateResolver) -> tableReplicaListener);
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest.java
index a05243ff932..48f3f569a4a 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest.java
@@ -46,6 +46,7 @@ import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.TestWrappers;
import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -73,7 +74,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
/**
* Test for transaction abort on coordinator when write-intent resolution
happens after primary replica expiration.
@@ -119,8 +120,13 @@ public class
ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testCoordinatorAbortsTransaction(boolean withThinClient)
throws Exception {
+ @CsvSource({
+ "true, true",
+ "true, false",
+ "false, true",
+ "false, false"
+ })
+ public void testCoordinatorAbortsTransaction(boolean withThinClient,
boolean limitedDataOnCoordinator) throws Exception {
IgniteClient client = IgniteClient.builder()
.addresses(getClientAddresses(runningNodes().collect(toList())).toArray(new
String[0]))
.operationTimeout(15_000)
@@ -188,6 +194,11 @@ public class
ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest
waitAndGetPrimaryReplica(coordinatorNode, groupId);
+ if (limitedDataOnCoordinator) {
+ coordinatorNode.txManager()
+ .updateTxMeta(clientTxId(tx0), old ->
TxStateMeta.builder(PENDING).txCoordinatorId(coordinatorNode.id()).build());
+ }
+
Transaction tx = coordinatorNode.transactions().begin();
log.info("Test: new tx: " + txId(tx));
log.info("Test: upsert");
@@ -335,4 +346,13 @@ public class
ItTxAbortOnCoordinatorOnWriteIntentResolutionWhenPrimaryExpiredTest
.map(ignite -> unwrapIgniteImpl(ignite).clientAddress().port())
.collect(toList());
}
+
+ private static UUID clientTxId(Transaction tx) {
+ if (tx instanceof ClientLazyTransaction) {
+ ClientLazyTransaction clientTx = (ClientLazyTransaction) tx;
+ return clientTx.startedTx().txId();
+ } else {
+ return txId(tx);
+ }
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 4d6ecf0f38a..578e60f04c9 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -135,12 +135,6 @@ public interface TxManager extends IgniteComponent {
*/
@Nullable TxStateMeta stateMeta(UUID txId);
- CompletableFuture<@Nullable TransactionMeta>
checkEnlistedPartitionsAndAbortIfNeeded(
- TxStateMeta txMeta,
- InternalTransaction tx,
- long currentEnlistmentConsistencyToken,
- ZonePartitionId senderGroupId);
-
/**
* Atomically changes the state meta of a transaction.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
index 6ca2351d075..a446ebef1ea 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
@@ -57,9 +57,10 @@ public enum TxState {
/**
* Unknown transaction state. It may be used during transaction state
resolution, shouldn't be used for either volatile
- * or persistent transaction meta. It means that the state of the sought
transaction is unrecoverable from current storage state,
- * and it's possible if there is no corresponding committed version, so
the write intent that triggered this resolution
- * should be ignored. Any transitions from or to this state are forbidden.
+ * or persistent transaction meta. It means that the state of the sought
transaction is not found on current node
+ * or unrecoverable from current storage state, and it's possible if there
is no corresponding committed version,
+ * so the write intent that triggered this resolution should be ignored.
+ * Any transitions from or to this state are forbidden.
*/
UNKNOWN(5);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index ddff0619475..be229a32c23 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -340,13 +340,31 @@ public class TxStateMeta implements TransactionMeta {
return this;
}
+ /**
+ * Changes transaction label, if non-{@code null} value is provided.
+ *
+ * @param txLabel Transaction label.
+ * @return This.
+ */
public TxStateMetaBuilder txLabel(@Nullable String txLabel) {
- this.txLabel = txLabel;
+ if (txLabel != null) {
+ this.txLabel = txLabel;
+ }
+
return this;
}
+ /**
+ * Changes internal transaction, if non-{@code null} value is provided.
+ *
+ * @param tx Internal transaction.
+ * @return This.
+ */
public TxStateMetaBuilder tx(@Nullable InternalTransaction tx) {
- this.tx = tx;
+ if (tx != null) {
+ this.tx = tx;
+ }
+
return this;
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
index e1baa73159e..6c04d6e9a2f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
@@ -44,6 +44,7 @@ public class TxStateMetaUnknown extends TxStateMeta {
public TxStateMetaMessage toTransactionMetaMessage(ReplicaMessagesFactory
replicaMessagesFactory, TxMessagesFactory txMessagesFactory) {
return txMessagesFactory
.txStateMetaUnknownMessage()
+ .txState(TxState.UNKNOWN)
.build();
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
index d9cc060254a..724912584a9 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
@@ -38,10 +38,13 @@ import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
/**
* A helper class to retrieve primary replicas with exception handling.
@@ -119,6 +122,17 @@ public class PlacementDriverHelper {
});
}
+ /**
+ * See {@link
LeasePlacementDriver#getCurrentPrimaryReplica(ReplicationGroupId,
HybridTimestamp)}. Gets primary replica for now.
+ *
+ * @param partitionId Partition id.
+ * @return Primary replica for the provided partition, or null if there is
no primary at the moment.
+ */
+ @Nullable
+ public ReplicaMeta getCurrentPrimaryReplica(ZonePartitionId partitionId) {
+ return placementDriver.getCurrentPrimaryReplica(partitionId,
clockService.current());
+ }
+
/**
* Get primary replicas for the provided partitions.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index 83934e5773a..4170318b2e1 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -47,7 +47,7 @@ import org.jetbrains.annotations.TestOnly;
/**
* Contains counters for in-flight requests of the transactions. Read-write
transactions can't finish when some requests are in-flight.
- * Read-only transactions can't be included into {@link
org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage} when
+ * Read-only transactions can't be included into {@link
FinishedTransactionsBatchMessage} when
* some requests are in-flight.
*/
public class TransactionInflights {
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
index 847d3b6d250..de483d3f9a8 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
@@ -18,21 +18,29 @@
package org.apache.ignite.internal.tx.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.tx.TxState.ABANDONED;
import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.TxState.PENDING;
+import static org.apache.ignite.internal.tx.TxState.UNKNOWN;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static
org.apache.ignite.internal.tx.impl.PlacementDriverHelper.AWAIT_PRIMARY_REPLICA_TIMEOUT;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.IgniteThrottledLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import org.apache.ignite.internal.network.InternalClusterNode;
@@ -40,10 +48,10 @@ import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
-import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
@@ -54,6 +62,7 @@ import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
+import org.apache.ignite.internal.util.Lazy;
import org.jetbrains.annotations.Nullable;
/**
@@ -62,6 +71,8 @@ import org.jetbrains.annotations.Nullable;
public class TransactionStateResolver {
private static final IgniteLogger LOG =
Loggers.forClass(TransactionStateResolver.class);
+ private final IgniteThrottledLogger throttledLogger;
+
/** Tx messages factory. */
private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
@@ -88,6 +99,10 @@ public class TransactionStateResolver {
*/
private final TxMessageSender txMessageSender;
+ private final TxRecoveryEngine txRecoveryEngine;
+
+ private final Lazy<InternalClusterNode> localNode;
+
/**
* The constructor.
*
@@ -96,6 +111,10 @@ public class TransactionStateResolver {
* @param clusterNodeResolver Cluster node resolver.
* @param messagingService Messaging service.
* @param placementDriver Placement driver.
+ * @param txMessageSender Tx message sender.
+ * @param txRecoveryEngine Transaction recovery engine.
+ * @param localNode Local cluster node.
+ * @param throttledLogExecutor Executor for cleaning up the throttled
logger's cache.
*/
public TransactionStateResolver(
TxManager txManager,
@@ -103,7 +122,10 @@ public class TransactionStateResolver {
ClusterNodeResolver clusterNodeResolver,
MessagingService messagingService,
PlacementDriver placementDriver,
- TxMessageSender txMessageSender
+ TxMessageSender txMessageSender,
+ TxRecoveryEngine txRecoveryEngine,
+ Lazy<InternalClusterNode> localNode,
+ Executor throttledLogExecutor
) {
this.txManager = txManager;
this.clockService = clockService;
@@ -111,6 +133,9 @@ public class TransactionStateResolver {
this.messagingService = messagingService;
this.placementDriverHelper = new
PlacementDriverHelper(placementDriver, clockService);
this.txMessageSender = txMessageSender;
+ this.txRecoveryEngine = txRecoveryEngine;
+ this.localNode = localNode;
+ this.throttledLogger = Loggers.toThrottledLogger(LOG,
throttledLogExecutor);
}
/**
@@ -121,8 +146,16 @@ public class TransactionStateResolver {
if (msg instanceof TxStateCoordinatorRequest) {
TxStateCoordinatorRequest req = (TxStateCoordinatorRequest)
msg;
- processTxStateRequest(req)
- .thenAccept(txStateMeta -> {
+ processTxStateRequest(req, sender)
+ .whenComplete((txStateMeta, e) -> {
+ if (e != null) {
+ Throwable cause = unwrapCause(e);
+ throttledLogger.info(cause.getMessage());
+
+ // Will cause fallback to commit partition
path.
+ txStateMeta =
TxStateMeta.builder(UNKNOWN).build();
+ }
+
NetworkMessage response =
TX_MESSAGES_FACTORY.txStateResponse()
.txStateMeta(toTransactionMetaMessage(txStateMeta))
.timestamp(clockService.now())
@@ -304,9 +337,16 @@ public class TransactionStateResolver {
resolveTxStateFromCommitPartition(txId, commitGrpId,
senderCurrentConsistencyToken, senderGroupId, txMetaFuture);
} else {
- txMessageSender.resolveTxStateFromCoordinator(coordinator, txId,
timestamp, senderCurrentConsistencyToken, senderGroupId)
+ txMessageSender.resolveTxStateFromCoordinator(
+ coordinator,
+ commitGrpId,
+ txId,
+ timestamp,
+ senderCurrentConsistencyToken,
+ senderGroupId
+ )
.whenComplete((response, e) -> {
- if (e == null && response.txStateMeta() != null) {
+ if (e == null && response.txStateMeta() != null &&
response.txStateMeta().txState() != UNKNOWN) {
txMetaFuture.complete(response.txStateMeta().asTransactionMeta());
} else {
if (e != null && e.getCause() instanceof
RecipientLeftException) {
@@ -440,9 +480,13 @@ public class TransactionStateResolver {
* {@link TxState#FINISHING}, it waits for actual completion instead.
*
* @param request Request.
+ * @param sender Node that sent the request; its id is passed to transaction recovery when recovery is triggered.
* @return Future that should be completed with transaction state meta.
*/
- private CompletableFuture<@Nullable TransactionMeta>
processTxStateRequest(TxStateCoordinatorRequest request) {
+ private CompletableFuture<@Nullable TransactionMeta> processTxStateRequest(
+ TxStateCoordinatorRequest request,
+ InternalClusterNode sender
+ ) {
clockService.updateClock(request.readTimestamp());
UUID txId = request.txId();
@@ -458,31 +502,61 @@ public class TransactionStateResolver {
TxStateMetaFinishing txStateMetaFinishing =
(TxStateMetaFinishing) txStateMeta;
return txStateMetaFinishing.txFinishFuture();
- } else {
- InternalTransaction tx = txStateMeta.tx();
+ } else if (request.readTimestamp() == null ||
request.readTimestamp().equals(HybridTimestamp.MIN_VALUE)) {
+ // If txn is in non-final state and resolution is requested by
RW txn.
Long currentConsistencyToken =
request.senderCurrentConsistencyToken();
ZonePartitionId groupId = request.senderGroupId() == null
? null
: request.senderGroupId().asZonePartitionId();
- if (tx != null && !tx.isReadOnly() && currentConsistencyToken
!= null && groupId != null) {
- return
txManager.checkEnlistedPartitionsAndAbortIfNeeded(txStateMeta, tx,
currentConsistencyToken, groupId);
+ if (currentConsistencyToken != null && groupId != null) {
+ ZonePartitionId commitPartitionId =
txStateMeta.commitPartitionId();
+
+ if (commitPartitionId == null) {
+ // The request field is @Nullable: avoid an NPE so the absence is reported by the check below.
+ commitPartitionId = request.commitPartitionId() == null
+ ? null
+ : request.commitPartitionId().asZonePartitionId();
+
+ if (commitPartitionId == null) {
+ return failedFuture(new IgniteInternalException(
+ INTERNAL_ERR,
+ format("Commit partition id is absent in
transaction meta "
+ + "on coordinator [txId={},
txMeta={}, req={}].", txId, txStateMeta, request)
+ ));
+ }
+ }
+
+ String commitPartitionNode =
commitPartitionNode(commitPartitionId);
+
+ return txRecoveryEngine.triggerTxRecovery(txId,
commitPartitionId, commitPartitionNode, groupId, sender.id());
} else {
- LOG.info("Failed to abort on coordinator a transaction
that lost its primary replica's volatile state "
- + "[txId={}, internalTx={}, readOnly={},
senderCurrentConsistencyToken={}, senderGroupId={}].",
- txId,
- tx,
- (tx == null ? null : tx.isReadOnly()),
- currentConsistencyToken,
- groupId
- );
+ return failedFuture(new IgniteInternalException(
+ INTERNAL_ERR,
+ format("Failed to abort on coordinator a
transaction that lost its primary replica's volatile state "
+ + "[txId={},
senderCurrentConsistencyToken={}, senderGroupId={}].",
+ txId,
+ currentConsistencyToken,
+ groupId
+ )
+ ));
}
+ } else {
+ return completedFuture(txStateMeta);
}
+ } else {
+ return failedFuture(
+ new IgniteInternalException(INTERNAL_ERR,
format("Transaction meta is absent on coordinator [txId={}].", txId))
+ );
}
+ }
- LOG.info("Transaction meta is absent on coordinator [txId={}].", txId);
+ private String commitPartitionNode(ZonePartitionId commitPartitionId) {
+ ReplicaMeta replicaMeta =
placementDriverHelper.getCurrentPrimaryReplica(commitPartitionId);
- return completedFuture(txStateMeta);
+ return replicaMeta == null
+ ? localNode.get().name() // Will be resolved correctly by tx
cleanup sender.
+ : replicaMeta.getLeaseholder();
}
private static @Nullable TransactionMetaMessage
toTransactionMetaMessage(@Nullable TransactionMeta transactionMeta) {
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index a8e41b8e6d5..6256dd5d31f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -410,15 +410,20 @@ public class TxCleanupRequestSender {
if (partitions == null) {
// If we don't have any partition, which is
the recovery or "unlock only" case,
// just try again with the same node.
- return sendCleanupMessageWithRetries(
- commitPartitionId,
- commit,
- commitTimestamp,
- txId,
- node,
- partitions,
- incrementTimeout(timeout),
- attemptsMade + 1
+ return scheduleRetry( // Wait 'timeout' ms (scheduled on retryExecutor) before retrying with the same node.
+ () -> sendCleanupMessageWithRetries(
+ commitPartitionId,
+ commit,
+ commitTimestamp,
+ txId,
+ node,
+ partitions,
+ incrementTimeout(timeout),
+ attemptsMade + 1
+ ),
+ timeout,
+ TimeUnit.MILLISECONDS,
+ retryExecutor
);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 0bd927e95cc..7007fb31085 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.tx.impl;
import static java.lang.Math.toIntExact;
import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -583,43 +582,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
return txStateVolatileStorage.state(txId);
}
- @Override
- public CompletableFuture<@Nullable TransactionMeta>
checkEnlistedPartitionsAndAbortIfNeeded(
- TxStateMeta txMeta,
- InternalTransaction tx,
- long currentEnlistmentConsistencyToken,
- ZonePartitionId senderGroupId
- ) {
- PendingTxPartitionEnlistment enlistment =
tx.enlistedPartition(senderGroupId);
-
- // Enlistment for thin client direct request may be absent on
coordinator.
- if (enlistment == null || enlistment.consistencyToken() !=
currentEnlistmentConsistencyToken) {
- // Remote partition already has different consistency token, so we
can't commit this transaction anyway.
- // Even when graceful primary replica switch is done, we can get
here only if the write intent that requires resolution
- // is not under lock.
- // TODO https://issues.apache.org/jira/browse/IGNITE-27386 the
reason of rollback needs to be explained.
- return tx.rollbackAsync()
- .thenApply(unused -> {
- TxStateMeta newMeta = stateMeta(tx.id());
-
- assert isFinalState(newMeta.txState());
-
- return newMeta;
- });
- }
-
- LOG.info("Skipped aborting on coordinator a transaction that lost its
primary replica's volatile state "
- + "[txId={}, internalTx={}, enlistment={},
senderCurrentConsistencyToken={}, txMeta={}].",
- tx.id(),
- tx,
- enlistment,
- currentEnlistmentConsistencyToken,
- txMeta
- );
-
- return completedFuture(txMeta);
- }
-
@TestOnly
public Collection<TxStateMeta> states() {
return txStateVolatileStorage.states();
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index 12f4b93a6f9..b36af61a5b5 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -225,6 +225,7 @@ public class TxMessageSender {
*/
public CompletableFuture<TxStateResponse> resolveTxStateFromCoordinator(
InternalClusterNode coordinatorClusterNode,
+ ZonePartitionId commitGrpId,
UUID txId,
HybridTimestamp timestamp,
@Nullable Long senderCurrentConsistencyToken,
@@ -237,6 +238,7 @@ public class TxMessageSender {
.txId(txId)
.senderCurrentConsistencyToken(senderCurrentConsistencyToken)
.senderGroupId(toZonePartitionIdMessageNullable(REPLICA_MESSAGES_FACTORY,
senderGroupId))
+
.commitPartitionId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY,
commitGrpId))
.build(),
RPC_TIMEOUT_MILLIS)
.thenApply(resp -> {
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
new file mode 100644
index 00000000000..a5f2ff0f362
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.FINISHING;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
+import org.apache.ignite.internal.network.ClusterNodeResolver;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TransactionInternalException;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.TxStateMetaFinishing;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Transaction recovery logic: aborts abandoned transactions and sends cleanup requests for them.
+ */
+public class TxRecoveryEngine {
+ private final TxManager txManager;
+ private final ClusterNodeResolver clusterNodeResolver;
+
+ /** Constructor. */
+ public TxRecoveryEngine(
+ TxManager txManager,
+ ClusterNodeResolver clusterNodeResolver
+ ) {
+ this.txManager = txManager;
+ this.clusterNodeResolver = clusterNodeResolver;
+ }
+
+ /**
+ * Aborts the abandoned transaction; once that completes (successfully or not), cleanup requests are sent to the commit partition node and, if known, to the sender.
+ *
+ * @param txId Transaction id.
+ * @param commitPartitionId Commit partition id.
+ * @param commitPartitionNode Commit partition node consistent id.
+ * @param senderGroupId Sender group id, or {@code null} if unknown.
+ * @param senderId Sender ephemeral id, or {@code null} if unknown.
+ */
+ public CompletableFuture<TransactionMeta> triggerTxRecovery(
+ UUID txId,
+ ZonePartitionId commitPartitionId,
+ String commitPartitionNode,
+ @Nullable ZonePartitionId senderGroupId,
+ @Nullable UUID senderId
+ ) {
+ // Never empty: always contains at least the commit partition.
+ Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedGroupsMap =
enlistedGroupsMap(
+ commitPartitionId,
+ commitPartitionNode,
+ senderGroupId,
+ senderId == null ? null : clusterNodeResolver.getById(senderId)
+ );
+
+ // If the transaction state is pending, then the transaction should be
rolled back,
+ // meaning that the state is changed to aborted and a corresponding
cleanup request
+ // is sent in a common durable manner to a partition that has
initiated recovery.
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27386 the reason
of rollback needs to be explained.
+ return txManager.finish(
+ HybridTimestampTracker.emptyTracker(),
+ // Tx recovery is executed on the commit partition.
+ commitPartitionId,
+ false, // NOTE(review): meaning of these four flags is defined by TxManager.finish; consider named locals.
+ false,
+ true,
+ false,
+ enlistedGroupsMap,
+ txId
+ )
+ .handle((v, ex) -> {
+ TransactionMeta txStateMeta = txManager.stateMeta(txId);
+ CompletableFuture<TransactionMeta> res;
+
+ if (txStateMeta != null) {
+ if (isFinalState(txStateMeta.txState())) {
+ res = completedFuture(txStateMeta);
+ } else if (txStateMeta.txState() == FINISHING) {
+ res = ((TxStateMetaFinishing)
txStateMeta).txFinishFuture();
+ } else {
+ res = failedFuture(new
TransactionInternalException(TX_ROLLBACK_ERR, format("Unexpected transaction "
+ + "state after recovery [txId={},
txMeta={}].", txId, txStateMeta)));
+ }
+ } else {
+ if (ex == null) {
+ res =
completedFuture(TxStateMeta.builder(ABORTED).build());
+ } else {
+ sneakyThrow(ex); // Propagate the failure of finish(); sneakyThrow is expected to always throw.
+
+ res = nullCompletedFuture(); // Only keeps 'res' definitely assigned for the compiler.
+ }
+ }
+
+ return res;
+ })
+ .thenCompose(Function.identity())
+ .whenComplete((v, ex) -> { // Runs on success and failure alike; the cleanup futures are not awaited.
+ runCleanupOnNode(commitPartitionId, txId,
commitPartitionNode);
+
+ if (senderGroupId != null && senderId != null) {
+ runCleanupOnNode(senderGroupId, txId, senderId);
+ }
+ });
+ }
+
+ private static Map<ZonePartitionId, PendingTxPartitionEnlistment>
enlistedGroupsMap(
+ ZonePartitionId commitPartitionId,
+ String commitPartitionNode,
+ @Nullable ZonePartitionId senderGroupId,
+ @Nullable InternalClusterNode senderNode
+ ) {
+ if (senderGroupId == null || senderNode == null ||
commitPartitionId.equals(senderGroupId)) {
+ return Map.of(commitPartitionId,
createAbandonedTxRecoveryEnlistment(commitPartitionNode));
+ } else {
+ return Map.of(
+ commitPartitionId,
createAbandonedTxRecoveryEnlistment(commitPartitionNode),
+ senderGroupId,
createAbandonedTxRecoveryEnlistment(senderNode.name())
+ );
+ }
+ }
+
+ private static PendingTxPartitionEnlistment
createAbandonedTxRecoveryEnlistment(String nodeName) {
+ // Enlistment consistency token is not required for the rollback, so
it is 0L.
+ // No table IDs are passed as we don't know which tables were enlisted; this is ok as the corresponding write intents
+ // can still be resolved later when reads stumble upon them.
+ return new PendingTxPartitionEnlistment(nodeName, 0L);
+ }
+
+ /**
+ * Run cleanup on a node.
+ *
+ * @param groupId Group id.
+ * @param txId Transaction id.
+ * @param nodeId Node ephemeral id; if it cannot be resolved to a consistent id, no cleanup is sent.
+ */
+ public CompletableFuture<Void> runCleanupOnNode(ZonePartitionId groupId,
UUID txId, UUID nodeId) {
+ // Resolve the node's consistent id, by which the cleanup request is addressed.
+ String nodeConsistentId =
clusterNodeResolver.getConsistentIdById(nodeId);
+
+ return nodeConsistentId == null ? nullCompletedFuture() :
runCleanupOnNode(groupId, txId, nodeConsistentId);
+ }
+
+ /**
+ * Run cleanup on a node.
+ *
+ * @param commitPartitionId Partition group id for the cleanup request (e.g. the commit partition or the sender group).
+ * @param txId Transaction id.
+ * @param nodeName Node consistent id.
+ */
+ private CompletableFuture<Void> runCleanupOnNode(ZonePartitionId
commitPartitionId, UUID txId, String nodeName) {
+ return txManager.cleanup(commitPartitionId, nodeName, txId);
+ }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
index b13589a4c29..bba902bed2d 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java
@@ -48,4 +48,11 @@ public interface TxStateCoordinatorRequest extends
NetworkMessage {
* @return Group id of the sender partition, or {@code null} if the
request is not caused by write intent resolution.
*/
@Nullable ZonePartitionIdMessage senderGroupId();
+
+ /**
+ * Commit partition id of the transaction, used to abort it when the commit partition id is absent in the coordinator's tx meta.
+ *
+ * @return Commit partition id, or {@code null} if it was not set.
+ */
+ @Nullable ZonePartitionIdMessage commitPartitionId();
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionStateResolverTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionStateResolverTest.java
index 6cdbd9324d3..f1632b1241a 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionStateResolverTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/TransactionStateResolverTest.java
@@ -27,6 +27,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -56,6 +57,7 @@ import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.TxStateResponse;
+import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -103,7 +105,10 @@ public class TransactionStateResolverTest extends
BaseIgniteAbstractTest {
clusterNodeResolver,
messagingService,
placementDriver,
- txMessageSender
+ txMessageSender,
+ new TxRecoveryEngine(txManager,
mock(ClusterNodeResolver.class)),
+ new Lazy<>(() -> mock(InternalClusterNode.class)),
+ Runnable::run
);
// Setup default mock for PlacementDriver to avoid timeouts.
@@ -135,6 +140,7 @@ public class TransactionStateResolverTest extends
BaseIgniteAbstractTest {
when(txMessageSender.resolveTxStateFromCoordinator(
any(InternalClusterNode.class),
+ eq(commitPartitionId),
eq(txId),
any(HybridTimestamp.class),
any(Long.class),
@@ -167,6 +173,7 @@ public class TransactionStateResolverTest extends
BaseIgniteAbstractTest {
ArgumentCaptor<InternalClusterNode> nodeCaptor =
ArgumentCaptor.forClass(InternalClusterNode.class);
verify(txMessageSender).resolveTxStateFromCoordinator(
nodeCaptor.capture(),
+ eq(commitPartitionId),
eq(txId),
any(HybridTimestamp.class),
any(Long.class),
@@ -201,6 +208,7 @@ public class TransactionStateResolverTest extends
BaseIgniteAbstractTest {
when(txMessageSender.resolveTxStateFromCoordinator(
any(InternalClusterNode.class),
+ eq(commitPartitionId),
eq(txId),
any(HybridTimestamp.class),
any(Long.class),
@@ -271,6 +279,7 @@ public class TransactionStateResolverTest extends
BaseIgniteAbstractTest {
when(txMessageSender.resolveTxStateFromCoordinator(
any(InternalClusterNode.class),
+ eq(commitPartitionId),
eq(txId),
any(HybridTimestamp.class),
any(Long.class),
@@ -292,6 +301,7 @@ public class TransactionStateResolverTest extends
BaseIgniteAbstractTest {
ArgumentCaptor<InternalClusterNode> nodeCaptor =
ArgumentCaptor.forClass(InternalClusterNode.class);
verify(txMessageSender).resolveTxStateFromCoordinator(
nodeCaptor.capture(),
+ eq(commitPartitionId),
eq(txId),
any(HybridTimestamp.class),
any(Long.class),