This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5cadcaab549 IGNITE-27769 Release waiters before cleanup
5cadcaab549 is described below
commit 5cadcaab549f9bb184d71c79d418f5c5f43728fd
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Wed Feb 11 17:10:55 2026 +0300
IGNITE-27769 Release waiters before cleanup
---
.../app/client/ItThinClientTransactionsTest.java | 38 +++++++-
.../replicator/PartitionReplicaListener.java | 13 +++
.../org/apache/ignite/internal/tx/LockManager.java | 8 ++
.../ignite/internal/tx/impl/HeapLockManager.java | 85 +++++++++++++++--
.../internal/tx/AbstractLockManagerTest.java | 101 +++++++++++++++++++++
.../internal/tx/CoarseGrainedLockManagerTest.java | 25 +++++
6 files changed, 260 insertions(+), 10 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index a6479ad869c..4445fe3444a 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.runner.app.client;
import static java.util.Collections.emptyList;
import static java.util.Comparator.comparing;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.startsWith;
@@ -999,7 +1001,7 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
assertEquals(recsBatch.size(), view.getAll(tx1, keys).size());
tx1.commit();
- // Test if we don't stuck in locks in subsequent rw txn.
+ // Test that we don't get stuck on locks in subsequent rw txns.
CompletableFuture.runAsync(() -> {
Transaction tx0 = client().transactions().begin();
view.upsert(tx0, keys0.get(0));
@@ -1181,6 +1183,40 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
assertNull(val, "Read-only transaction should not see values committed
after its start");
}
+ @Test
+ public void testRollbackDoesNotBlockOnLockConflict() {
+ ClientTable table = (ClientTable) table();
+ KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+ Map<Partition, ClusterNode> map =
table.partitionDistribution().primaryReplicasAsync().join();
+ List<Tuple> tuples0 = generateKeysForPartition(800, 10, map, 0, table);
+
+ ClientLazyTransaction olderTxProxy = (ClientLazyTransaction)
client().transactions().begin();
+ ClientLazyTransaction youngerTxProxy = (ClientLazyTransaction)
client().transactions().begin();
+
+ Tuple key = tuples0.get(0);
+ Tuple key2 = tuples0.get(1);
+ Tuple val = val("1");
+ Tuple val2 = val("2");
+
+ kvView.put(olderTxProxy, key, val);
+ ClientTransaction olderTx = olderTxProxy.startedTx();
+
+ kvView.put(youngerTxProxy, key2, val2);
+ ClientTransaction youngerTx = youngerTxProxy.startedTx();
+
+ assertTrue(olderTx.txId().compareTo(youngerTx.txId()) < 0);
+
+ // Older is allowed to wait with wait-die.
+ CompletableFuture<Void> fut = kvView.putAsync(olderTxProxy, key2, val);
+ assertFalse(fut.isDone());
+
+ assertThat(olderTxProxy.rollbackAsync(), willSucceedFast());
+
+ // Operation future should be failed.
+ assertThat(fut,
willThrowWithCauseOrSuppressed(TransactionException.class));
+ }
+
@AfterEach
protected void validateInflights() throws NoSuchFieldException {
System.out.println("DBG: validateInflights");
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index baa2aa78718..26872817b0d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1417,6 +1417,19 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
}
private CompletableFuture<ReplicaResult>
processTableWriteIntentSwitchAction(TableWriteIntentSwitchReplicaRequest
request) {
+ TxStateMeta txStateMeta = txManager.stateMeta(request.txId());
+
+ if (txStateMeta != null && txStateMeta.txState() == ABORTED) {
+ // At this point the transaction is marked as finished by
ReplicaTxFinishMarker#markFinished, preventing new locks from appearing.
+ // It is safe to invalidate waiters, which would otherwise block the
cleanup process.
+ // Using a non-retriable exception intentionally to prevent
unnecessary retries.
+ lockManager.failAllWaiters(request.txId(), new
TransactionException(
+ TX_ALREADY_FINISHED_ERR,
+ format("Can't acquire a lock because the transaction is
already finished [{}].",
+ formatTxInfo(request.txId(), txManager))
+ ));
+ }
+
return awaitCleanupReadyFutures(request.txId(), request.commit())
.thenApply(res -> {
if (res.shouldApplyWriteIntent()) {
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
index 7360df6856f..04b404a41af 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
@@ -85,6 +85,14 @@ public interface LockManager extends
EventProducer<LockEvent, LockEventParameter
*/
void releaseAll(UUID txId);
+ /**
+ * Fails all pending lock waiters of the given transaction with the cause.
+ *
+ * @param txId Tx id.
+ * @param cause The cause.
+ */
+ void failAllWaiters(UUID txId, Exception cause);
+
/**
* Returns a collection of transaction ids that is associated with the
specified {@code key}.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
index 5911ca2f3f9..b9ae7002c0f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
@@ -52,7 +52,6 @@ import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.AcquireLockTimeoutException;
import org.apache.ignite.internal.tx.DeadlockPreventionPolicy;
import org.apache.ignite.internal.tx.Lock;
-import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.LockKey;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
@@ -247,6 +246,17 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
}
}
+ @Override
+ public void failAllWaiters(UUID txId, Exception cause) {
+ ConcurrentLinkedQueue<Releasable> states = this.txMap.get(txId);
+
+ if (states != null) {
+ for (Releasable state : states) {
+ state.tryFail(txId, cause);
+ }
+ }
+ }
+
@Override
public Iterator<Lock> locks() {
return txMap.entrySet().stream()
@@ -407,6 +417,15 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
* @return The type.
*/
boolean coarse();
+
+ /**
+ * Try to fail a waiter.
+ * Does nothing if the transaction has no waiter to fail.
+ *
+ * @param txId Tx id.
+ * @param cause The cause.
+ */
+ void tryFail(UUID txId, Exception cause);
}
/**
@@ -476,6 +495,29 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
return true;
}
+ @Override
+ public void tryFail(UUID txId, Exception cause) {
+ int idx = Math.floorMod(spread(txId.hashCode()), CONCURRENCY);
+
+ IgniteBiTuple<Lock, CompletableFuture<Lock>> waiter0 = null;
+
+ stripedLock.readLock(idx).lock();
+
+ try {
+ IgniteBiTuple<Lock, CompletableFuture<Lock>> waiter =
slockWaiters.get(txId);
+
+ if (waiter != null) {
+ waiter0 = waiter;
+ }
+ } finally {
+ stripedLock.readLock(idx).unlock();
+ }
+
+ if (waiter0 != null) {
+ waiter0.get2().completeExceptionally(cause);
+ }
+ }
+
/**
* Acquires a lock.
*
@@ -759,6 +801,26 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
return false;
}
+ @Override
+ public void tryFail(UUID txId, Exception cause) {
+ WaiterImpl waiter0 = null;
+
+ synchronized (waiters) {
+ WaiterImpl waiter = waiters.get(txId);
+
+ // Waiter can be null if it was invalidated by order conflict
resolution logic.
+ // See testFailWaiter3
+ if (waiter != null && waiter.hasLockIntent()) {
+ waiter0 = waiter;
+ waiter.fail(cause);
+ }
+ }
+
+ if (waiter0 != null) {
+ waiter0.notifyLocked();
+ }
+ }
+
/**
* Attempts to acquire a lock for the specified {@code key} in
specified lock mode.
*
@@ -969,7 +1031,11 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
LockMode modeFromDowngrade =
waiter.recalculateMode(lockMode);
if (!waiter.locked() && !waiter.hasLockIntent()) {
- toNotify = release(txId);
+ // All locks are revoked - dequeue waiter.
+ waiters.remove(txId);
+ if (!waiters.isEmpty()) {
+ toNotify = unlockCompatibleWaiters();
+ }
} else if (modeFromDowngrade != waiter.lockMode()) {
toNotify = unlockCompatibleWaiters();
}
@@ -991,9 +1057,10 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
* @return List of waiters to notify.
*/
private List<WaiterImpl> release(UUID txId) {
- waiters.remove(txId);
+ WaiterImpl removed = waiters.remove(txId);
- if (waiters.isEmpty()) {
+ // Removing an incomplete waiter doesn't affect the lock state.
+ if (removed == null || waiters.isEmpty() || !removed.locked()) {
return emptyList();
}
@@ -1134,9 +1201,9 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
private LockMode lockMode;
/**
- * The filed has a value when the waiter couldn't lock a key.
+ * This field has a value when the waiter couldn't lock a key.
*/
- private LockException ex;
+ private Exception ex;
/**
* The constructor.
@@ -1312,11 +1379,11 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
}
/**
- * Fails the lock waiter.
+ * Fail the waiter with the exception.
*
- * @param e Lock exception.
+ * @param e Exception.
*/
- private void fail(LockException e) {
+ private void fail(Exception e) {
ex = e;
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
index 9cbb04f00f7..85eb9bd881a 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.tx;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.apache.ignite.internal.tx.LockMode.IS;
@@ -1125,6 +1126,106 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
assertThat(f, willCompleteSuccessfully());
}
+ @Test
+ public void testFailWaiter() {
+ UUID older = TestTransactionIds.newTransactionId();
+ UUID newer = TestTransactionIds.newTransactionId();
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(),
X);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(older, lockKey(),
S);
+ assertFalse(fut2.isDone());
+
+ // Should do nothing when called on the owner.
+ lockManager.failAllWaiters(newer, new Exception());
+ assertFalse(fut2.isDone());
+
+ lockManager.failAllWaiters(older, new Exception("test"));
+ assertThat(fut2, willThrowWithCauseOrSuppressed(Exception.class,
"test"));
+
+ lockManager.releaseAll(older);
+ lockManager.releaseAll(newer);
+ }
+
+ @Test
+ public void testFailWaiter2() {
+ UUID tx1 = TestTransactionIds.newTransactionId();
+ UUID tx2 = TestTransactionIds.newTransactionId();
+ UUID tx3 = TestTransactionIds.newTransactionId();
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(tx1, lockKey(), S);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(tx2, lockKey(), S);
+ assertTrue(fut2.isDone());
+
+ CompletableFuture<Lock> fut3 = lockManager.acquire(tx3, lockKey(), S);
+ assertTrue(fut3.isDone());
+
+ CompletableFuture<Lock> fut4 = lockManager.acquire(tx2, lockKey(), X);
+ assertFalse(fut4.isDone());
+
+ lockManager.releaseAll(tx3);
+
+ assertThat(fut4,
willThrowWithCauseOrSuppressed(PossibleDeadlockOnLockAcquireException.class));
+ // Failing an already invalidated waiter should do nothing.
+ lockManager.failAllWaiters(tx2, new Exception());
+
+ lockManager.releaseAll(tx2);
+ lockManager.releaseAll(tx1);
+ }
+
+ @Test
+ public void testFailWaiter3() {
+ UUID tx1 = TestTransactionIds.newTransactionId();
+ UUID tx2 = TestTransactionIds.newTransactionId();
+ UUID tx3 = TestTransactionIds.newTransactionId();
+
+ CompletableFuture<Lock> fut3 = lockManager.acquire(tx3, lockKey(), S);
+ assertTrue(fut3.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(tx2, lockKey(), X);
+ assertFalse(fut2.isDone());
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(tx1, lockKey(), S);
+ assertTrue(fut1.isDone());
+
+ lockManager.releaseAll(tx3);
+
+ assertThat(fut2,
willThrowWithCauseOrSuppressed(PossibleDeadlockOnLockAcquireException.class));
+ // Failing an already invalidated waiter should do nothing.
+ lockManager.failAllWaiters(tx2, new Exception());
+
+ lockManager.releaseAll(tx2);
+ lockManager.releaseAll(tx1);
+ }
+
+ @Test
+ public void testFailWaiter4() {
+ UUID tx1 = TestTransactionIds.newTransactionId();
+ UUID tx2 = TestTransactionIds.newTransactionId();
+ UUID tx3 = TestTransactionIds.newTransactionId();
+
+ CompletableFuture<Lock> fut3 = lockManager.acquire(tx3, lockKey(), S);
+ assertTrue(fut3.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(tx2, lockKey(), X);
+ assertFalse(fut2.isDone());
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(tx1, lockKey(), X);
+ assertFalse(fut1.isDone());
+
+ lockManager.failAllWaiters(tx2, new Exception("test"));
+ assertThat(fut2, willThrowWithCauseOrSuppressed(Exception.class,
"test"));
+ assertFalse(fut1.isDone());
+
+ lockManager.releaseAll(tx3);
+ assertThat(fut1, willCompleteSuccessfully());
+
+ lockManager.releaseAll(tx1);
+ }
+
/**
* Do test single key multithreaded.
*
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java
index 68f2f959874..470b66589fa 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.tx;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -374,6 +376,29 @@ public class CoarseGrainedLockManagerTest extends
BaseIgniteAbstractTest {
lockManager.releaseAll(newer);
}
+ @Test
+ public void testFailWaiter() {
+ UUID older = TestTransactionIds.newTransactionId();
+ UUID newer = TestTransactionIds.newTransactionId();
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(),
LockMode.IX);
+ assertTrue(fut1.isDone());
+
+ // Currently only S locks are allowed to wait.
+ CompletableFuture<Lock> fut2 = lockManager.acquire(older, lockKey(),
LockMode.S);
+ assertFalse(fut2.isDone());
+
+ // Should do nothing.
+ lockManager.failAllWaiters(newer, new Exception());
+ assertFalse(fut2.isDone());
+
+ lockManager.failAllWaiters(older, new Exception("test"));
+ assertThat(fut2, willThrowWithCauseOrSuppressed(Exception.class,
"test"));
+
+ lockManager.releaseAll(older);
+ lockManager.releaseAll(newer);
+ }
+
private static LockKey lockKey() {
return new LockKey("test");
}