This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e8cad9947 IGNITE-20002 Implement durable unlock on primary partition 
re-election (#2697)
3e8cad9947 is described below

commit 3e8cad9947c38ea05429599ad99d18c7bb59ac1f
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Oct 19 14:03:24 2023 +0300

    IGNITE-20002 Implement durable unlock on primary partition re-election 
(#2697)
---
 .../apache/ignite/client/fakes/FakeTxManager.java  |   6 +
 .../placementdriver/TestPlacementDriver.java       |  31 ++-
 .../replicator/PartitionReplicaListener.java       | 148 +++++++++++--
 .../OutgoingSnapshotTxDataStreamingTest.java       |   5 +-
 .../PartitionReplicaListenerDurableUnlockTest.java | 232 +++++++++++++++++++++
 .../replication/PartitionReplicaListenerTest.java  |   3 +-
 .../org/apache/ignite/internal/tx/TxManager.java   |   9 +
 .../java/org/apache/ignite/internal/tx/TxMeta.java |  45 +++-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |   6 +
 .../storage/state/AbstractTxStateStorageTest.java  |  16 +-
 10 files changed, 447 insertions(+), 54 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index fcfc8b2848..6073d2d00f 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -158,6 +159,11 @@ public class FakeTxManager implements TxManager {
         return CompletableFuture.runAsync(runnable);
     }
 
+    @Override
+    public CompletableFuture<?> 
executeCleanupAsync(Supplier<CompletableFuture<?>> action) {
+        return action.get();
+    }
+
     @Override
     public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, 
boolean commit) {
     }
diff --git 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index c5131fd3cb..56e0f4d35f 100644
--- 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++ 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -21,11 +21,13 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import org.apache.ignite.internal.event.EventListener;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -33,9 +35,13 @@ import org.jetbrains.annotations.TestOnly;
  * leaseholder.
  */
 @TestOnly
-public class TestPlacementDriver implements PlacementDriver {
+public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEvent, PrimaryReplicaEventParameters>
+        implements PlacementDriver {
     private final TestReplicaMetaImpl primaryReplica;
 
+    @Nullable
+    private BiFunction<ReplicationGroupId, HybridTimestamp, 
CompletableFuture<ReplicaMeta>> awaitPrimaryReplicaFunction = null;
+
     public TestPlacementDriver(String leaseholder) {
         this.primaryReplica = new TestReplicaMetaImpl(leaseholder);
     }
@@ -47,6 +53,10 @@ public class TestPlacementDriver implements PlacementDriver {
             long timeout,
             TimeUnit unit
     ) {
+        if (awaitPrimaryReplicaFunction != null) {
+            return awaitPrimaryReplicaFunction.apply(groupId, timestamp);
+        }
+
         return completedFuture(primaryReplica);
     }
 
@@ -56,19 +66,18 @@ public class TestPlacementDriver implements PlacementDriver 
{
     }
 
     @Override
-    public void listen(PrimaryReplicaEvent evt, EventListener<? extends 
PrimaryReplicaEventParameters> listener) {
-        if (evt != PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED) {
-            throw new UnsupportedOperationException();
-        }
+    public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId 
grpId) {
+        return completedFuture(null);
     }
 
     @Override
-    public void removeListener(PrimaryReplicaEvent evt, EventListener<? 
extends PrimaryReplicaEventParameters> listener) {
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> fireEvent(PrimaryReplicaEvent event, 
PrimaryReplicaEventParameters parameters) {
+        return super.fireEvent(event, parameters);
     }
 
-    @Override
-    public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId 
grpId) {
-        return completedFuture(null);
+    public void setAwaitPrimaryReplicaFunction(
+            @Nullable BiFunction<ReplicationGroupId, HybridTimestamp, 
CompletableFuture<ReplicaMeta>> awaitPrimaryReplicaFunction
+    ) {
+        this.awaitPrimaryReplicaFunction = awaitPrimaryReplicaFunction;
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 80745d7222..745f83a2b9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -41,6 +41,7 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
+import static org.apache.ignite.raft.jraft.util.internal.ThrowUtil.hasCause;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -74,10 +75,12 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteTriFunction;
 import org.apache.ignite.internal.lang.IgniteUuid;
+import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaResult;
@@ -145,6 +148,7 @@ import 
org.apache.ignite.internal.tx.TransactionAbandonedException;
 import org.apache.ignite.internal.tx.TransactionIds;
 import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
@@ -320,35 +324,115 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         schemaCompatValidator = new SchemaCompatValidator(schemas, 
catalogService);
 
-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
(evt, e) -> {
-            if (!localNode.name().equals(evt.leaseholder())) {
-                return completedFuture(false);
-            }
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
this::onPrimaryElected);
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
this::onPrimaryExpired);
+    }
+
+    private CompletableFuture<Boolean> 
onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable 
exception) {
+        if (!localNode.name().equals(evt.leaseholder())) {
+            return completedFuture(false);
+        }
+
+        List<CompletableFuture<?>> cleanupFutures = new ArrayList<>();
+
+        Cursor<IgniteBiTuple<UUID, TxMeta>> txs;
+
+        try {
+            txs = txStateStorage.scan();
+        } catch (IgniteInternalException e) {
+            return completedFuture(false);
+        }
 
-            LOG.info("Primary replica expired [grp={}]", replicationGroupId);
+        for (IgniteBiTuple<UUID, TxMeta> tx : txs) {
+            UUID txId = tx.getKey();
+            TxMeta txMeta = tx.getValue();
 
-            ArrayList<CompletableFuture<?>> futs = new ArrayList<>();
+            assert !txMeta.enlistedPartitions().isEmpty();
 
-            for (UUID txId : txCleanupReadyFutures.keySet()) {
-                txCleanupReadyFutures.compute(txId, (id, txOps) -> {
-                    if (txOps == null || isFinalState(txOps.state)) {
-                        return null;
+            if (isFinalState(txMeta.txState()) && !txMeta.locksReleased()) {
+                CompletableFuture<?> cleanupFuture = 
txManager.executeCleanupAsync(() -> durableCleanup(txId, txMeta));
+
+                cleanupFutures.add(cleanupFuture);
+            }
+        }
+
+        allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]))
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Failure occurred while triggering cleanup 
on commit partition primary replica election "
+                                + "[commitPartition={}]", e, 
replicationGroupId);
                     }
+                });
 
-                    if (!txOps.futures.isEmpty()) {
-                        CompletableFuture<?>[] txFuts = 
txOps.futures.values().stream()
-                                .flatMap(Collection::stream)
-                                .toArray(CompletableFuture[]::new);
+        // The future returned by this event handler can't wait for all 
cleanups because it's not necessary and it can block
+        // meta storage notification thread for a while, preventing it from 
delivering further updates (including leases) and therefore
+        // causing deadlock on primary replica waiting.
+        return completedFuture(false);
+    }
+
+    private CompletableFuture<?> durableCleanup(UUID txId, TxMeta txMeta) {
+        return cleanup(txId, txMeta)
+                .handle((v, e) -> {
+                    if (e == null) {
+                        return txManager.executeCleanupAsync(() -> 
markLocksReleased(
+                                txId,
+                                txMeta.enlistedPartitions(),
+                                txMeta.txState(),
+                                txMeta.commitTimestamp())
+                        );
+                    } else {
+                        LOG.warn("Failed to execute cleanup on commit 
partition primary replica switch [txId={}, commitPartition={}]",
+                                e, txId, replicationGroupId);
 
-                        futs.add(allOf(txFuts).whenComplete((unused, 
throwable) -> releaseTxLocks(txId)));
+                        if (hasCause(e, null, NodeStoppingException.class)) {
+                            return completedFuture(null);
+                        } else {
+                            return txManager.executeCleanupAsync(() -> 
durableCleanup(txId, txMeta));
+                        }
                     }
+                })
+                .thenCompose(f -> f);
+    }
 
-                    return txOps;
-                });
-            }
+    private void markLocksReleased(
+            UUID txId,
+            Collection<TablePartitionId> enlistedPartitions,
+            TxState txState,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        TxMeta newTxMeta = new TxMeta(txState, enlistedPartitions, 
commitTimestamp, true);
 
-            return 
allOf(futs.toArray(CompletableFuture[]::new)).thenApply(unused -> false);
-        });
+        txStateStorage.put(txId, newTxMeta);
+    }
+
+    private CompletableFuture<Boolean> 
onPrimaryExpired(PrimaryReplicaEventParameters evt, @Nullable Throwable 
exception) {
+        if (!localNode.name().equals(evt.leaseholder())) {
+            return completedFuture(false);
+        }
+
+        LOG.info("Primary replica expired [grp={}]", replicationGroupId);
+
+        ArrayList<CompletableFuture<?>> futs = new ArrayList<>();
+
+        for (UUID txId : txCleanupReadyFutures.keySet()) {
+            txCleanupReadyFutures.compute(txId, (id, txOps) -> {
+                if (txOps == null || isFinalState(txOps.state)) {
+                    return null;
+                }
+
+                if (!txOps.futures.isEmpty()) {
+                    CompletableFuture<?>[] txFuts = 
txOps.futures.values().stream()
+                            .flatMap(Collection::stream)
+                            .toArray(CompletableFuture[]::new);
+
+                    futs.add(allOf(txFuts).whenComplete((unused, throwable) -> 
releaseTxLocks(txId)));
+                }
+
+                return txOps;
+            });
+        }
+
+        return allOf(futs.toArray(CompletableFuture[]::new)).thenApply(unused 
-> false);
     }
 
     @Override
@@ -1227,9 +1311,31 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     ) {
         CompletableFuture<?> changeStateFuture = 
finishTransaction(enlistedPartitions, txId, commit, commitTimestamp, 
txCoordinatorId);
 
+        return cleanup(changeStateFuture, enlistedPartitions, commit, 
commitTimestamp, txId, ATTEMPTS_TO_CLEANUP_REPLICA)
+                .thenRun(() -> markLocksReleased(
+                        txId,
+                        enlistedPartitions,
+                        commit ? COMMITED : ABORTED,
+                        commitTimestamp)
+                );
+    }
+
+    private CompletableFuture<Void> cleanup(UUID txId, TxMeta txMeta) {
+        return cleanup(completedFuture(null), txMeta.enlistedPartitions(), 
txMeta.txState() == COMMITED, txMeta.commitTimestamp(), txId, 1);
+    }
+
+    // TODO https://issues.apache.org/jira/browse/IGNITE-20681 remove attempts 
count.
+    private CompletableFuture<Void> cleanup(
+            CompletableFuture<?> changeStateFuture,
+            Collection<TablePartitionId> enlistedPartitions,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId,
+            int attemptsToCleanupReplica
+    ) {
         CompletableFuture<?>[] futures = enlistedPartitions.stream()
                 .map(partitionId -> changeStateFuture.thenCompose(ignored ->
-                        cleanupWithRetry(commit, commitTimestamp, txId, 
partitionId, ATTEMPTS_TO_CLEANUP_REPLICA)))
+                        cleanupWithRetry(commit, commitTimestamp, txId, 
partitionId, attemptsToCleanupReplica)))
                 .toArray(size -> new CompletableFuture<?>[size]);
 
         return allOf(futures);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
index 5622310e15..0ec515f1f7 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.internal.catalog.CatalogService;
@@ -100,11 +101,11 @@ class OutgoingSnapshotTxDataStreamingTest extends 
BaseIgniteAbstractTest {
         assertThat(response.txMeta(), hasSize(2));
 
         assertThat(response.txMeta().get(0).txState(), is(TxState.ABORTED));
-        assertThat(response.txMeta().get(0).enlistedPartitions(), 
is(List.of(partition1Id)));
+        assertThat(new 
ArrayList<>(response.txMeta().get(0).enlistedPartitions()), 
is(List.of(partition1Id)));
         assertThat(response.txMeta().get(0).commitTimestamp(), 
is(meta1.commitTimestamp()));
 
         assertThat(response.txMeta().get(1).txState(), is(TxState.COMMITED));
-        assertThat(response.txMeta().get(1).enlistedPartitions(), 
is(List.of(partition1Id, partition2Id)));
+        assertThat(new 
ArrayList<>(response.txMeta().get(1).enlistedPartitions()), 
is(List.of(partition1Id, partition2Id)));
         assertThat(response.txMeta().get(1).commitTimestamp(), 
is(meta2.commitTimestamp()));
     }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
new file mode 100644
index 0000000000..3341a10aea
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replication;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
+import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.internal.tx.test.TestTransactionIds;
+import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Class for testing durable unlock on commit partition recovery.
+ */
+@ExtendWith(MockitoExtension.class)
+public class PartitionReplicaListenerDurableUnlockTest extends 
IgniteAbstractTest {
+    private static final String LOCAL_NODE_NAME = "node1";
+
+    private static final int PART_ID = 0;
+
+    private static final int TABLE_ID = 1;
+
+    /** Hybrid clock. */
+    private final HybridClock clock = new HybridClockImpl();
+
+    private final TestPlacementDriver placementDriver = new 
TestPlacementDriver(LOCAL_NODE_NAME);
+
+    /** The storage stores transaction states. */
+    private final TestTxStateStorage txStateStorage = new TestTxStateStorage();
+
+    @Mock
+    private TxManager txManager;
+
+    /** Partition replication listener to test. */
+    private PartitionReplicaListener partitionReplicaListener;
+
+    private BiFunction<UUID, TablePartitionId, CompletableFuture<Void>> 
cleanupCallback = (a, b) -> completedFuture(null);
+
+    @BeforeEach
+    public void beforeTest() {
+        doAnswer(invocation -> {
+            Runnable r = invocation.getArgument(0);
+            r.run();
+            return completedFuture(null);
+        }).when(txManager).executeCleanupAsync(any(Runnable.class));
+
+        doAnswer(invocation -> {
+            Supplier<CompletableFuture<?>> s = invocation.getArgument(0);
+            return s.get();
+        }).when(txManager).executeCleanupAsync(any(Supplier.class));
+
+        doAnswer(invocation -> {
+            UUID txId = invocation.getArgument(2);
+            TablePartitionId partitionId = invocation.getArgument(1);
+            return cleanupCallback.apply(txId, partitionId);
+        }).when(txManager).cleanup(anyString(), any(), any(), anyBoolean(), 
any());
+
+        partitionReplicaListener = new PartitionReplicaListener(
+                new TestMvPartitionStorage(PART_ID),
+                mock(RaftGroupService.class),
+                txManager,
+                new HeapLockManager(),
+                Runnable::run,
+                PART_ID,
+                TABLE_ID,
+                Map::of,
+                new Lazy<>(null),
+                Map::of,
+                clock,
+                new PendingComparableValuesTracker<>(new HybridTimestamp(1, 
1)),
+                txStateStorage,
+                mock(TransactionStateResolver.class),
+                mock(StorageUpdateHandler.class),
+                mock(Schemas.class),
+                new ClusterNodeImpl("node1", LOCAL_NODE_NAME, 
NetworkAddress.from("127.0.0.1:127")),
+                mock(SchemaSyncService.class),
+                mock(CatalogService.class),
+                placementDriver
+        );
+    }
+
+    @Test
+    public void testOnlyFinishedAreCleanedUp() {
+        UUID tx0 = TestTransactionIds.newTransactionId();
+        UUID tx1 = TestTransactionIds.newTransactionId();
+        UUID tx2 = TestTransactionIds.newTransactionId();
+
+        TablePartitionId part0 = new TablePartitionId(TABLE_ID, PART_ID);
+        TablePartitionId part1 = new TablePartitionId(TABLE_ID, 1);
+
+        txStateStorage.put(tx0, new TxMeta(TxState.PENDING, List.of(part0, 
part1), null));
+        txStateStorage.put(tx1, new TxMeta(TxState.COMMITED, List.of(part0), 
clock.now()));
+        txStateStorage.put(tx2, new TxMeta(TxState.ABORTED, List.of(part0), 
null));
+
+        cleanupCallback = (tx, partId) -> {
+            assertTrue(isFinalState(txStateStorage.get(tx).txState()));
+
+            return completedFuture(null);
+        };
+
+        PrimaryReplicaEventParameters parameters = new 
PrimaryReplicaEventParameters(0, part0, LOCAL_NODE_NAME);
+
+        
assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
 parameters), willSucceedIn(1, SECONDS));
+
+        for (IgniteBiTuple<UUID, TxMeta> tx : txStateStorage.scan()) {
+            if (isFinalState(tx.getValue().txState())) {
+                assertTrue(tx.getValue().locksReleased());
+            }
+        }
+    }
+
+    @Test
+    public void testRepeatedUntilSuccess() {
+        UUID tx0 = TestTransactionIds.newTransactionId();
+        TablePartitionId part0 = new TablePartitionId(TABLE_ID, PART_ID);
+        txStateStorage.put(tx0, new TxMeta(TxState.COMMITED, List.of(part0), 
null));
+
+        int exCnt = 3;
+        AtomicInteger exceptionCounter = new AtomicInteger(exCnt);
+
+        cleanupCallback = (tx, partId) -> {
+            assertThat(exceptionCounter.get(), greaterThan(0));
+
+            if (exceptionCounter.decrementAndGet() > 0) {
+                throw new RuntimeException("test exception");
+            }
+
+            return completedFuture(null);
+        };
+
+        PrimaryReplicaEventParameters parameters = new 
PrimaryReplicaEventParameters(0, part0, LOCAL_NODE_NAME);
+
+        
assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
 parameters), willSucceedIn(1, SECONDS));
+
+        assertTrue(txStateStorage.get(tx0).locksReleased());
+
+        assertEquals(0, exceptionCounter.get());
+    }
+
+    @Test
+    public void testCantGetPrimaryReplicaForEnlistedPartition() throws 
InterruptedException {
+        UUID tx0 = TestTransactionIds.newTransactionId();
+        TablePartitionId part0 = new TablePartitionId(TABLE_ID, PART_ID);
+        txStateStorage.put(tx0, new TxMeta(TxState.COMMITED, List.of(part0), 
null));
+
+        cleanupCallback = (tx, partId) -> completedFuture(null);
+
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = new 
CompletableFuture<>();
+        placementDriver.setAwaitPrimaryReplicaFunction((groupId, timestamp) -> 
primaryReplicaFuture);
+
+        PrimaryReplicaEventParameters parameters = new 
PrimaryReplicaEventParameters(0, part0, LOCAL_NODE_NAME);
+        
assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
 parameters), willSucceedIn(1, SECONDS));
+
+        assertFalse(txStateStorage.get(tx0).locksReleased());
+
+        Thread primaryReplicaFutureCompleteThread =
+                new Thread(() -> 
primaryReplicaFuture.completeExceptionally(new RuntimeException("test 
exception")));
+        primaryReplicaFutureCompleteThread.start();
+
+        assertFalse(txStateStorage.get(tx0).locksReleased());
+
+        placementDriver.setAwaitPrimaryReplicaFunction(null);
+
+        primaryReplicaFutureCompleteThread.join();
+
+        assertTrue(txStateStorage.get(tx0).locksReleased());
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 6bc1de3b8e..68b824d48a 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -436,7 +436,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         doAnswer(invocation -> txStateMeta).when(txManager).stateMeta(any());
 
-        doAnswer(invocation -> 
completedFuture(null)).when(txManager).executeCleanupAsync(any());
+        doAnswer(invocation -> 
completedFuture(null)).when(txManager).executeCleanupAsync(any(Runnable.class));
 
         doAnswer(invocation -> {
             var resp = new 
TxMessagesFactory().txStateResponse().txStateMeta(txStateMeta).build();
@@ -518,7 +518,6 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         ((TestSortedIndexStorage) sortedIndexStorage.storage()).clear();
         testMvPartitionStorage.clear();
         pendingRows.clear();
-        //lockManager.locks(txId).forEachRemaining(lock -> 
lockManager.release(lock));
     }
 
     @Test
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 44a3af2ae1..72c5320612 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.manager.IgniteComponent;
@@ -90,6 +91,14 @@ public interface TxManager extends IgniteComponent {
      */
     CompletableFuture<Void> executeCleanupAsync(Runnable runnable);
 
+    /**
+     * Execute transaction cleanup asynchronously.
+     *
+     * @param action Cleanup action.
+     * @return Future that completes once the cleanup action finishes.
+     */
+    CompletableFuture<?> executeCleanupAsync(Supplier<CompletableFuture<?>> 
action);
+
     /**
      * Finishes a one-phase committed transaction. This method doesn't contain 
any distributed communication.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
index 8eab25e38a..82df9c3379 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.internal.tx;
 
-import static java.util.Collections.unmodifiableList;
+import static java.util.Collections.unmodifiableCollection;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.Objects;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -35,23 +35,48 @@ public class TxMeta implements TransactionMeta {
     private final TxState txState;
 
     /** The list of enlisted partitions. */
-    private final List<TablePartitionId> enlistedPartitions;
+    private final Collection<TablePartitionId> enlistedPartitions;
 
     /** Commit timestamp. */
     @Nullable
     private final HybridTimestamp commitTimestamp;
 
+    /**
+     * Whether the locks have been released. Tx recovery relies on this flag to guarantee that locks get released, whereas cleanup
+     * of write intents matters less for recovery, because write intents can be resolved at any time while the tx state is known.
+     */
+    private final boolean locksReleased;
+
+    /**
+     * Creates a transaction meta with {@code locksReleased} set to {@code false}.
+     *
+     * @param txState Tx state.
+     * @param enlistedPartitions The enlisted partitions.
+     * @param commitTimestamp Commit timestamp.
+     */
+    public TxMeta(TxState txState, Collection<TablePartitionId> enlistedPartitions, @Nullable HybridTimestamp commitTimestamp
+    ) {
+        this(txState, enlistedPartitions, commitTimestamp, false);
+    }
+
     /**
      * The constructor.
      *
      * @param txState Tx state.
      * @param enlistedPartitions The list of enlisted partitions.
      * @param commitTimestamp Commit timestamp.
+     * @param locksReleased Whether the locks are released.
      */
-    public TxMeta(TxState txState, List<TablePartitionId> enlistedPartitions, @Nullable HybridTimestamp commitTimestamp) {
+    public TxMeta(
+            TxState txState,
+            Collection<TablePartitionId> enlistedPartitions,
+            @Nullable HybridTimestamp commitTimestamp,
+            boolean locksReleased
+    ) {
         this.txState = txState;
         this.enlistedPartitions = enlistedPartitions;
         this.commitTimestamp = commitTimestamp;
+        this.locksReleased = locksReleased;
     }
 
     @Override
@@ -59,8 +84,8 @@ public class TxMeta implements TransactionMeta {
         return txState;
     }
 
-    public List<TablePartitionId> enlistedPartitions() {
-        return unmodifiableList(enlistedPartitions);
+    public Collection<TablePartitionId> enlistedPartitions() {
+        return unmodifiableCollection(enlistedPartitions);
     }
 
     @Override
@@ -68,6 +93,10 @@ public class TxMeta implements TransactionMeta {
         return commitTimestamp;
     }
 
+    public boolean locksReleased() {
+        return locksReleased;
+    }
+
     @Override
     public String toString() {
         return S.toString(TxMeta.class, this);
@@ -87,7 +116,8 @@ public class TxMeta implements TransactionMeta {
 
         return txState == other.txState
                 && enlistedPartitions.equals(other.enlistedPartitions)
-                && Objects.equals(commitTimestamp, other.commitTimestamp);
+                && Objects.equals(commitTimestamp, other.commitTimestamp)
+                && locksReleased == other.locksReleased;
     }
 
     @Override
@@ -95,6 +125,7 @@ public class TxMeta implements TransactionMeta {
         int result = txState.hashCode();
         result = 31 * result + enlistedPartitions.hashCode();
         result = 31 * result + (commitTimestamp != null ? commitTimestamp.hashCode() : 0);
+        result = 31 * result + Boolean.hashCode(locksReleased);
         return result;
     }
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index cf5c315a35..422f5da91f 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.tx.impl;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
@@ -483,6 +484,11 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
         return runAsync(runnable, cleanupExecutor);
     }
 
+    @Override
+    public CompletableFuture<?> executeCleanupAsync(Supplier<CompletableFuture<?>> action) {
+        return supplyAsync(action, cleanupExecutor).thenCompose(f -> f);
+    }
+
     CompletableFuture<Void> completeReadOnlyTransactionFuture(TxIdAndTimestamp txIdAndTimestamp) {
         CompletableFuture<Void> readOnlyTxFuture = readOnlyTxFutureById.remove(txIdAndTimestamp);
 
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
index 358c59aa46..63c1b68769 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
@@ -97,7 +97,7 @@ public abstract class AbstractTxStateStorageTest {
         for (int i = 0; i < 100; i++) {
             TxMeta txMeta = storage.get(txIds.get(i));
             TxMeta txMetaExpected = new TxMeta(TxState.COMMITED, generateEnlistedPartitions(i), generateTimestamp(txIds.get(i)));
-            assertTxMetaEquals(txMetaExpected, txMeta);
+            assertEquals(txMetaExpected, txMeta);
         }
 
         for (int i = 0; i < 100; i++) {
@@ -113,7 +113,7 @@ public abstract class AbstractTxStateStorageTest {
             } else {
                 TxMeta txMeta = storage.get(txIds.get(i));
                 TxMeta txMetaExpected = new TxMeta(TxState.COMMITED, generateEnlistedPartitions(i), generateTimestamp(txIds.get(i)));
-                assertTxMetaEquals(txMetaExpected, txMeta);
+                assertEquals(txMetaExpected, txMeta);
             }
         }
     }
@@ -155,7 +155,7 @@ public abstract class AbstractTxStateStorageTest {
         TxMeta txMetaNullTimestamp0 = new TxMeta(txMeta1.txState(), txMeta1.enlistedPartitions(), null);
         assertFalse(storage.compareAndSet(txId, TxState.ABORTED, txMetaNullTimestamp0, 3, 2));
 
-        assertTxMetaEquals(storage.get(txId), txMeta1);
+        assertEquals(storage.get(txId), txMeta1);
 
         assertTrue(storage.compareAndSet(txId, txMeta1.txState(), txMeta2, 3, 2));
         // Checking idempotency.
@@ -165,7 +165,7 @@ public abstract class AbstractTxStateStorageTest {
         TxMeta txMetaNullTimestamp2 = new TxMeta(txMeta2.txState(), txMeta2.enlistedPartitions(), null);
         assertFalse(storage.compareAndSet(txId, TxState.ABORTED, txMetaNullTimestamp2, 3, 2));
 
-        assertTxMetaEquals(storage.get(txId), txMeta2);
+        assertEquals(storage.get(txId), txMeta2);
     }
 
     @Test
@@ -194,7 +194,7 @@ public abstract class AbstractTxStateStorageTest {
                 assertNotNull(txMeta);
                 assertNotNull(txData);
                 assertNotNull(txData.getValue());
-                assertTxMetaEquals(txMeta, txData.getValue());
+                assertEquals(txMeta, txData.getValue());
             }
 
             assertTrue(txs.isEmpty());
@@ -474,12 +474,6 @@ public abstract class AbstractTxStateStorageTest {
         );
     }
 
-    private static void assertTxMetaEquals(TxMeta txMeta0, TxMeta txMeta1) {
-        assertEquals(txMeta0.txState(), txMeta1.txState());
-        assertEquals(txMeta0.commitTimestamp(), txMeta1.commitTimestamp());
-        assertEquals(txMeta0.enlistedPartitions(), txMeta1.enlistedPartitions());
-    }
-
     private static void assertThrowsIgniteInternalException(int expFullErrorCode, Executable executable) {
         IgniteInternalException exception = assertThrows(IgniteInternalException.class, executable);
 


Reply via email to