This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cadcaab549 IGNITE-27769 Release waiters before cleanup
5cadcaab549 is described below

commit 5cadcaab549f9bb184d71c79d418f5c5f43728fd
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Wed Feb 11 17:10:55 2026 +0300

    IGNITE-27769 Release waiters before cleanup
---
 .../app/client/ItThinClientTransactionsTest.java   |  38 +++++++-
 .../replicator/PartitionReplicaListener.java       |  13 +++
 .../org/apache/ignite/internal/tx/LockManager.java |   8 ++
 .../ignite/internal/tx/impl/HeapLockManager.java   |  85 +++++++++++++++--
 .../internal/tx/AbstractLockManagerTest.java       | 101 +++++++++++++++++++++
 .../internal/tx/CoarseGrainedLockManagerTest.java  |  25 +++++
 6 files changed, 260 insertions(+), 10 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index a6479ad869c..4445fe3444a 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.runner.app.client;
 import static java.util.Collections.emptyList;
 import static java.util.Comparator.comparing;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.startsWith;
@@ -999,7 +1001,7 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         assertEquals(recsBatch.size(), view.getAll(tx1, keys).size());
         tx1.commit();
 
-        // Test if we don't stuck in locks in subsequent rw txn.
+        // Test if we don't stuck in locks in subsequent rw txns.
         CompletableFuture.runAsync(() -> {
             Transaction tx0 = client().transactions().begin();
             view.upsert(tx0, keys0.get(0));
@@ -1181,6 +1183,40 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         assertNull(val, "Read-only transaction should not see values committed 
after its start");
     }
 
+    @Test
+    public void testRollbackDoesNotBlockOnLockConflict() {
+        ClientTable table = (ClientTable) table();
+        KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+        Map<Partition, ClusterNode> map = 
table.partitionDistribution().primaryReplicasAsync().join();
+        List<Tuple> tuples0 = generateKeysForPartition(800, 10, map, 0, table);
+
+        ClientLazyTransaction olderTxProxy = (ClientLazyTransaction) 
client().transactions().begin();
+        ClientLazyTransaction youngerTxProxy = (ClientLazyTransaction) 
client().transactions().begin();
+
+        Tuple key = tuples0.get(0);
+        Tuple key2 = tuples0.get(1);
+        Tuple val = val("1");
+        Tuple val2 = val("2");
+
+        kvView.put(olderTxProxy, key, val);
+        ClientTransaction olderTx = olderTxProxy.startedTx();
+
+        kvView.put(youngerTxProxy, key2, val2);
+        ClientTransaction youngerTx = youngerTxProxy.startedTx();
+
+        assertTrue(olderTx.txId().compareTo(youngerTx.txId()) < 0);
+
+        // Older is allowed to wait with wait-die.
+        CompletableFuture<Void> fut = kvView.putAsync(olderTxProxy, key2, val);
+        assertFalse(fut.isDone());
+
+        assertThat(olderTxProxy.rollbackAsync(), willSucceedFast());
+
+        // Operation future should be failed.
+        assertThat(fut, 
willThrowWithCauseOrSuppressed(TransactionException.class));
+    }
+
     @AfterEach
     protected void validateInflights() throws NoSuchFieldException {
         System.out.println("DBG: validateInflights");
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index baa2aa78718..26872817b0d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1417,6 +1417,19 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
     }
 
     private CompletableFuture<ReplicaResult> 
processTableWriteIntentSwitchAction(TableWriteIntentSwitchReplicaRequest 
request) {
+        TxStateMeta txStateMeta = txManager.stateMeta(request.txId());
+
+        if (txStateMeta != null && txStateMeta.txState() == ABORTED) {
+            // At this point the transaction is marked as finished by 
ReplicaTxFinishMarker#markFinished, preventing new locks to appear.
+            // Safe to invalidate waiters, which otherwise will block the 
cleanup process.
+            // Using non-retriable exception intentionally to prevent 
unnecessary retries.
+            lockManager.failAllWaiters(request.txId(), new 
TransactionException(
+                    TX_ALREADY_FINISHED_ERR,
+                    format("Can't acquire a lock because the transaction is 
already finished [{}].",
+                            formatTxInfo(request.txId(), txManager))
+            ));
+        }
+
         return awaitCleanupReadyFutures(request.txId(), request.commit())
                 .thenApply(res -> {
                     if (res.shouldApplyWriteIntent()) {
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
index 7360df6856f..04b404a41af 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
@@ -85,6 +85,14 @@ public interface LockManager extends 
EventProducer<LockEvent, LockEventParameter
      */
     void releaseAll(UUID txId);
 
+    /**
+     * Fail all waiters with the cause.
+     *
+     * @param txId Tx id.
+     * @param cause The cause.
+     */
+    void failAllWaiters(UUID txId, Exception cause);
+
     /**
      * Returns a collection of transaction ids that is associated with the 
specified {@code key}.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
index 5911ca2f3f9..b9ae7002c0f 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
@@ -52,7 +52,6 @@ import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.tx.AcquireLockTimeoutException;
 import org.apache.ignite.internal.tx.DeadlockPreventionPolicy;
 import org.apache.ignite.internal.tx.Lock;
-import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.internal.tx.LockKey;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.LockMode;
@@ -247,6 +246,17 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
         }
     }
 
+    @Override
+    public void failAllWaiters(UUID txId, Exception cause) {
+        ConcurrentLinkedQueue<Releasable> states = this.txMap.get(txId);
+
+        if (states != null) {
+            for (Releasable state : states) {
+                state.tryFail(txId, cause);
+            }
+        }
+    }
+
     @Override
     public Iterator<Lock> locks() {
         return txMap.entrySet().stream()
@@ -407,6 +417,15 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
          * @return The type.
          */
         boolean coarse();
+
+        /**
+         * Try to fail a waiter.
+         *
+         * @param cause The cause.
+         *
+         * @return {@code True} if successful.
+         */
+        void tryFail(UUID txId, Exception cause);
     }
 
     /**
@@ -476,6 +495,29 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
             return true;
         }
 
+        @Override
+        public void tryFail(UUID txId, Exception cause) {
+            int idx = Math.floorMod(spread(txId.hashCode()), CONCURRENCY);
+
+            IgniteBiTuple<Lock, CompletableFuture<Lock>> waiter0 = null;
+
+            stripedLock.readLock(idx).lock();
+
+            try {
+                IgniteBiTuple<Lock, CompletableFuture<Lock>> waiter = 
slockWaiters.get(txId);
+
+                if (waiter != null) {
+                    waiter0 = waiter;
+                }
+            } finally {
+                stripedLock.readLock(idx).unlock();
+            }
+
+            if (waiter0 != null) {
+                waiter0.get2().completeExceptionally(cause);
+            }
+        }
+
         /**
          * Acquires a lock.
          *
@@ -759,6 +801,26 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
             return false;
         }
 
+        @Override
+        public void tryFail(UUID txId, Exception cause) {
+            WaiterImpl waiter0 = null;
+
+            synchronized (waiters) {
+                WaiterImpl waiter = waiters.get(txId);
+
+                // Waiter can be null if it was invalidated by order conflict 
resolution logic.
+                // See testFailWaiter3
+                if (waiter != null && waiter.hasLockIntent()) {
+                    waiter0 = waiter;
+                    waiter.fail(cause);
+                }
+            }
+
+            if (waiter0 != null) {
+                waiter0.notifyLocked();
+            }
+        }
+
         /**
          * Attempts to acquire a lock for the specified {@code key} in 
specified lock mode.
          *
@@ -969,7 +1031,11 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
                     LockMode modeFromDowngrade = 
waiter.recalculateMode(lockMode);
 
                     if (!waiter.locked() && !waiter.hasLockIntent()) {
-                        toNotify = release(txId);
+                        // All locks are revoked - deqeue waiter.
+                        waiters.remove(txId);
+                        if (!waiters.isEmpty()) {
+                            toNotify = unlockCompatibleWaiters();
+                        }
                     } else if (modeFromDowngrade != waiter.lockMode()) {
                         toNotify = unlockCompatibleWaiters();
                     }
@@ -991,9 +1057,10 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
          * @return List of waiters to notify.
          */
         private List<WaiterImpl> release(UUID txId) {
-            waiters.remove(txId);
+            WaiterImpl removed = waiters.remove(txId);
 
-            if (waiters.isEmpty()) {
+            // Removing incomplete waiter doesn't affect lock state.
+            if (removed == null || waiters.isEmpty() || !removed.locked()) {
                 return emptyList();
             }
 
@@ -1134,9 +1201,9 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
         private LockMode lockMode;
 
         /**
-         * The filed has a value when the waiter couldn't lock a key.
+         * This field has a value when the waiter couldn't lock a key.
          */
-        private LockException ex;
+        private Exception ex;
 
         /**
          * The constructor.
@@ -1312,11 +1379,11 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
         }
 
         /**
-         * Fails the lock waiter.
+         * Fail the waiter with the exception.
          *
-         * @param e Lock exception.
+         * @param e Exception.
          */
-        private void fail(LockException e) {
+        private void fail(Exception e) {
             ex = e;
         }
 
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
index 9cbb04f00f7..85eb9bd881a 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.tx;
 
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static org.apache.ignite.internal.tx.LockMode.IS;
@@ -1125,6 +1126,106 @@ public abstract class AbstractLockManagerTest extends 
IgniteAbstractTest {
         assertThat(f, willCompleteSuccessfully());
     }
 
+    @Test
+    public void testFailWaiter() {
+        UUID older = TestTransactionIds.newTransactionId();
+        UUID newer = TestTransactionIds.newTransactionId();
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(), 
X);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(older, lockKey(), 
S);
+        assertFalse(fut2.isDone());
+
+        // Should do nothing then called on owner.
+        lockManager.failAllWaiters(newer, new Exception());
+        assertFalse(fut2.isDone());
+
+        lockManager.failAllWaiters(older, new Exception("test"));
+        assertThat(fut2, willThrowWithCauseOrSuppressed(Exception.class, 
"test"));
+
+        lockManager.releaseAll(older);
+        lockManager.releaseAll(newer);
+    }
+
+    @Test
+    public void testFailWaiter2() {
+        UUID tx1 = TestTransactionIds.newTransactionId();
+        UUID tx2 = TestTransactionIds.newTransactionId();
+        UUID tx3 = TestTransactionIds.newTransactionId();
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(tx1, lockKey(), S);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(tx2, lockKey(), S);
+        assertTrue(fut2.isDone());
+
+        CompletableFuture<Lock> fut3 = lockManager.acquire(tx3, lockKey(), S);
+        assertTrue(fut3.isDone());
+
+        CompletableFuture<Lock> fut4 = lockManager.acquire(tx2, lockKey(), X);
+        assertFalse(fut4.isDone());
+
+        lockManager.releaseAll(tx3);
+
+        assertThat(fut4, 
willThrowWithCauseOrSuppressed(PossibleDeadlockOnLockAcquireException.class));
+        // Failing already invalidated waiter should do nothing.
+        lockManager.failAllWaiters(tx2, new Exception());
+
+        lockManager.releaseAll(tx2);
+        lockManager.releaseAll(tx1);
+    }
+
+    @Test
+    public void testFailWaiter3() {
+        UUID tx1 = TestTransactionIds.newTransactionId();
+        UUID tx2 = TestTransactionIds.newTransactionId();
+        UUID tx3 = TestTransactionIds.newTransactionId();
+
+        CompletableFuture<Lock> fut3 = lockManager.acquire(tx3, lockKey(), S);
+        assertTrue(fut3.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(tx2, lockKey(), X);
+        assertFalse(fut2.isDone());
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(tx1, lockKey(), S);
+        assertTrue(fut1.isDone());
+
+        lockManager.releaseAll(tx3);
+
+        assertThat(fut2, 
willThrowWithCauseOrSuppressed(PossibleDeadlockOnLockAcquireException.class));
+        // Failing already invalidated waiter should do nothing.
+        lockManager.failAllWaiters(tx2, new Exception());
+
+        lockManager.releaseAll(tx2);
+        lockManager.releaseAll(tx1);
+    }
+
+    @Test
+    public void testFailWaiter4() {
+        UUID tx1 = TestTransactionIds.newTransactionId();
+        UUID tx2 = TestTransactionIds.newTransactionId();
+        UUID tx3 = TestTransactionIds.newTransactionId();
+
+        CompletableFuture<Lock> fut3 = lockManager.acquire(tx3, lockKey(), S);
+        assertTrue(fut3.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(tx2, lockKey(), X);
+        assertFalse(fut2.isDone());
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(tx1, lockKey(), X);
+        assertFalse(fut1.isDone());
+
+        lockManager.failAllWaiters(tx2, new Exception("test"));
+        assertThat(fut2, willThrowWithCauseOrSuppressed(Exception.class, 
"test"));
+        assertFalse(fut1.isDone());
+
+        lockManager.releaseAll(tx3);
+        assertThat(fut1, willCompleteSuccessfully());
+
+        lockManager.releaseAll(tx1);
+    }
+
     /**
      * Do test single key multithreaded.
      *
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java
index 68f2f959874..470b66589fa 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.tx;
 
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -374,6 +376,29 @@ public class CoarseGrainedLockManagerTest extends 
BaseIgniteAbstractTest {
         lockManager.releaseAll(newer);
     }
 
+    @Test
+    public void testFailWaiter() {
+        UUID older = TestTransactionIds.newTransactionId();
+        UUID newer = TestTransactionIds.newTransactionId();
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(), 
LockMode.IX);
+        assertTrue(fut1.isDone());
+
+        // Currently only S locks are allowed to wait.
+        CompletableFuture<Lock> fut2 = lockManager.acquire(older, lockKey(), 
LockMode.S);
+        assertFalse(fut2.isDone());
+
+        // Should do nothing.
+        lockManager.failAllWaiters(newer, new Exception());
+        assertFalse(fut2.isDone());
+
+        lockManager.failAllWaiters(older, new Exception("test"));
+        assertThat(fut2, willThrowWithCauseOrSuppressed(Exception.class, 
"test"));
+
+        lockManager.releaseAll(older);
+        lockManager.releaseAll(newer);
+    }
+
     private static LockKey lockKey() {
         return new LockKey("test");
     }

Reply via email to