This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7c2722a57d IGNITE-18052 Introduced short term locks for sorted indexes
operations (#1342)
7c2722a57d is described below
commit 7c2722a57dfda2d5793b082e9db5ec6b6709fe67
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Wed Nov 16 00:39:39 2022 +0300
IGNITE-18052 Introduced short term locks for sorted indexes operations
(#1342)
---
.../table/distributed/SortedIndexLocker.java | 1 +
.../org/apache/ignite/internal/tx/LockManager.java | 10 +-
.../ignite/internal/tx/impl/HeapLockManager.java | 290 +++++++++++++++------
.../internal/tx/AbstractLockManagerTest.java | 130 +++++----
4 files changed, 302 insertions(+), 129 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
index 22520aa877..ec11fec2f9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
@@ -133,6 +133,7 @@ public class SortedIndexLocker implements IndexLocker {
@Override
public CompletableFuture<?> locksForInsert(UUID txId, BinaryRow tableRow,
RowId rowId) {
BinaryTuple key = indexRowResolver.apply(tableRow);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-18165
// BinaryTuplePrefix prefix = BinaryTuplePrefix.fromBinaryTuple(key);
// find next key
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
index cce6d34a15..6cb72723b1 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java
@@ -45,13 +45,13 @@ public interface LockManager {
public void release(Lock lock);
/**
- * Attempts to downgrade a lock mode for the specified {@code lockKey}.
+ * Release a lock that holds on a specific mode.
*
- * @param lock Lock to downgrade.
- * @param lockMode Lock mode.
- * @throws LockException If the downgrade operation is invalid.
+ * @param txId Transaction id.
+ * @param lockKey The key.
+ * @param lockMode Lock mode, for example shared, exclusive,
intention-shared etc.
*/
- public void downgrade(Lock lock, LockMode lockMode) throws LockException;
+ void release(UUID txId, LockKey lockKey, LockMode lockMode);
/**
* Retrieves all locks for the specified transaction id.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
index 3aa4020545..30f75f3263 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
@@ -19,12 +19,13 @@ package org.apache.ignite.internal.tx.impl;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
-import static
org.apache.ignite.lang.ErrorGroups.Transactions.DOWNGRADE_LOCK_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.RELEASE_LOCK_ERR;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -109,10 +110,12 @@ public class HeapLockManager implements LockManager {
}
@Override
- public void downgrade(Lock lock, LockMode lockMode) throws LockException {
- LockState state = lockState(lock.lockKey());
+ public void release(UUID txId, LockKey lockKey, LockMode lockMode) {
+ LockState state = lockState(lockKey);
- state.tryDowngrade(lock, lockMode);
+ if (state.tryRelease(txId, lockMode)) {
+ locks.remove(lockKey, state);
+ }
}
@Override
@@ -190,8 +193,12 @@ public class HeapLockManager implements LockManager {
// Reenter
if (prev != null && prev.locked) {
if (prev.lockMode.allowReenter(lockMode)) {
+ prev.addLock(lockMode, 1);
+
return new
IgniteBiTuple(CompletableFuture.completedFuture(null), lockMode);
} else {
+ waiter.addLocks(prev.locks);
+
waiter.upgraded = true;
lockMode = LockMode.supremum(prev.lockMode, lockMode);
@@ -262,109 +269,161 @@ public class HeapLockManager implements LockManager {
* @return {@code True} if the queue is empty.
*/
public boolean tryRelease(UUID txId) {
- Collection<WaiterImpl> locked = new ArrayList<>();
- Collection<WaiterImpl> toFail = new ArrayList<>();
-
- WaiterImpl removed;
+ Collection<WaiterImpl> toNotify;
synchronized (waiters) {
- removed = waiters.remove(txId);
-
- markedForRemove = waiters.isEmpty();
-
- if (markedForRemove) {
- return true;
- }
-
- Set<LockMode> lockModes = new HashSet<>();
-
- // Grant lock to all adjacent readers.
- for (Map.Entry<UUID, WaiterImpl> entry : waiters.entrySet()) {
- WaiterImpl tmp = entry.getValue();
+ toNotify = release(txId);
+ }
- if (tmp.upgraded &&
!removed.lockMode.isCompatible(tmp.prevLockMode)) {
- // Fail upgraded waiters.
- assert !tmp.locked;
+ // Notify outside the monitor.
+ for (WaiterImpl waiter : toNotify) {
+ waiter.notifyLocked();
+ }
- // Downgrade to acquired lock.
- tmp.upgraded = false;
- tmp.lockMode = tmp.prevLockMode;
- tmp.prevLockMode = null;
- tmp.locked = true;
+ return markedForRemove;
+ }
- toFail.add(tmp);
- } else if
(lockModes.stream().allMatch(tmp.lockMode::isCompatible)) {
- if (tmp.upgraded) {
- // Fail upgraded waiters.
- assert !tmp.locked;
+ /**
+ * Releases a specific lock of the key.
+ *
+ * @param txId Transaction id.
+ * @param lockMode Lock mode.
+ * @return If the value is true, no one waits of any lock of the key,
false otherwise.
+ */
+ public boolean tryRelease(UUID txId, LockMode lockMode) {
+ List<WaiterImpl> toNotify = Collections.emptyList();
+ synchronized (waiters) {
+ WaiterImpl waiter = waiters.get(txId);
- // Upgrade lock.
- tmp.upgraded = false;
- tmp.prevLockMode = null;
- tmp.locked = true;
- } else {
- tmp.lock();
- }
+ if (waiter != null) {
+ waiter.removeLock(lockMode);
- lockModes.add(tmp.lockMode);
+ LockMode modeToDowngrade = waiter.recalculateMode();
- locked.add(tmp);
+ if (modeToDowngrade == null) {
+ toNotify = release(txId);
+ } else {
+ toNotify = downgrade(txId, modeToDowngrade);
}
}
}
// Notify outside the monitor.
- for (WaiterImpl waiter : locked) {
+ for (WaiterImpl waiter : toNotify) {
waiter.notifyLocked();
}
- for (WaiterImpl waiter : toFail) {
- waiter.fut.completeExceptionally(
- new LockException(
- RELEASE_LOCK_ERR,
- "Failed to acquire a lock due to a conflict
[txId=" + txId + ", waiter=" + removed + ']'));
+ return markedForRemove;
+ }
+
+ /**
+ * Releases all locks are held by a specific transaction.
+ * This method should be invoked synchronously.
+ *
+ * @param txId Transaction id.
+ * @return List of waiters to notify.
+ */
+ private List<WaiterImpl> release(UUID txId) {
+ WaiterImpl removed = waiters.remove(txId);
+
+ if (waiters.isEmpty()) {
+ markedForRemove = true;
+
+ return Collections.emptyList();
}
- return false;
+ List<WaiterImpl> toNotify = unlockCompatibleWaiters(txId, removed,
null);
+
+ return toNotify;
}
/**
- * Attempts to downgrade a lock for the specified {@code key} to a
specified lock mode.
+ * Unlock compatible waiters.
*
- * @param lock Lock.
- * @param lockMode Lock mode.
- * @throws LockException If the downgrade operation is invalid.
+ * @param txId Transaction id.
+ * @param pickedUpWaiter List of unlocked waiters.
+ * @param downgradeMode Lock mode to downgrade.
+ * @return List of waiters to notify.
*/
- void tryDowngrade(Lock lock, LockMode lockMode) throws LockException {
- WaiterImpl waiter = new WaiterImpl(lock.txId(), lockMode);
+ private ArrayList<WaiterImpl> unlockCompatibleWaiters(UUID txId,
WaiterImpl pickedUpWaiter, LockMode downgradeMode) {
+ ArrayList<WaiterImpl> toNotify = new ArrayList<>();
+ Set<LockMode> lockModes = new HashSet<>();
- synchronized (waiters) {
- WaiterImpl prev = waiters.remove(lock.txId());
+ if (downgradeMode != null) {
+ lockModes.add(downgradeMode);
+ }
- if (prev != null) {
- if (prev.lockMode == LockMode.IX && lockMode == LockMode.S
- || prev.lockMode == LockMode.S && lockMode ==
LockMode.IX
- || prev.lockMode.compareTo(lockMode) < 0) {
- waiters.put(lock.txId(), prev);
+ // Grant lock to all adjacent readers.
+ for (Map.Entry<UUID, WaiterImpl> entry : waiters.entrySet()) {
+ WaiterImpl tmp = entry.getValue();
- throw new LockException(DOWNGRADE_LOCK_ERR, "Cannot
change lock mode from " + prev.lockMode + " to " + lockMode);
- }
+ if (tmp.upgraded &&
!pickedUpWaiter.lockMode.isCompatible(tmp.prevLockMode)) {
+ // Fail upgraded waiters.
+ assert !tmp.locked;
- for (Map.Entry<UUID, WaiterImpl> entry :
waiters.entrySet()) {
- WaiterImpl tmp = entry.getValue();
+ // Downgrade to acquired lock.
+ tmp.upgraded = false;
+ tmp.lockMode = tmp.prevLockMode;
+ tmp.prevLockMode = null;
+ tmp.locked = true;
- if (!lockMode.isCompatible(tmp.lockMode)) {
- waiters.put(lock.txId(), waiter);
+ tmp.fail(new LockException(RELEASE_LOCK_ERR,
+ "Failed to acquire a lock due to a conflict
[txId=" + txId + ", waiter=" + pickedUpWaiter + ']'));
- throw new LockException(
- DOWNGRADE_LOCK_ERR,
- "Cannot change lock mode from " +
prev.lockMode + " to " + lockMode);
- }
+ toNotify.add(tmp);
+ } else if
(lockModes.stream().allMatch(tmp.lockMode::isCompatible)) {
+ if (tmp.upgraded) {
+ // Fail upgraded waiters.
+ assert !tmp.locked;
+
+ // Upgrade lock.
+ tmp.upgraded = false;
+ tmp.prevLockMode = null;
+ tmp.locked = true;
+ } else {
+ tmp.lock();
}
- waiters.put(lock.txId(), waiter);
+ lockModes.add(tmp.lockMode);
+
+ toNotify.add(tmp);
}
}
+
+ return toNotify;
+ }
+
+ /**
+ * Downgrades a lock on a specific key.
+ * This method should be invoked synchronously.
+ *
+ * @param txId Transaction id.
+ * @param lockMode Lock mode.
+ * @return List of waiters to notify.
+ */
+ private List<WaiterImpl> downgrade(UUID txId, LockMode lockMode) {
+ WaiterImpl waiter = waiters.remove(txId);
+
+ if (waiter == null || waiter.lockMode == lockMode) {
+ waiters.put(txId, waiter);
+
+ return Collections.emptyList();
+ }
+
+ assert waiter.lockMode != LockMode.S || lockMode != LockMode.IX :
+ "Cannot change lock [from=" + waiter.lockMode + ", to=" +
lockMode + ']';
+
+ assert waiter.lockMode.compareTo(lockMode) > 0 :
+ "Held lock mode have to be more strict than mode to
downgrade [from=" + waiter.lockMode + ", to=" + lockMode
+ + ']';
+
+ List<WaiterImpl> toNotify = unlockCompatibleWaiters(txId, waiter,
lockMode);
+
+ waiter.lockMode = lockMode;
+
+ waiters.put(txId, waiter);
+
+ return toNotify;
}
/**
@@ -395,6 +454,10 @@ public class HeapLockManager implements LockManager {
* A waiter implementation.
*/
private static class WaiterImpl implements Comparable<WaiterImpl>, Waiter {
+
+ /** Holding locks by type. */
+ private final Map<LockMode, Integer> locks = new HashMap<>();
+
/** Locked future. */
@IgniteToStringExclude
private final CompletableFuture<Void> fut;
@@ -414,6 +477,11 @@ public class HeapLockManager implements LockManager {
/** The state. */
private boolean locked = false;
+ /**
+ * The filed has a value when the waiter couldn't lock a key.
+ */
+ private LockException ex;
+
/**
* The constructor.
*
@@ -424,6 +492,63 @@ public class HeapLockManager implements LockManager {
this.fut = new CompletableFuture<>();
this.txId = txId;
this.lockMode = lockMode;
+
+ locks.put(lockMode, 1);
+ }
+
+ /**
+ * Adds a lock mode.
+ *
+ * @param lockMode Lock mode.
+ * @param increment Value to increment amount.
+ */
+ void addLock(LockMode lockMode, int increment) {
+ locks.merge(lockMode, increment, Integer::sum);
+ }
+
+ /**
+ * Removes a lock mode.
+ *
+ * @param lockMode Lock mode.
+ */
+ void removeLock(LockMode lockMode) {
+ Integer counter = locks.get(lockMode);
+
+ if (counter == null || counter < 2) {
+ locks.remove(lockMode);
+ } else {
+ locks.put(lockMode, counter - 1);
+ }
+ }
+
+ /**
+ * Recalculates lock mode based of all locks which the waiter has took.
+ *
+ * @return Recalculated lock mode.
+ */
+ LockMode recalculateMode() {
+ LockMode mode = null;
+
+ for (LockMode heldMode : locks.keySet()) {
+ assert locks.get(heldMode) > 0 : "Incorrect lock counter
[txId=" + txId + ", mode=" + heldMode + "]";
+
+ mode = mode == null ? heldMode : LockMode.supremum(mode,
heldMode);
+ }
+
+ return mode;
+ }
+
+ /**
+ * Adds several locks modes to the waiter.
+ *
+ * @param locksToAdd Map with lock modes.
+ */
+ void addLocks(Map<LockMode, Integer> locksToAdd) {
+ for (LockMode mode : locksToAdd.keySet()) {
+ Integer inc = locksToAdd.get(mode);
+
+ addLock(mode, inc);
+ }
}
/** {@inheritDoc} */
@@ -434,9 +559,13 @@ public class HeapLockManager implements LockManager {
/** Notifies a future listeners. */
private void notifyLocked() {
- assert locked;
+ if (ex != null) {
+ fut.completeExceptionally(ex);
+ } else {
+ assert locked;
- fut.complete(null);
+ fut.complete(null);
+ }
}
/** {@inheritDoc} */
@@ -456,6 +585,15 @@ public class HeapLockManager implements LockManager {
locked = true;
}
+ /**
+ * Fails the lock waiter.
+ *
+ * @param e Lock exception.
+ */
+ private void fail(LockException e) {
+ ex = e;
+ }
+
/** {@inheritDoc} */
@Override
public UUID txId() {
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
index 5227a9475a..00b9427258 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
@@ -25,6 +25,7 @@ import static org.apache.ignite.internal.tx.LockMode.X;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -687,7 +688,7 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
}
@Test
- public void testPossibleDowngradeLockModes() throws Exception {
+ public void testPossibleDowngradeLockModes() {
UUID txId0 = Timestamp.nextVersion().toUuid();
LockKey key = new LockKey("test");
@@ -701,13 +702,15 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
LockMode lastLockMode;
for (LockMode lockMode : lockModes) {
+ var lockFut = lockManager.acquire(txId0, key, lockMode);
+
Waiter waiter = lockManager.waiter(fut0.join().lockKey(), txId0);
lastLockMode = waiter.lockMode();
assertEquals(lastLockMode, waiter.lockMode());
- lockManager.downgrade(fut0.join(), lockMode);
+ lockManager.release(txId0, key, X);
assertTrue(lockManager.queue(key).size() == 1);
@@ -716,10 +719,9 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
assertEquals(lockMode, waiter.lockMode());
assertTrue(lockManager.queue(key).size() == 1);
- }
-
- lockManager.release(fut0.join());
+ lockManager.release(lockFut.join());
+ }
fut0 = lockManager.acquire(txId0, key, X);
@@ -728,13 +730,15 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
lockModes = List.of(SIX, IX, IS);
for (LockMode lockMode : lockModes) {
+ var lockFut = lockManager.acquire(txId0, key, lockMode);
+
Waiter waiter = lockManager.waiter(fut0.join().lockKey(), txId0);
lastLockMode = waiter.lockMode();
assertEquals(lastLockMode, waiter.lockMode());
- lockManager.downgrade(fut0.join(), lockMode);
+ lockManager.release(txId0, key, X);
assertTrue(lockManager.queue(key).size() == 1);
@@ -743,81 +747,111 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
assertEquals(lockMode, waiter.lockMode());
assertTrue(lockManager.queue(key).size() == 1);
- }
- lockManager.release(fut0.join());
+ lockManager.release(lockFut.join());
+ }
}
@Test
- public void testImpossibleDowngradeLockModes1() {
- UUID txId0 = Timestamp.nextVersion().toUuid();
-
+ public void testAcquireRelease() {
+ UUID txId = Timestamp.nextVersion().toUuid();
LockKey key = new LockKey("test");
- List<List<LockMode>> lockModes = new ArrayList<>();
+ for (LockMode lockMode : LockMode.values()) {
+ lockManager.acquire(txId, key, lockMode);
+ lockManager.release(txId, key, lockMode);
- lockModes.add(List.of(S, IX));
- lockModes.add(List.of(IX, S));
+ assertFalse(lockManager.locks(txId).hasNext());
+ }
- for (List<LockMode> lockModes0 : lockModes) {
- CompletableFuture<Lock> fut = lockManager.acquire(txId0, key,
lockModes0.get(0));
+ assertTrue(lockManager.isEmpty());
+ }
+
+ @Test
+ public void testAcquireReleaseWhenHoldOther() {
+ UUID txId = Timestamp.nextVersion().toUuid();
+ LockKey key = new LockKey("test");
- try {
- lockManager.downgrade(fut.join(), lockModes0.get(1));
+ for (LockMode holdLockMode : LockMode.values()) {
+ lockManager.acquire(txId, key, holdLockMode);
- fail();
- } catch (LockException e) {
- // Expected.
+ assertTrue(lockManager.locks(txId).hasNext());
+ assertSame(holdLockMode,
lockManager.locks(txId).next().lockMode());
+
+ for (LockMode lockMode : LockMode.values()) {
+ lockManager.acquire(txId, key, lockMode);
+ lockManager.release(txId, key, lockMode);
}
- assertEquals(1, lockManager.queue(key).size());
- assertEquals(lockModes0.get(0),
lockManager.waiter(fut.join().lockKey(), txId0).lockMode());
+ assertTrue(lockManager.locks(txId).hasNext());
+ assertSame(holdLockMode,
lockManager.locks(txId).next().lockMode());
+
+ lockManager.release(txId, key, holdLockMode);
- lockManager.release(fut.join());
+ assertFalse(lockManager.locks(txId).hasNext());
}
+ assertTrue(lockManager.isEmpty());
}
@Test
- public void testImpossibleDowngradeLockModes2() {
- UUID txId0 = Timestamp.nextVersion().toUuid();
-
+ public void testReleaseThenReleaseWeakerInHierarchy() {
LockKey key = new LockKey("test");
- CompletableFuture<Lock> fut = lockManager.acquire(txId0, key, IS);
+ UUID txId1 = Timestamp.nextVersion().toUuid();
+ UUID txId2 = Timestamp.nextVersion().toUuid();
- try {
- lockManager.downgrade(fut.join(), X);
+ var tx1SharedLock = lockManager.acquire(txId1, key, S);
- fail();
- } catch (LockException e) {
- // Expected.
- }
+ assertTrue(tx1SharedLock.isDone());
- assertEquals(1, lockManager.queue(key).size());
+ var tx1ExclusiveLock = lockManager.acquire(txId1, key, X);
- lockManager.release(fut.join());
+ assertTrue(tx1ExclusiveLock.isDone());
+
+ var tx2SharedLock = lockManager.acquire(txId2, key, S);
+
+ assertFalse(tx2SharedLock.isDone());
+
+ lockManager.release(txId1, key, X);
+
+ assertTrue(lockManager.locks(txId1).hasNext());
+
+ var lock = lockManager.locks(txId1).next();
+
+ assertSame(S, lock.lockMode());
+
+ assertTrue(tx2SharedLock.isDone());
}
@Test
- public void testImpossibleDowngradeLockModes3() {
- UUID txId0 = Timestamp.nextVersion().toUuid();
+ public void testReleaseThenNoReleaseWeakerInHierarchy() {
+ LockKey key = new LockKey("test");
+
UUID txId1 = Timestamp.nextVersion().toUuid();
+ UUID txId2 = Timestamp.nextVersion().toUuid();
- LockKey key = new LockKey("test");
+ var tx1SharedLock = lockManager.acquire(txId1, key, S);
- CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, S);
- lockManager.acquire(txId1, key, S);
+ assertTrue(tx1SharedLock.isDone());
- try {
- lockManager.downgrade(fut0.join(), IX);
+ var tx1ExclusiveLock = lockManager.acquire(txId1, key, X);
- fail();
- } catch (LockException e) {
- // Expected.
- }
+ assertTrue(tx1ExclusiveLock.isDone());
+
+ var tx2SharedLock = lockManager.acquire(txId2, key, S);
+
+ assertFalse(tx2SharedLock.isDone());
+
+ lockManager.release(txId1, key, S);
+
+ assertTrue(lockManager.locks(txId1).hasNext());
+
+ var lock = lockManager.locks(txId1).next();
+
+ assertSame(X, lock.lockMode());
- assertEquals(2, lockManager.queue(key).size());
+ assertFalse(tx2SharedLock.isDone());
}
/**