This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3e8cad9947 IGNITE-20002 Implement durable unlock on primary partition
re-election (#2697)
3e8cad9947 is described below
commit 3e8cad9947c38ea05429599ad99d18c7bb59ac1f
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Oct 19 14:03:24 2023 +0300
IGNITE-20002 Implement durable unlock on primary partition re-election (#2697)
---
.../apache/ignite/client/fakes/FakeTxManager.java | 6 +
.../placementdriver/TestPlacementDriver.java | 31 ++-
.../replicator/PartitionReplicaListener.java | 148 +++++++++++--
.../OutgoingSnapshotTxDataStreamingTest.java | 5 +-
.../PartitionReplicaListenerDurableUnlockTest.java | 232 +++++++++++++++++++++
.../replication/PartitionReplicaListenerTest.java | 3 +-
.../org/apache/ignite/internal/tx/TxManager.java | 9 +
.../java/org/apache/ignite/internal/tx/TxMeta.java | 45 +++-
.../ignite/internal/tx/impl/TxManagerImpl.java | 6 +
.../storage/state/AbstractTxStateStorageTest.java | 16 +-
10 files changed, 447 insertions(+), 54 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index fcfc8b2848..6073d2d00f 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -158,6 +159,11 @@ public class FakeTxManager implements TxManager {
return CompletableFuture.runAsync(runnable);
}
+ @Override
+ public CompletableFuture<?>
executeCleanupAsync(Supplier<CompletableFuture<?>> action) {
+ return action.get(); // Runs the action in the calling thread and returns the future it produces as is.
+ }
+
@Override
public void finishFull(HybridTimestampTracker timestampTracker, UUID txId,
boolean commit) {
}
diff --git a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index c5131fd3cb..56e0f4d35f 100644
--- a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++ b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -21,11 +21,13 @@ import static java.util.concurrent.CompletableFuture.completedFuture;

 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import org.apache.ignite.internal.event.EventListener;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;

 /**
@@ -33,9 +35,17 @@ import org.jetbrains.annotations.TestOnly;
  * leaseholder.
  */
 @TestOnly
-public class TestPlacementDriver implements PlacementDriver {
+public class TestPlacementDriver extends AbstractEventProducer<PrimaryReplicaEvent, PrimaryReplicaEventParameters>
+        implements PlacementDriver {
     private final TestReplicaMetaImpl primaryReplica;

+    /**
+     * Overrides the result of the primary replica await when not {@code null}. Volatile because a test may replace it while
+     * another thread is awaiting the primary replica.
+     */
+    @Nullable
+    private volatile BiFunction<ReplicationGroupId, HybridTimestamp, CompletableFuture<ReplicaMeta>> awaitPrimaryReplicaFunction = null;
+
     public TestPlacementDriver(String leaseholder) {
         this.primaryReplica = new TestReplicaMetaImpl(leaseholder);
     }
@@ -47,6 +57,13 @@ public class TestPlacementDriver implements PlacementDriver {
             long timeout,
             TimeUnit unit
     ) {
+        // Read the volatile field once, so that a concurrent reset to null can't happen between the check and the call.
+        BiFunction<ReplicationGroupId, HybridTimestamp, CompletableFuture<ReplicaMeta>> function = awaitPrimaryReplicaFunction;
+
+        if (function != null) {
+            return function.apply(groupId, timestamp);
+        }
+
         return completedFuture(primaryReplica);
     }

@@ -56,19 +73,24 @@ public class TestPlacementDriver implements PlacementDriver {
     }

     @Override
-    public void listen(PrimaryReplicaEvent evt, EventListener<? extends PrimaryReplicaEventParameters> listener) {
-        if (evt != PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED) {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    @Override
-    public void removeListener(PrimaryReplicaEvent evt, EventListener<? extends PrimaryReplicaEventParameters> listener) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId) {
         return completedFuture(null);
     }
+
+    /** Delegates to the parent implementation; declared {@code public} so that tests can fire primary replica events directly. */
+    @Override
+    public CompletableFuture<Void> fireEvent(PrimaryReplicaEvent event, PrimaryReplicaEventParameters parameters) {
+        return super.fireEvent(event, parameters);
+    }
+
+    /**
+     * Sets the function that overrides the result of the primary replica await.
+     *
+     * @param awaitPrimaryReplicaFunction Function of the replication group id and timestamp, or {@code null} to use the default.
+     */
+    public void setAwaitPrimaryReplicaFunction(
+            @Nullable BiFunction<ReplicationGroupId, HybridTimestamp, CompletableFuture<ReplicaMeta>> awaitPrimaryReplicaFunction
+    ) {
+        this.awaitPrimaryReplicaFunction = awaitPrimaryReplicaFunction;
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 80745d7222..745f83a2b9 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -41,6 +41,7 @@ import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
+import static org.apache.ignite.raft.jraft.util.internal.ThrowUtil.hasCause;

 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -74,10 +75,12 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteTriFunction;
 import org.apache.ignite.internal.lang.IgniteUuid;
+import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaResult;
@@ -145,6 +148,7 @@ import org.apache.ignite.internal.tx.TransactionAbandonedException;
 import org.apache.ignite.internal.tx.TransactionIds;
 import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
@@ -320,35 +324,153 @@ public class PartitionReplicaListener implements ReplicaListener {

         schemaCompatValidator = new SchemaCompatValidator(schemas, catalogService);

-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, (evt, e) -> {
-            if (!localNode.name().equals(evt.leaseholder())) {
-                return completedFuture(false);
-            }
-
-            LOG.info("Primary replica expired [grp={}]", replicationGroupId);
-
-            ArrayList<CompletableFuture<?>> futs = new ArrayList<>();
-
-            for (UUID txId : txCleanupReadyFutures.keySet()) {
-                txCleanupReadyFutures.compute(txId, (id, txOps) -> {
-                    if (txOps == null || isFinalState(txOps.state)) {
-                        return null;
-                    }
-
-                    if (!txOps.futures.isEmpty()) {
-                        CompletableFuture<?>[] txFuts = txOps.futures.values().stream()
-                                .flatMap(Collection::stream)
-                                .toArray(CompletableFuture[]::new);
-
-                        futs.add(allOf(txFuts).whenComplete((unused, throwable) -> releaseTxLocks(txId)));
-                    }
-
-                    return txOps;
-                });
-            }
-
-            return allOf(futs.toArray(CompletableFuture[]::new)).thenApply(unused -> false);
-        });
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this::onPrimaryElected);
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this::onPrimaryExpired);
+    }
+
+    /**
+     * Handles the primary replica election: if the local node is the new leaseholder, schedules a durable cleanup for every
+     * transaction from the tx state storage that is in a final state but whose locks are not marked as released yet.
+     *
+     * @param evt Event parameters.
+     * @param exception Exception passed with the event (not used).
+     * @return Future completed with {@code false} immediately, without waiting for the scheduled cleanups.
+     */
+    private CompletableFuture<Boolean> onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable exception) {
+        if (!localNode.name().equals(evt.leaseholder())) {
+            return completedFuture(false);
+        }
+
+        Cursor<IgniteBiTuple<UUID, TxMeta>> txs;
+
+        try {
+            txs = txStateStorage.scan();
+        } catch (IgniteInternalException e) {
+            LOG.warn("Failed to scan tx state storage on commit partition primary replica election [commitPartition={}]",
+                    e, replicationGroupId);
+
+            return completedFuture(false);
+        }
+
+        List<CompletableFuture<?>> cleanupFutures = new ArrayList<>();
+
+        // The cursor holds storage resources, so it must be closed once the iteration is over.
+        try (txs) {
+            for (IgniteBiTuple<UUID, TxMeta> tx : txs) {
+                UUID txId = tx.getKey();
+                TxMeta txMeta = tx.getValue();
+
+                assert !txMeta.enlistedPartitions().isEmpty();
+
+                if (isFinalState(txMeta.txState()) && !txMeta.locksReleased()) {
+                    CompletableFuture<?> cleanupFuture = txManager.executeCleanupAsync(() -> durableCleanup(txId, txMeta));
+
+                    cleanupFutures.add(cleanupFuture);
+                }
+            }
+        }
+
+        allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]))
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Failure occurred while triggering cleanup on commit partition primary replica election "
+                                + "[commitPartition={}]", e, replicationGroupId);
+                    }
+                });
+
+        // The future returned by this event handler can't wait for all cleanups because it's not necessary and it can block
+        // meta storage notification thread for a while, preventing it from delivering further updates (including leases) and therefore
+        // causing deadlock on primary replica waiting.
+        return completedFuture(false);
+    }
+
+    /**
+     * Runs the cleanup of the given transaction and then marks its locks as released in the tx state storage. If the cleanup fails,
+     * the whole procedure is rescheduled via {@code txManager.executeCleanupAsync}, unless the failure is caused by node stopping.
+     *
+     * @param txId Transaction id.
+     * @param txMeta Transaction meta.
+     * @return Future that completes once the locks are marked as released, or once retrying stops because the node is stopping.
+     */
+    private CompletableFuture<?> durableCleanup(UUID txId, TxMeta txMeta) {
+        return cleanup(txId, txMeta)
+                .handle((v, e) -> {
+                    if (e == null) {
+                        return txManager.executeCleanupAsync(() -> markLocksReleased(
+                                txId,
+                                txMeta.enlistedPartitions(),
+                                txMeta.txState(),
+                                txMeta.commitTimestamp())
+                        );
+                    } else {
+                        LOG.warn("Failed to execute cleanup on commit partition primary replica switch [txId={}, commitPartition={}]",
+                                e, txId, replicationGroupId);
+
+                        if (hasCause(e, null, NodeStoppingException.class)) {
+                            return completedFuture(null);
+                        } else {
+                            return txManager.executeCleanupAsync(() -> durableCleanup(txId, txMeta));
+                        }
+                    }
+                })
+                .thenCompose(f -> f);
+    }
+
+    /**
+     * Persists the transaction meta with the locks marked as released.
+     *
+     * @param txId Transaction id.
+     * @param enlistedPartitions Enlisted partitions.
+     * @param txState Transaction state.
+     * @param commitTimestamp Commit timestamp, may be {@code null}.
+     */
+    private void markLocksReleased(
+            UUID txId,
+            Collection<TablePartitionId> enlistedPartitions,
+            TxState txState,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        TxMeta newTxMeta = new TxMeta(txState, enlistedPartitions, commitTimestamp, true);
+
+        txStateStorage.put(txId, newTxMeta);
+    }
+
+    /**
+     * Handles the primary replica expiration: if the local node was the leaseholder, then for every non-finished transaction
+     * with tracked operations, releases its locks once those operations complete. Entries of finished transactions are removed.
+     *
+     * @param evt Event parameters.
+     * @param exception Exception passed with the event (not used).
+     * @return Future completed with {@code false} once the triggered lock releases are done.
+     */
+    private CompletableFuture<Boolean> onPrimaryExpired(PrimaryReplicaEventParameters evt, @Nullable Throwable exception) {
+        if (!localNode.name().equals(evt.leaseholder())) {
+            return completedFuture(false);
+        }
+
+        LOG.info("Primary replica expired [grp={}]", replicationGroupId);
+
+        ArrayList<CompletableFuture<?>> futs = new ArrayList<>();
+
+        for (UUID txId : txCleanupReadyFutures.keySet()) {
+            txCleanupReadyFutures.compute(txId, (id, txOps) -> {
+                if (txOps == null || isFinalState(txOps.state)) {
+                    return null;
+                }
+
+                if (!txOps.futures.isEmpty()) {
+                    CompletableFuture<?>[] txFuts = txOps.futures.values().stream()
+                            .flatMap(Collection::stream)
+                            .toArray(CompletableFuture[]::new);
+
+                    futs.add(allOf(txFuts).whenComplete((unused, throwable) -> releaseTxLocks(txId)));
+                }
+
+                return txOps;
+            });
+        }
+
+        return allOf(futs.toArray(CompletableFuture[]::new)).thenApply(unused -> false);
     }

     @Override
@@ -1227,9 +1349,31 @@ public class PartitionReplicaListener implements ReplicaListener {
     ) {
         CompletableFuture<?> changeStateFuture = finishTransaction(enlistedPartitions, txId, commit, commitTimestamp, txCoordinatorId);

+        return cleanup(changeStateFuture, enlistedPartitions, commit, commitTimestamp, txId, ATTEMPTS_TO_CLEANUP_REPLICA)
+                .thenRun(() -> markLocksReleased(
+                        txId,
+                        enlistedPartitions,
+                        commit ? COMMITED : ABORTED,
+                        commitTimestamp)
+                );
+    }
+
+    private CompletableFuture<Void> cleanup(UUID txId, TxMeta txMeta) {
+        return cleanup(completedFuture(null), txMeta.enlistedPartitions(), txMeta.txState() == COMMITED, txMeta.commitTimestamp(), txId, 1);
+    }
+
+    // TODO https://issues.apache.org/jira/browse/IGNITE-20681 remove attempts count.
+    private CompletableFuture<Void> cleanup(
+            CompletableFuture<?> changeStateFuture,
+            Collection<TablePartitionId> enlistedPartitions,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId,
+            int attemptsToCleanupReplica
+    ) {
         CompletableFuture<?>[] futures = enlistedPartitions.stream()
                 .map(partitionId -> changeStateFuture.thenCompose(ignored ->
-                        cleanupWithRetry(commit, commitTimestamp, txId, partitionId, ATTEMPTS_TO_CLEANUP_REPLICA)))
+                        cleanupWithRetry(commit, commitTimestamp, txId, partitionId, attemptsToCleanupReplica)))
                 .toArray(size -> new CompletableFuture<?>[size]);

         return allOf(futures);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
index 5622310e15..0ec515f1f7 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.internal.catalog.CatalogService;
@@ -100,11 +101,11 @@ class OutgoingSnapshotTxDataStreamingTest extends
BaseIgniteAbstractTest {
assertThat(response.txMeta(), hasSize(2));
assertThat(response.txMeta().get(0).txState(), is(TxState.ABORTED));
- assertThat(response.txMeta().get(0).enlistedPartitions(),
is(List.of(partition1Id)));
+ assertThat(new
ArrayList<>(response.txMeta().get(0).enlistedPartitions()),
is(List.of(partition1Id))); // enlistedPartitions() is presumably TxMeta's unmodifiableCollection view, which isn't equal to a List by content, hence the copy.
assertThat(response.txMeta().get(0).commitTimestamp(),
is(meta1.commitTimestamp()));
assertThat(response.txMeta().get(1).txState(), is(TxState.COMMITED));
- assertThat(response.txMeta().get(1).enlistedPartitions(),
is(List.of(partition1Id, partition2Id)));
+ assertThat(new
ArrayList<>(response.txMeta().get(1).enlistedPartitions()),
is(List.of(partition1Id, partition2Id)));
assertThat(response.txMeta().get(1).commitTimestamp(),
is(meta2.commitTimestamp()));
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
new file mode 100644
index 0000000000..3341a10aea
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replication;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
+import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.internal.tx.test.TestTransactionIds;
+import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Class for testing durable unlock on commit partition recovery.
+ */
+@ExtendWith(MockitoExtension.class)
+public class PartitionReplicaListenerDurableUnlockTest extends IgniteAbstractTest {
+    private static final String LOCAL_NODE_NAME = "node1";
+
+    private static final int PART_ID = 0;
+
+    private static final int TABLE_ID = 1;
+
+    /** Hybrid clock. */
+    private final HybridClock clock = new HybridClockImpl();
+
+    private final TestPlacementDriver placementDriver = new TestPlacementDriver(LOCAL_NODE_NAME);
+
+    /** The storage stores transaction states. */
+    private final TestTxStateStorage txStateStorage = new TestTxStateStorage();
+
+    @Mock
+    private TxManager txManager;
+
+    /** Partition replication listener to test. */
+    private PartitionReplicaListener partitionReplicaListener;
+
+    private BiFunction<UUID, TablePartitionId, CompletableFuture<Void>> cleanupCallback = (a, b) -> completedFuture(null);
+
+    @BeforeEach
+    public void beforeTest() {
+        doAnswer(invocation -> {
+            Runnable r = invocation.getArgument(0);
+            r.run();
+            return completedFuture(null);
+        }).when(txManager).executeCleanupAsync(any(Runnable.class));
+
+        doAnswer(invocation -> {
+            Supplier<CompletableFuture<?>> s = invocation.getArgument(0);
+            return s.get();
+        }).when(txManager).executeCleanupAsync(any(Supplier.class));
+
+        doAnswer(invocation -> {
+            UUID txId = invocation.getArgument(2);
+            TablePartitionId partitionId = invocation.getArgument(1);
+            return cleanupCallback.apply(txId, partitionId);
+        }).when(txManager).cleanup(anyString(), any(), any(), anyBoolean(), any());
+
+        partitionReplicaListener = new PartitionReplicaListener(
+                new TestMvPartitionStorage(PART_ID),
+                mock(RaftGroupService.class),
+                txManager,
+                new HeapLockManager(),
+                Runnable::run,
+                PART_ID,
+                TABLE_ID,
+                Map::of,
+                new Lazy<>(null),
+                Map::of,
+                clock,
+                new PendingComparableValuesTracker<>(new HybridTimestamp(1, 1)),
+                txStateStorage,
+                mock(TransactionStateResolver.class),
+                mock(StorageUpdateHandler.class),
+                mock(Schemas.class),
+                new ClusterNodeImpl("node1", LOCAL_NODE_NAME, NetworkAddress.from("127.0.0.1:127")),
+                mock(SchemaSyncService.class),
+                mock(CatalogService.class),
+                placementDriver
+        );
+    }
+
+    @Test
+    public void testOnlyFinishedAreCleanedUp() {
+        UUID tx0 = TestTransactionIds.newTransactionId();
+        UUID tx1 = TestTransactionIds.newTransactionId();
+        UUID tx2 = TestTransactionIds.newTransactionId();
+
+        TablePartitionId part0 = new TablePartitionId(TABLE_ID, PART_ID);
+        TablePartitionId part1 = new TablePartitionId(TABLE_ID, 1);
+
+        txStateStorage.put(tx0, new TxMeta(TxState.PENDING, List.of(part0, part1), null));
+        txStateStorage.put(tx1, new TxMeta(TxState.COMMITED, List.of(part0), clock.now()));
+        txStateStorage.put(tx2, new TxMeta(TxState.ABORTED, List.of(part0), null));
+
+        cleanupCallback = (tx, partId) -> {
+            assertTrue(isFinalState(txStateStorage.get(tx).txState()));
+
+            return completedFuture(null);
+        };
+
+        PrimaryReplicaEventParameters parameters = new PrimaryReplicaEventParameters(0, part0, LOCAL_NODE_NAME);
+
+        assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters), willSucceedIn(1, SECONDS));
+
+        for (IgniteBiTuple<UUID, TxMeta> tx : txStateStorage.scan()) {
+            if (isFinalState(tx.getValue().txState())) {
+                assertTrue(tx.getValue().locksReleased());
+            }
+        }
+    }
+
+    @Test
+    public void testRepeatedUntilSuccess() {
+        UUID tx0 = TestTransactionIds.newTransactionId();
+        TablePartitionId part0 = new TablePartitionId(TABLE_ID, PART_ID);
+        txStateStorage.put(tx0, new TxMeta(TxState.COMMITED, List.of(part0), null));
+
+        int exCnt = 3;
+        AtomicInteger exceptionCounter = new AtomicInteger(exCnt);
+
+        cleanupCallback = (tx, partId) -> {
+            assertThat(exceptionCounter.get(), greaterThan(0));
+
+            if (exceptionCounter.decrementAndGet() > 0) {
+                throw new RuntimeException("test exception");
+            }
+
+            return completedFuture(null);
+        };
+
+        PrimaryReplicaEventParameters parameters = new PrimaryReplicaEventParameters(0, part0, LOCAL_NODE_NAME);
+
+        assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters), willSucceedIn(1, SECONDS));
+
+        assertTrue(txStateStorage.get(tx0).locksReleased());
+
+        assertEquals(0, exceptionCounter.get());
+    }
+
+    @Test
+    public void testCantGetPrimaryReplicaForEnlistedPartition() {
+        UUID tx0 = TestTransactionIds.newTransactionId();
+        TablePartitionId part0 = new TablePartitionId(TABLE_ID, PART_ID);
+        txStateStorage.put(tx0, new TxMeta(TxState.COMMITED, List.of(part0), null));
+
+        cleanupCallback = (tx, partId) -> completedFuture(null);
+
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = new CompletableFuture<>();
+        placementDriver.setAwaitPrimaryReplicaFunction((groupId, timestamp) -> primaryReplicaFuture);
+
+        PrimaryReplicaEventParameters parameters = new PrimaryReplicaEventParameters(0, part0, LOCAL_NODE_NAME);
+        assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, parameters), willSucceedIn(1, SECONDS));
+
+        // The cleanup is stuck awaiting the primary replica, so the locks can't be marked as released yet.
+        assertFalse(txStateStorage.get(tx0).locksReleased());
+
+        // Restore the default primary replica BEFORE failing the pending await: the mocked tx manager runs the retry
+        // synchronously in the thread that fails the future, so the retry must see the default primary replica right away
+        // instead of repeatedly retrying against the already failed future.
+        placementDriver.setAwaitPrimaryReplicaFunction(null);
+
+        primaryReplicaFuture.completeExceptionally(new RuntimeException("test exception"));
+
+        // The failed cleanup attempt has been retried synchronously (see the mocks in beforeTest) and succeeded,
+        // so the locks are marked as released.
+        assertTrue(txStateStorage.get(tx0).locksReleased());
+    }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 6bc1de3b8e..68b824d48a 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -436,7 +436,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
doAnswer(invocation -> txStateMeta).when(txManager).stateMeta(any());
- doAnswer(invocation ->
completedFuture(null)).when(txManager).executeCleanupAsync(any());
+ doAnswer(invocation ->
completedFuture(null)).when(txManager).executeCleanupAsync(any(Runnable.class)); // A bare any() would be ambiguous between the Runnable and Supplier overloads; only the Runnable one is stubbed here.
doAnswer(invocation -> {
var resp = new
TxMessagesFactory().txStateResponse().txStateMeta(txStateMeta).build();
@@ -518,7 +518,6 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
((TestSortedIndexStorage) sortedIndexStorage.storage()).clear();
testMvPartitionStorage.clear();
pendingRows.clear();
- //lockManager.locks(txId).forEachRemaining(lock ->
lockManager.release(lock));
}
@Test
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 44a3af2ae1..72c5320612 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -21,6 +21,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -90,6 +91,14 @@ public interface TxManager extends IgniteComponent {
*/
CompletableFuture<Void> executeCleanupAsync(Runnable runnable);
+ /**
+ * Executes an asynchronous transaction cleanup action, i.e. an action that itself returns a future of its completion.
+ *
+ * @param action Cleanup action that starts the cleanup and returns a future completed when the cleanup is done.
+ * @return Future that completes once the future returned by the action completes.
+ */
+ CompletableFuture<?> executeCleanupAsync(Supplier<CompletableFuture<?>>
action);
+
/**
* Finishes a one-phase committed transaction. This method doesn't contain
any distributed communication.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
index 8eab25e38a..82df9c3379 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.tx;
-import static java.util.Collections.unmodifiableList;
+import static java.util.Collections.unmodifiableCollection;
-import java.util.List;
+import java.util.Collection;
import java.util.Objects;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -35,23 +35,48 @@ public class TxMeta implements TransactionMeta {
private final TxState txState;
/** The list of enlisted partitions. */
- private final List<TablePartitionId> enlistedPartitions;
+ private final Collection<TablePartitionId> enlistedPartitions;
/** Commit timestamp. */
@Nullable
private final HybridTimestamp commitTimestamp;
+ /**
+ * Whether the locks are released. It is needed for tx recovery
operations, to guarantee locks release, meanwhile the cleanup
+ * of write intents is not so important for recovery, because write
intents can be resolved at any time while the tx state is known.
+ */
+ private final boolean locksReleased;
+
+ /**
+ * Creates a meta whose locks are not marked as released.
+ *
+ * @param txState Tx state.
+ * @param enlistedPartitions The collection of enlisted partitions.
+ * @param commitTimestamp Commit timestamp.
+ */
+ public TxMeta(TxState txState, Collection<TablePartitionId>
enlistedPartitions, @Nullable HybridTimestamp commitTimestamp
+ ) {
+ this(txState, enlistedPartitions, commitTimestamp, false);
+ }
+
/**
* The constructor.
*
* @param txState Tx state.
* @param enlistedPartitions The list of enlisted partitions.
* @param commitTimestamp Commit timestamp.
+ * @param locksReleased Whether the locks of the transaction are released.
*/
- public TxMeta(TxState txState, List<TablePartitionId> enlistedPartitions,
@Nullable HybridTimestamp commitTimestamp) {
+ public TxMeta(
+ TxState txState,
+ Collection<TablePartitionId> enlistedPartitions,
+ @Nullable HybridTimestamp commitTimestamp,
+ boolean locksReleased
+ ) {
this.txState = txState;
this.enlistedPartitions = enlistedPartitions;
this.commitTimestamp = commitTimestamp;
+ this.locksReleased = locksReleased;
}
@Override
@@ -59,8 +84,8 @@ public class TxMeta implements TransactionMeta {
return txState;
}
- public List<TablePartitionId> enlistedPartitions() {
- return unmodifiableList(enlistedPartitions);
+ public Collection<TablePartitionId> enlistedPartitions() {
+ return unmodifiableCollection(enlistedPartitions); // NOTE: this view doesn't delegate equals/hashCode to the backing collection; copy it before comparing by content.
}
@Override
@@ -68,6 +93,10 @@ public class TxMeta implements TransactionMeta {
return commitTimestamp;
}
+ public boolean locksReleased() {
+ return locksReleased;
+ }
+
@Override
public String toString() {
return S.toString(TxMeta.class, this);
@@ -87,7 +116,8 @@ public class TxMeta implements TransactionMeta {
return txState == other.txState
&& enlistedPartitions.equals(other.enlistedPartitions)
- && Objects.equals(commitTimestamp, other.commitTimestamp);
+ && Objects.equals(commitTimestamp, other.commitTimestamp)
+ && locksReleased == other.locksReleased;
}
@Override
@@ -95,6 +125,7 @@ public class TxMeta implements TransactionMeta {
int result = txState.hashCode();
result = 31 * result + enlistedPartitions.hashCode();
result = 31 * result + (commitTimestamp != null ?
commitTimestamp.hashCode() : 0);
+ result = 31 * result + Boolean.hashCode(locksReleased);
return result;
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index cf5c315a35..422f5da91f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.tx.impl;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
@@ -483,6 +484,11 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
return runAsync(runnable, cleanupExecutor);
}
+ @Override
+ public CompletableFuture<?>
executeCleanupAsync(Supplier<CompletableFuture<?>> action) {
+ return supplyAsync(action, cleanupExecutor).thenCompose(f -> f); // Runs the action on the cleanup executor and flattens the future it returns.
+ }
+
CompletableFuture<Void> completeReadOnlyTransactionFuture(TxIdAndTimestamp
txIdAndTimestamp) {
CompletableFuture<Void> readOnlyTxFuture =
readOnlyTxFutureById.remove(txIdAndTimestamp);
diff --git
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
index 358c59aa46..63c1b68769 100644
---
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
+++
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
@@ -97,7 +97,7 @@ public abstract class AbstractTxStateStorageTest {
for (int i = 0; i < 100; i++) {
TxMeta txMeta = storage.get(txIds.get(i));
TxMeta txMetaExpected = new TxMeta(TxState.COMMITED,
generateEnlistedPartitions(i), generateTimestamp(txIds.get(i)));
- assertTxMetaEquals(txMetaExpected, txMeta);
+ assertEquals(txMetaExpected, txMeta);
}
for (int i = 0; i < 100; i++) {
@@ -113,7 +113,7 @@ public abstract class AbstractTxStateStorageTest {
} else {
TxMeta txMeta = storage.get(txIds.get(i));
TxMeta txMetaExpected = new TxMeta(TxState.COMMITED,
generateEnlistedPartitions(i), generateTimestamp(txIds.get(i)));
- assertTxMetaEquals(txMetaExpected, txMeta);
+ assertEquals(txMetaExpected, txMeta);
}
}
}
@@ -155,7 +155,7 @@ public abstract class AbstractTxStateStorageTest {
TxMeta txMetaNullTimestamp0 = new TxMeta(txMeta1.txState(),
txMeta1.enlistedPartitions(), null);
assertFalse(storage.compareAndSet(txId, TxState.ABORTED,
txMetaNullTimestamp0, 3, 2));
- assertTxMetaEquals(storage.get(txId), txMeta1);
+ assertEquals(storage.get(txId), txMeta1);
assertTrue(storage.compareAndSet(txId, txMeta1.txState(), txMeta2, 3,
2));
// Checking idempotency.
@@ -165,7 +165,7 @@ public abstract class AbstractTxStateStorageTest {
TxMeta txMetaNullTimestamp2 = new TxMeta(txMeta2.txState(),
txMeta2.enlistedPartitions(), null);
assertFalse(storage.compareAndSet(txId, TxState.ABORTED,
txMetaNullTimestamp2, 3, 2));
- assertTxMetaEquals(storage.get(txId), txMeta2);
+ assertEquals(storage.get(txId), txMeta2);
}
@Test
@@ -194,7 +194,7 @@ public abstract class AbstractTxStateStorageTest {
assertNotNull(txMeta);
assertNotNull(txData);
assertNotNull(txData.getValue());
- assertTxMetaEquals(txMeta, txData.getValue());
+ assertEquals(txMeta, txData.getValue());
}
assertTrue(txs.isEmpty());
@@ -474,12 +474,6 @@ public abstract class AbstractTxStateStorageTest {
);
}
- private static void assertTxMetaEquals(TxMeta txMeta0, TxMeta txMeta1) {
- assertEquals(txMeta0.txState(), txMeta1.txState());
- assertEquals(txMeta0.commitTimestamp(), txMeta1.commitTimestamp());
- assertEquals(txMeta0.enlistedPartitions(),
txMeta1.enlistedPartitions());
- }
-
private static void assertThrowsIgniteInternalException(int
expFullErrorCode, Executable executable) {
IgniteInternalException exception =
assertThrows(IgniteInternalException.class, executable);