This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new da075b483c IGNITE-22989 Add sync/async read-write lock (#4233)
da075b483c is described below
commit da075b483cdcd2c9933f7be5620217689da2a73b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Aug 16 11:58:48 2024 +0400
IGNITE-22989 Add sync/async read-write lock (#4233)
---
.../internal/util/IgniteSpinReadWriteLock.java | 37 +-
.../internal/util/VersatileReadWriteLock.java | 486 +++++++++++++++++
.../internal/util/IgniteSpinReadWriteLockTest.java | 6 +-
.../internal/util/VersatileReadWriteLockTest.java | 599 +++++++++++++++++++++
4 files changed, 1102 insertions(+), 26 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java
index 9c6ba014da..8ea64e37d6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java
@@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.TestOnly;
/**
* Spin read-write lock.
@@ -28,14 +29,14 @@ import org.apache.ignite.internal.tostring.S;
* interruption signal).
*
* <p>The locks are reentrant (that is, the same thread can acquire the same
lock a few times in a row and then
- * release them same number of times.
+ * release them same number of times).
*
* <p>Write lock acquire requests are prioritized over read lock acquire
requests. That is, if both read and write lock
* acquire requests are received when the write lock is held by someone else,
then, on its release, the write lock attempt will be served
* first.
*/
public class IgniteSpinReadWriteLock {
- /** Signals that nobody currently owns the read lock. */
+ /** Signals that nobody currently owns the write lock. */
private static final long NO_OWNER = -1;
/**
@@ -91,7 +92,7 @@ public class IgniteSpinReadWriteLock {
private volatile int state;
/**
- * Number of pending write attempts to acquire the write lock. Currently
it is only used to prioritize write lock attempts over read
+ * Number of pending write attempts to acquire the write lock. It is used
to prioritize write lock attempts over read
* lock attempts when the write lock has been released (so, if both an
attempt to acquire the write lock and an attempt to acquire the
* read lock are waiting for write lock to be released, a write lock
attempt will be served first when the release happens).
*/
@@ -105,7 +106,7 @@ public class IgniteSpinReadWriteLock {
/**
* Acquires the read lock. If the write lock is held by another thread,
this blocks until the write lock is released (and until all
- * concurrent write locks are acquired and released, as this class
pripritizes write lock attempts over read lock attempts).
+ * concurrent write locks are acquired and released, as this class
prioritizes write lock attempts over read lock attempts).
*/
@SuppressWarnings("BusyWait")
public void readLock() {
@@ -168,13 +169,14 @@ public class IgniteSpinReadWriteLock {
}
private boolean tryAdvanceStateToReadLocked(int curState) {
- return compareAndSet(STATE_VH, curState, curState + 1);
+ return STATE_VH.compareAndSet(this, curState, curState + 1);
}
/**
* Tries to acquire the read lock. No spinwait is used if the lock cannot
be acquired immediately.
*
- * @return {@code true} if acquired, {@code false} if write lock is
already held by someone else
+ * @return {@code true} if acquired, {@code false} if write lock is
already held by someone else (or someone is waiting to acquire
+ * the write lock).
*/
public boolean tryReadLock() {
int cnt = readLockEntryCnt.get();
@@ -227,7 +229,7 @@ public class IgniteSpinReadWriteLock {
assert curState > 0;
- if (compareAndSet(STATE_VH, curState, curState - 1)) {
+ if (STATE_VH.compareAndSet(this, curState, curState - 1)) {
readLockEntryCnt.set(0);
return;
@@ -278,14 +280,14 @@ public class IgniteSpinReadWriteLock {
while (true) {
int curPendingWriteLocks = pendingWriteLocks;
- if (compareAndSet(PENDING_WLOCKS_VH, curPendingWriteLocks,
curPendingWriteLocks + 1)) {
+ if (PENDING_WLOCKS_VH.compareAndSet(this, curPendingWriteLocks,
curPendingWriteLocks + 1)) {
break;
}
}
}
private boolean trySwitchStateToWriteLocked() {
- return compareAndSet(STATE_VH, AVAILABLE, WRITE_LOCKED);
+ return STATE_VH.compareAndSet(this, AVAILABLE, WRITE_LOCKED);
}
private void decrementPendingWriteLocks() {
@@ -294,7 +296,7 @@ public class IgniteSpinReadWriteLock {
assert curPendingWriteLocks > 0;
- if (compareAndSet(PENDING_WLOCKS_VH, curPendingWriteLocks,
curPendingWriteLocks - 1)) {
+ if (PENDING_WLOCKS_VH.compareAndSet(this, curPendingWriteLocks,
curPendingWriteLocks - 1)) {
break;
}
}
@@ -426,28 +428,17 @@ public class IgniteSpinReadWriteLock {
// write lock now.
int update = readLockEntryCnt.get() > 0 ? 1 : AVAILABLE;
- boolean b = compareAndSet(STATE_VH, WRITE_LOCKED, update);
+ boolean b = STATE_VH.compareAndSet(this, WRITE_LOCKED, update);
assert b;
}
- /**
- * Returns {@code true} on success.
- *
- * @param varHandle VarHandle.
- * @param expect Expected.
- * @param update Update.
- * @return {@code True} on success.
- */
- private boolean compareAndSet(VarHandle varHandle, int expect, int update)
{
- return varHandle.compareAndSet(this, expect, update);
- }
-
/**
* Returns the count of pending write lock requests count. Only used by
tests, should not be used in production code.
*
* @return count of pending requests to get the write lock
*/
+ @TestOnly
int pendingWriteLocksCount() {
return pendingWriteLocks;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
new file mode 100644
index 0000000000..a8829f9e54
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * A versatile read-write lock that can be used both in synchronous and
asynchronous contexts.
+ * Its blocking methods use the spinwait strategy. When they do so, they are
not interruptible (that is, they do not break their loop on
+ * interruption signal).
+ *
+ * <p>The locks are NOT reentrant (that is, the same thread can NOT acquire
the same lock a few times without releasing it).
+ *
+ * <p>Write lock acquire requests are prioritized over read lock acquire
requests. That is, if both read and write lock
+ * acquire requests are received when the write lock is held by someone else,
then, on its release, the write lock attempt will be served
+ * first.
+ *
+ * <p>Lock owners are not tracked.
+ *
+ * <p>Asynchronous locking methods may complete the futures either in the
calling thread (if they were able to immediately acquire
+ * the requested lock) or in the supplied pool (if they had to wait for a
release to happen before being able to satisfy the request).
+ *
+ * <p>Asynchronous locking methods never use spin loops. They do use CAS
loops, but these are mostly very short.
+ */
+public class VersatileReadWriteLock {
+ /** The highest bit of the state signals that the write lock is acquired.
*/
+ private static final int WRITE_LOCK_BITS = 1 << 31;
+
+ /** 31 lower bits represent a counter of read locks that qre acquired. */
+ private static final int READ_LOCK_BITS = ~WRITE_LOCK_BITS;
+
+ /**
+ * State 0 means that both read and write locks are available for
acquiring (as no locks are acquired).
+ *
+ * @see #state
+ */
+ private static final int AVAILABLE = 0;
+
+ /** How much time to sleep on each iteration of a spin loop
(milliseconds). */
+ private static final int SLEEP_MILLIS = 10;
+
+ /** {@link VarHandle} used to access the {@code pendingWLocks} field. */
+ private static final VarHandle PENDING_WLOCKS_VH;
+
+ /** {@link VarHandle} used to access the {@code state} field. */
+ private static final VarHandle STATE_VH;
+
+ static {
+ try {
+ STATE_VH = MethodHandles.lookup()
+ .findVarHandle(VersatileReadWriteLock.class, "state",
int.class);
+
+ PENDING_WLOCKS_VH = MethodHandles.lookup()
+ .findVarHandle(VersatileReadWriteLock.class,
"pendingWriteLocks", int.class);
+ } catch (ReflectiveOperationException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ /**
+ * Main state of the lock.
+ * <ul>
+ * <li>The highest bit is for whether the write lock is acquired or
not.</li>
+ * <li>The remaining bits store a counter representing the acquired
read locks.</li>
+ * </ul>
+ */
+ @SuppressWarnings("unused")
+ private volatile int state;
+
+ /**
+ * Number of pending write attempts to acquire the write lock. It is used
to prioritize write lock attempts over read
+ * lock attempts when the write lock has been released (so, if both an
attempt to acquire the write lock and an attempt to acquire the
+ * read lock are waiting for write lock to be released, a write lock
attempt will be served first when the release happens).
+ */
+ @SuppressWarnings("unused")
+ private volatile int pendingWriteLocks;
+
+ /** Futures to be completed when read locks (one per future) are acquired
after a write lock is released. */
+ private final Set<CompletableFuture<Void>> readLockSolicitors =
ConcurrentHashMap.newKeySet();
+
+ /** Futures to be completed when a write lock (one per future) is acquired
after an impeding lock is released. */
+ private final Set<CompletableFuture<Void>> writeLockSolicitors =
ConcurrentHashMap.newKeySet();
+
+ /** In this pool {@link #readLockSolicitors} and {@link
#writeLockSolicitors} will be completed. */
+ private final Executor asyncContinuationExecutor;
+
+ /**
+ * Constructor.
+ */
+ public VersatileReadWriteLock(Executor asyncContinuationExecutor) {
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
+ }
+
+ /**
+ * Acquires the read lock. If the write lock is already held, this blocks
until the write lock is released (and until all
+ * concurrent write locks are acquired and released, as this class
prioritizes write lock attempts over read lock attempts).
+ */
+ @SuppressWarnings("BusyWait")
+ public void readLock() {
+ boolean interrupted = false;
+
+ while (true) {
+ int curState = state;
+
+ if (writeLockedOrGoingToBe(curState)) {
+ try {
+ Thread.sleep(SLEEP_MILLIS);
+ } catch (InterruptedException ignored) {
+ interrupted = true;
+ }
+
+ continue;
+ }
+
+ if (tryAdvanceStateToReadLocked(curState)) {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+
+ break;
+ }
+ }
+ }
+
+ private static int state(boolean writeLocked, int readLocks) {
+ assert readLocks >= 0 : readLocks;
+
+ return (writeLocked ? WRITE_LOCK_BITS : 0) | (readLocks &
READ_LOCK_BITS);
+ }
+
+ private static boolean writeLocked(int curState) {
+ return (curState & WRITE_LOCK_BITS) != 0;
+ }
+
+ private static int readLocks(int state) {
+ return state & READ_LOCK_BITS;
+ }
+
+ private boolean writeLockedOrGoingToBe(int curState) {
+ return writeLocked(curState) || pendingWriteLocks > 0;
+ }
+
+ private boolean tryAdvanceStateToReadLocked(int curState) {
+ assert !writeLocked(curState);
+
+ int newState = state(false, readLocks(curState) + 1);
+
+ return STATE_VH.compareAndSet(this, curState, newState);
+ }
+
+ /**
+ * Tries to acquire the read lock. No spinwait is used if the lock cannot
be acquired immediately.
+ *
+ * @return {@code true} if acquired, {@code false} if write lock is
already held by someone else (or someone is waiting to acquire
+ * the write lock).
+ */
+ public boolean tryReadLock() {
+ while (true) {
+ int curState = state;
+
+ if (writeLockedOrGoingToBe(curState)) {
+ return false;
+ }
+
+ if (tryAdvanceStateToReadLocked(curState)) {
+ return true;
+ }
+ }
+ }
+
+ /**
+ * Releases the read lock.
+ *
+ * @throws IllegalMonitorStateException thrown if the read lock is not
acquired by anyone.
+ */
+ public void readUnlock() {
+ while (true) {
+ int curState = state;
+
+ // We allow a write lock to be held here as someone could have
taken it forcefully.
+ boolean writeLocked = writeLocked(curState);
+ int readLocks = readLocks(curState);
+ if (readLocks < 1) {
+ throw new IllegalMonitorStateException();
+ }
+
+ if (STATE_VH.compareAndSet(this, curState, state(writeLocked,
readLocks - 1))) {
+ if (readLocks == 1) {
+ // We released the final read lock.
+ notifyWriteLockSolicitors();
+ }
+
+ return;
+ }
+ }
+ }
+
+ /**
+ * Acquires the write lock waiting, if needed. The thread will block until
all other read and write locks are released.
+ */
+ @SuppressWarnings("BusyWait")
+ public void writeLock() {
+ boolean interrupted = false;
+
+ incrementPendingWriteLocks();
+ try {
+ while (!trySwitchStateToWriteLocked()) {
+ try {
+ Thread.sleep(SLEEP_MILLIS);
+ } catch (InterruptedException ignored) {
+ interrupted = true;
+ }
+ }
+ } finally {
+ decrementPendingWriteLocks();
+ }
+
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void incrementPendingWriteLocks() {
+ while (true) {
+ int curPendingWriteLocks = pendingWriteLocks;
+
+ if (PENDING_WLOCKS_VH.compareAndSet(this, curPendingWriteLocks,
curPendingWriteLocks + 1)) {
+ break;
+ }
+ }
+ }
+
+ private boolean trySwitchStateToWriteLocked() {
+ return STATE_VH.compareAndSet(this, AVAILABLE, state(true, 0));
+ }
+
+ private void decrementPendingWriteLocks() {
+ while (true) {
+ int curPendingWriteLocks = pendingWriteLocks;
+
+ assert curPendingWriteLocks > 0;
+
+ if (PENDING_WLOCKS_VH.compareAndSet(this, curPendingWriteLocks,
curPendingWriteLocks - 1)) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Acquires the write lock without sleeping between unsuccessful attempts.
Instead, the spinwait eats cycles of the core it gets at full
+ * speed. It is non-interruptible as its {@link #writeLock()} cousin.
+ */
+ @SuppressWarnings("PMD.EmptyControlStatement")
+ public void writeLockBusy() {
+ incrementPendingWriteLocks();
+ try {
+ while (!trySwitchStateToWriteLocked()) {
+ // No-op.
+ }
+ } finally {
+ decrementPendingWriteLocks();
+ }
+ }
+
+ /**
+ * Tries to acquire the write lock. Never blocks: if any lock has already
been acquired by someone else, returns {@code false}
+ * immediately.
+ *
+ * @return {@code true} if the write lock has been acquired, {@code false}
otherwise
+ */
+ public boolean tryWriteLock() {
+ return trySwitchStateToWriteLocked();
+ }
+
+ /**
+ * Tries to acquire the write lock with timeout. If it gets the write lock
before the timeout expires, then returns {@code true}. If the
+ * timeout expires before the lock becomes available, returns {@code
false}.
+ *
+ * @param timeout Timeout.
+ * @param unit Unit.
+ * @return {@code true} if the write lock has been acquired in time;
{@code false} otherwise
+ * @throws InterruptedException If interrupted.
+ */
+ @SuppressWarnings("BusyWait")
+ public boolean tryWriteLock(long timeout, TimeUnit unit) throws
InterruptedException {
+ incrementPendingWriteLocks();
+ try {
+ long startNanos = System.nanoTime();
+
+ long timeoutNanos = unit.toNanos(timeout);
+
+ while (true) {
+ if (trySwitchStateToWriteLocked()) {
+ return true;
+ }
+
+ Thread.sleep(SLEEP_MILLIS);
+
+ if (System.nanoTime() - startNanos >= timeoutNanos) {
+ return false;
+ }
+ }
+ } finally {
+ decrementPendingWriteLocks();
+ }
+ }
+
+ /**
+ * Releases the write lock.
+ *
+ * @throws IllegalMonitorStateException thrown if the write lock is not
acquired.
+ */
+ public void writeUnlock() {
+ int curState = state;
+ // There could still be some read locks if the write lock was taken
forcefully.
+ int readLocks = readLocks(curState);
+
+ if (!writeLocked(curState)) {
+ throw new IllegalMonitorStateException();
+ }
+
+ boolean b = STATE_VH.compareAndSet(this, state, state(false,
readLocks));
+
+ assert b;
+
+ notifyWriteLockSolicitors();
+ notifyReadLockSolicitors();
+ }
+
+ private void notifyWriteLockSolicitors() {
+ if (writeLockSolicitors.isEmpty()) {
+ return;
+ }
+
+ for (Iterator<CompletableFuture<Void>> iterator =
writeLockSolicitors.iterator(); iterator.hasNext(); ) {
+ CompletableFuture<Void> future = iterator.next();
+
+ if (!tryWriteLock()) {
+ // Someone has already acquired an impeding lock, we're too
late, let's wait for next opportunity.
+ break;
+ }
+
+ decrementPendingWriteLocks();
+
+ asyncContinuationExecutor.execute(() -> future.complete(null));
+
+ iterator.remove();
+ }
+ }
+
+ private void notifyReadLockSolicitors() {
+ if (readLockSolicitors.isEmpty()) {
+ return;
+ }
+
+ for (Iterator<CompletableFuture<Void>> iterator =
readLockSolicitors.iterator(); iterator.hasNext(); ) {
+ CompletableFuture<Void> future = iterator.next();
+
+ if (!tryReadLock()) {
+ // Someone has already acquired a write lock, we're too late,
let's wait for next opportunity.
+ break;
+ }
+
+ asyncContinuationExecutor.execute(() -> {
+ if (!future.complete(null)) {
+ // The one who added this future has already taken the
read lock after adding the future; as they have completed the
+ // future, this is us who needs to unlock the excess.
+ readUnlock();
+ }
+ });
+
+ iterator.remove();
+ }
+ }
+
+ /**
+ * Executes the provided asynchronous action under protection of a read
lock: that is, it first obtains a read lock
+ * asynchronously, then executes the action, and then releases the lock.
+ *
+ * @param action Action to execute.
+ * @return Action result.
+ */
+ public <T> CompletableFuture<T> inReadLockAsync(Supplier<? extends
CompletableFuture<T>> action) {
+ return readLockAsync()
+ .thenCompose(unused -> action.get())
+ .whenComplete((res, ex) -> readUnlock());
+ }
+
+ private CompletableFuture<Void> readLockAsync() {
+ if (tryReadLock()) {
+ return nullCompletedFuture();
+ }
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ readLockSolicitors.add(future);
+
+ // Let's check again as the lock might have been released before we
added the future.
+ if (tryReadLock()) {
+ readLockSolicitors.remove(future);
+
+ if (!future.complete(null)) {
+ // The one who processes the solicitors set has already taken
the read lock for us; as they have completed the
+ // future, this is us who needs to unlock the excess.
+ readUnlock();
+ }
+ }
+
+ return future;
+ }
+
+ /**
+ * Executes the provided asynchronous action under protection of a write
lock: that is, it first obtains the write lock
+ * asynchronously, then executes the action, and then releases the lock.
+ *
+ * @param action Action to execute.
+ * @return Action result.
+ */
+ public <T> CompletableFuture<T> inWriteLockAsync(Supplier<? extends
CompletableFuture<T>> action) {
+ return writeLockAsync()
+ .thenCompose(unused -> action.get())
+ .whenComplete((res, ex) -> writeUnlock());
+ }
+
+ private CompletableFuture<Void> writeLockAsync() {
+ if (tryWriteLock()) {
+ return nullCompletedFuture();
+ }
+
+ incrementPendingWriteLocks();
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ writeLockSolicitors.add(future);
+
+ // Let's check again as the lock might have been released before we
added the future.
+ if (tryWriteLock()) {
+ decrementPendingWriteLocks();
+ writeLockSolicitors.remove(future);
+
+ future.complete(null);
+ }
+
+ return future;
+ }
+
+ /**
+ * Returns the count of pending write lock requests count. Only used by
tests, should not be used in production code.
+ *
+ * @return count of pending requests to get the write lock
+ */
+ @TestOnly
+ int pendingWriteLocksCount() {
+ return pendingWriteLocks;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(VersatileReadWriteLock.class, this);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLockTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLockTest.java
index 23fd0686c0..9d670e2892 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLockTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLockTest.java
@@ -86,7 +86,7 @@ class IgniteSpinReadWriteLockTest {
private void assertThatWriteLockAcquireAttemptBlocksForever() {
Future<?> future = executor.submit(lock::writeLock);
- assertThrows(TimeoutException.class, () -> future.get(500,
TimeUnit.MILLISECONDS));
+ assertThrows(TimeoutException.class, () -> future.get(100,
TimeUnit.MILLISECONDS));
}
@Test
@@ -146,7 +146,7 @@ class IgniteSpinReadWriteLockTest {
private void assertThatReadLockAcquireAttemptBlocksForever() {
Future<?> readLockAttemptFuture = executor.submit(lock::readLock);
- assertThrows(TimeoutException.class, () ->
readLockAttemptFuture.get(500, TimeUnit.MILLISECONDS));
+ assertThrows(TimeoutException.class, () ->
readLockAttemptFuture.get(100, TimeUnit.MILLISECONDS));
}
@Test
@@ -188,7 +188,7 @@ class IgniteSpinReadWriteLockTest {
private void assertThatWriteLockAcquireAttemptWithoutSleepsBlocksForever()
{
Future<?> future = executor.submit(lock::writeLockBusy);
- assertThrows(TimeoutException.class, () -> future.get(500,
TimeUnit.MILLISECONDS));
+ assertThrows(TimeoutException.class, () -> future.get(100,
TimeUnit.MILLISECONDS));
}
@Test
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
new file mode 100644
index 0000000000..267fbdbbb5
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.anyOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
+
+/**
+ * Tests for {@link VersatileReadWriteLock}.
+ */
+@Timeout(20)
+class VersatileReadWriteLockTest {
+ private static final IgniteLogger LOG =
Loggers.forClass(VersatileReadWriteLockTest.class);
+
+ private static final String ASYNC_CONTINUATION_THREAD_PREFIX = "ace";
+
+ private final ExecutorService asyncContinuationExecutor =
Executors.newCachedThreadPool(
+ new NamedThreadFactory(ASYNC_CONTINUATION_THREAD_PREFIX, LOG)
+ );
+
+ /** The lock under test. */
+ private final VersatileReadWriteLock lock = new
VersatileReadWriteLock(asyncContinuationExecutor);
+
+ /** Executor service used to run tasks in threads different from the main
test thread. */
+ private final ExecutorService executor = Executors.newCachedThreadPool();
+
+ /**
+ * Cleans up after a test.
+ */
+ @AfterEach
+ void cleanup() {
+ releaseReadLocks();
+ releaseWriteLocks();
+
+ IgniteUtils.shutdownAndAwaitTermination(executor, 3, SECONDS);
+ IgniteUtils.shutdownAndAwaitTermination(asyncContinuationExecutor, 3,
SECONDS);
+ }
+
+ private void releaseReadLocks() {
+ while (true) {
+ try {
+ lock.readUnlock();
+ } catch (IllegalMonitorStateException e) {
+ // Released our read lock completely.
+ break;
+ }
+ }
+ }
+
+ private void releaseWriteLocks() {
+ while (true) {
+ try {
+ lock.writeUnlock();
+ } catch (IllegalMonitorStateException e) {
+ // Released our write lock completely.
+ break;
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(BlockingWriteLockAcquisition.class)
+ void
readLockDoesNotAllowWriteLockToBeAcquired(BlockingWriteLockAcquisition
acquisition) {
+ lock.readLock();
+
+ assertThatWriteLockAcquireAttemptBlocksForever(acquisition);
+
+ lock.readUnlock();
+ }
+
+ @ParameterizedTest
+ @EnumSource(BlockingWriteLockAcquisition.class)
+ void
readLockDoesNotAllowWriteLockToBeAcquiredBySameThread(BlockingWriteLockAcquisition
acquisition) {
+ assertThatActionBlocksForever(() -> {
+ lock.readLock();
+ acquisition.acquire(lock);
+ });
+
+ lock.readUnlock();
+ }
+
+ private void
assertThatWriteLockAcquireAttemptBlocksForever(BlockingWriteLockAcquisition
acquisition) {
+ assertThatActionBlocksForever(() -> acquisition.acquire(lock));
+ }
+
+ private void assertThatActionBlocksForever(Runnable action) {
+ CompletableFuture<?> future = runAsync(action, executor);
+
+ assertThat(future, willTimeoutIn(100, MILLISECONDS));
+ }
+
+ @Test
+ void readLockDoesNotAllowWriteLockToBeAcquiredWithTimeout() throws
Exception {
+ lock.readLock();
+
+ Boolean acquired = callWithTimeout(() -> lock.tryWriteLock(1,
MILLISECONDS));
+ assertThat(acquired, is(false));
+
+ lock.readUnlock();
+ }
+
+ @Test
+ void readLockDoesNotAllowWriteLockToBeAcquiredWithTimeoutBySameThread()
throws Exception {
+ Boolean acquired = callWithTimeout(() -> {
+ lock.readLock();
+ return lock.tryWriteLock(1, MILLISECONDS);
+ });
+ assertThat(acquired, is(false));
+
+ lock.readUnlock();
+ }
+
+ @Test
+ void readLockAllowsReadLockToBeAcquired() {
+ lock.readLock();
+
+ assertThatReadLockCanBeAcquired();
+ }
+
+ private void assertThatReadLockCanBeAcquired() {
+ runWithTimeout(lock::readLock);
+ }
+
+ private <T> T callWithTimeout(Callable<T> call) throws ExecutionException,
InterruptedException, TimeoutException {
+ return executor.submit(call).get(10, SECONDS);
+ }
+
+ private void runWithTimeout(Runnable runnable) {
+ assertThat(runAsync(runnable, executor), willCompleteSuccessfully());
+ }
+
+ @ParameterizedTest
+ @EnumSource(BlockingWriteLockAcquisition.class)
+ void
writeLockDoesNotAllowReadLockToBeAcquired(BlockingWriteLockAcquisition
acquisition) {
+ acquisition.acquire(lock);
+
+ assertThatReadLockAcquireAttemptBlocksForever();
+
+ lock.writeUnlock();
+ }
+
+ private void assertThatReadLockAcquireAttemptBlocksForever() {
+ assertThatActionBlocksForever(lock::readLock);
+ }
+
+ @ParameterizedTest
+ @EnumSource(BlockingWriteLockAcquisition.class)
+ void
writeLockDoesNotAllowReadLockToBeAcquiredBySameThread(BlockingWriteLockAcquisition
acquisition) {
+ assertThatActionBlocksForever(() -> {
+ acquisition.acquire(lock);
+ lock.readLock();
+ });
+
+ lock.writeUnlock();
+ }
+
+ @CartesianTest
+ @EnumSource(BlockingWriteLockAcquisition.class)
+ void writeLockDoesNotAllowWriteLockToBeAcquired(
+ @Enum(BlockingWriteLockAcquisition.class)
BlockingWriteLockAcquisition firstAttempt,
+ @Enum(BlockingWriteLockAcquisition.class)
BlockingWriteLockAcquisition secondAttempt
+ ) {
+ firstAttempt.acquire(lock);
+
+ assertThatWriteLockAcquireAttemptBlocksForever(secondAttempt);
+
+ lock.writeUnlock();
+ }
+
+ @CartesianTest
+ @EnumSource(BlockingWriteLockAcquisition.class)
+ void writeLockDoesNotAllowWriteLockToBeAcquiredBySameThread(
+ @Enum(BlockingWriteLockAcquisition.class)
BlockingWriteLockAcquisition firstAttempt,
+ @Enum(BlockingWriteLockAcquisition.class)
BlockingWriteLockAcquisition secondAttempt
+ ) {
+ assertThatActionBlocksForever(() -> {
+ firstAttempt.acquire(lock);
+ secondAttempt.acquire(lock);
+ });
+
+ lock.writeUnlock();
+ }
+
+ @Test
+ void readUnlockReleasesTheLock() {
+ lock.readLock();
+ lock.readUnlock();
+
+ runWithTimeout(lock::writeLock);
+ }
+
+ @ParameterizedTest
+ @EnumSource(BlockingWriteLockAcquisition.class)
+ void writeUnlockReleasesTheLock(BlockingWriteLockAcquisition acquisition) {
+ acquisition.acquire(lock);
+ lock.writeUnlock();
+
+ assertThatReadLockCanBeAcquired();
+ }
+
+ @Test
+ void shouldNotAllowInterleavingHoldingReadAndWriteLocks() {
+ lock.writeLock();
+
+ assertFalse(lock.tryReadLock());
+
+ lock.writeUnlock();
+
+ lock.readLock();
+
+ assertFalse(lock.tryWriteLock());
+
+ lock.readUnlock();
+
+ // Test that we can operate with write locks now.
+ lock.writeLock();
+ lock.writeUnlock();
+ }
+
+ @Test
+ void readLockReleasedLessTimesThanAcquiredShouldStillBeTaken() {
+ lock.readLock();
+
+ CompletableFuture<?> future = runAsync(() -> {
+ lock.readLock();
+ lock.readUnlock();
+ }, executor);
+ assertThat(future, willCompleteSuccessfully());
+
+
assertThatWriteLockAcquireAttemptBlocksForever(BlockingWriteLockAcquisition.WRITE_LOCK);
+
+ lock.readUnlock();
+ }
+
+ @Test
+ void shouldThrowOnReadUnlockingWhenNotReadLocked() {
+ assertThrows(IllegalMonitorStateException.class, lock::readUnlock);
+ }
+
+ @Test
+ void shouldThrowOnWriteUnlockingWhenNotWriteLocked() {
+ assertThrows(IllegalMonitorStateException.class, lock::writeUnlock);
+ }
+
+ @ParameterizedTest
+ @EnumSource(BlockingWriteLockAcquisition.class)
+ void
readLockAcquiredWithTryReadLockDoesNotAllowWriteLockToBeAcquired(BlockingWriteLockAcquisition
acquisition) {
+ lock.tryReadLock();
+
+ assertThatWriteLockAcquireAttemptBlocksForever(acquisition);
+
+ lock.readUnlock();
+ }
+
+ @ParameterizedTest
+ @EnumSource(BlockingWriteLockAcquisition.class)
+ void
readLockAcquiredWithTryReadLockDoesNotAllowWriteLockToBeAcquiredBySameThread(BlockingWriteLockAcquisition
acquisition) {
+ assertThatActionBlocksForever(() -> {
+ lock.tryReadLock();
+ acquisition.acquire(lock);
+ });
+
+ lock.readUnlock();
+ }
+
+ @Test
+ void tryReadLockShouldReturnTrueWhenReadLockWasAcquiredSuccessfully() {
+ assertTrue(lock.tryReadLock());
+ }
+
+ @Test
+ void tryReadLockShouldReturnFalseWhenReadLockCouldNotBeAcquired() throws
Exception {
+ lock.writeLock();
+
+ Boolean acquired = callWithTimeout(lock::tryReadLock);
+
+ assertThat(acquired, is(false));
+ }
+
+ @Test
+ void writeLockAcquiredWithTryWriteLockDoesNotAllowWriteLockToBeAcquired() {
+ lock.tryWriteLock();
+
+ assertThatReadLockAcquireAttemptBlocksForever();
+
+ lock.writeUnlock();
+ }
+
+ @Test
+ void
writeLockAcquiredWithTryWriteLockDoesNotAllowWriteLockToBeAcquiredBySameThread()
{
+ assertThatActionBlocksForever(() -> {
+ lock.tryWriteLock();
+ lock.readLock();
+ });
+
+ lock.writeUnlock();
+ }
+
+ @Test
+ void tryWriteLockShouldReturnTrueWhenWriteLockWasAcquiredSuccessfully() {
+ assertTrue(lock.tryWriteLock());
+ }
+
+ @Test
+ void tryWriteLockShouldReturnFalseWhenWriteLockCouldNotBeAcquired() throws
Exception {
+ lock.writeLock();
+
+ Boolean acquired = callWithTimeout(lock::tryWriteLock);
+
+ assertThat(acquired, is(false));
+ }
+
+ @Test
+ void inReadLockAsyncExecutesClosureAfterTakingReadLock() {
+ assertThat(lock.inReadLockAsync(() ->
completedFuture(lock.tryWriteLock())), willBe(false));
+ }
+
+ @Test
+ void inReadLockAsyncReleasesReadLockInTheEnd() {
+
assertThat(lock.inReadLockAsync(CompletableFutures::nullCompletedFuture),
willCompleteSuccessfully());
+
+ assertThatNoReadLockIsHeld();
+ }
+
+ @Test
+ void inReadLockAsyncReleasesReadLockInTheEndInCaseOfException() {
+ assertThat(lock.inReadLockAsync(() -> failedFuture(new
Exception("Oops"))), willThrow(Exception.class));
+
+ assertThatNoReadLockIsHeld();
+ }
+
+ private void assertThatNoReadLockIsHeld() {
+ assertTrue(lock.tryWriteLock(), "Read lock is still held");
+
+ lock.writeUnlock();
+ }
+
+ private void assertThatNoWriteLockIsHeld() {
+ assertTrue(lock.tryReadLock(), "Write lock is still held");
+
+ lock.readUnlock();
+ }
+
+ @Test
+ void inReadLockAsyncTakesReadLockAfterWriteLockGetsReleased() {
+ lock.writeLock();
+
+ CompletableFuture<Boolean> future1 =
lock.inReadLockAsync(CompletableFutures::nullCompletedFuture);
+ CompletableFuture<Boolean> future2 =
lock.inReadLockAsync(CompletableFutures::nullCompletedFuture);
+ CompletableFuture<Boolean> future3 =
lock.inReadLockAsync(CompletableFutures::nullCompletedFuture);
+
+ assertThat(anyOf(future1, future2, future3), willTimeoutIn(100,
MILLISECONDS));
+
+ lock.writeUnlock();
+
+ assertThat(allOf(future1, future2, future3),
willCompleteSuccessfully());
+
+ assertThatNoReadLockIsHeld();
+ }
+
+ @Test
+ void inReadLockAsyncRespectsPendingWriteLocks() throws Exception {
+ lock.readLock();
+
+ CompletableFuture<?> writeLockFuture = runAsync(lock::writeLock,
executor);
+
+ waitTillWriteLockAcquireAttemptIsInitiated();
+
+ CompletableFuture<Void> readLockAsyncFuture =
lock.inReadLockAsync(CompletableFutures::nullCompletedFuture);
+ assertFalse(readLockAsyncFuture.isDone());
+
+ // Letting the write lock to be acquired.
+ lock.readUnlock();
+
+ assertThat(writeLockFuture, willCompleteSuccessfully());
+
+ assertFalse(waitForCondition(readLockAsyncFuture::isDone, 100));
+
+ lock.writeUnlock();
+ }
+
+ private void waitTillWriteLockAcquireAttemptIsInitiated() throws
InterruptedException {
+ boolean sawAnAttempt = waitForCondition(
+ () -> lock.pendingWriteLocksCount() > 0, SECONDS.toMillis(10));
+ assertTrue(sawAnAttempt, "Did not see any attempt to acquire write
lock");
+ }
+
+ @Test
+ void inReadLockAsyncTakesReadLockInExecutorAfterWriteLockGetsReleased() {
+ lock.writeLock();
+
+ AtomicReference<Thread> threadRef = new AtomicReference<>();
+ CompletableFuture<?> future =
lock.inReadLockAsync(CompletableFutures::nullCompletedFuture)
+ .whenComplete((res, ex) -> threadRef.set(currentThread()));
+
+ lock.writeUnlock();
+ assertThat(future, willCompleteSuccessfully());
+
+ assertThat(threadRef.get().getName(),
startsWith(ASYNC_CONTINUATION_THREAD_PREFIX));
+ }
+
+ @Test
+ void concurrentInReadLockAsyncAndWriteLockWorkCorrectly() {
+ RunnableX readLocker = () -> {
+ for (int i = 0; i < 300; i++) {
+
lock.inReadLockAsync(CompletableFutures::nullCompletedFuture).get(10, SECONDS);
+ }
+ };
+ RunnableX writeLocker = () -> {
+ for (int i = 0; i < 300; i++) {
+ lock.writeLock();
+ lock.writeUnlock();
+ }
+ };
+
+ runRace(10_000, readLocker, writeLocker);
+
+ assertThatNoReadLockIsHeld();
+ assertThatNoWriteLockIsHeld();
+ }
+
+ @Test
+ void inWriteLockAsyncExecutesClosureAfterTakingWriteLock() {
+ assertThat(lock.inWriteLockAsync(() ->
completedFuture(lock.tryWriteLock())), willBe(false));
+ }
+
+ @Test
+ void inWriteLockAsyncReleasesWriteLockInTheEnd() {
+
assertThat(lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture),
willCompleteSuccessfully());
+
+ assertThatNoWriteLockIsHeld();
+ }
+
+ @Test
+ void inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfException() {
+ assertThat(lock.inWriteLockAsync(() -> failedFuture(new
Exception("Oops"))), willThrow(Exception.class));
+
+ assertThatNoWriteLockIsHeld();
+ }
+
+ @ParameterizedTest
+ @EnumSource(WriteLockImpeder.class)
+ void
inWriteLockAsyncTakesWriteLockAfterImpedingLockGetsReleased(WriteLockImpeder
impeder) {
+ impeder.impede(lock);
+
+ CompletableFuture<Void> future =
lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture);
+
+ assertThat(future, willTimeoutIn(100, MILLISECONDS));
+
+ impeder.stopImpeding(lock);
+
+ assertThat(future, willCompleteSuccessfully());
+
+ assertThatNoWriteLockIsHeld();
+ }
+
+ @ParameterizedTest
+ @EnumSource(WriteLockImpeder.class)
+ void
multipleInWriteLockAsyncAttemptsTakeWriteLockAfterImpedingLocksGetReleased(WriteLockImpeder
impeder) {
+ impeder.impede(lock);
+
+ CompletableFuture<Void> future1 =
lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture);
+ CompletableFuture<Void> future2 =
lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture);
+ CompletableFuture<Void> future3 =
lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture);
+
+ assertThat(anyOf(future1, future2, future3), willTimeoutIn(100,
MILLISECONDS));
+
+ impeder.stopImpeding(lock);
+
+ assertThat(allOf(future1, future2, future3),
willCompleteSuccessfully());
+
+ assertThatNoWriteLockIsHeld();
+ }
+
+ @ParameterizedTest
+ @EnumSource(WriteLockImpeder.class)
+ void
inWriteLockAsyncTakesWriteLockInExecutorAfterImpedingLockGetsReleased(WriteLockImpeder
impeder) {
+ impeder.impede(lock);
+
+ AtomicReference<Thread> threadRef = new AtomicReference<>();
+ CompletableFuture<?> future =
lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture)
+ .whenComplete((res, ex) -> threadRef.set(currentThread()));
+
+ impeder.stopImpeding(lock);
+ assertThat(future, willCompleteSuccessfully());
+
+ assertThat(threadRef.get().getName(),
startsWith(ASYNC_CONTINUATION_THREAD_PREFIX));
+ }
+
+ @Test
+ void inWriteLockAsyncSetsPendingWriteLocks() {
+ lock.readLock();
+
+ // This will wait till read lock is released.
+ lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture);
+
+ assertFalse(lock.tryReadLock());
+
+ lock.readUnlock();
+ }
+
+ private enum BlockingWriteLockAcquisition {
+ WRITE_LOCK {
+ @Override
+ void acquire(VersatileReadWriteLock lock) {
+ lock.writeLock();
+ }
+ },
+ WRITE_LOCK_BUSY {
+ @Override
+ void acquire(VersatileReadWriteLock lock) {
+ lock.writeLockBusy();
+ }
+ };
+
+ abstract void acquire(VersatileReadWriteLock lock);
+ }
+
+ private enum WriteLockImpeder {
+ READ_LOCK {
+ @Override
+ void impede(VersatileReadWriteLock lock) {
+ lock.readLock();
+ }
+
+ @Override
+ void stopImpeding(VersatileReadWriteLock lock) {
+ lock.readUnlock();
+ }
+ },
+ WRITE_LOCK {
+ @Override
+ void impede(VersatileReadWriteLock lock) {
+ lock.writeLock();
+ }
+
+ @Override
+ void stopImpeding(VersatileReadWriteLock lock) {
+ lock.writeUnlock();
+ }
+ };
+
+ abstract void impede(VersatileReadWriteLock lock);
+
+ abstract void stopImpeding(VersatileReadWriteLock lock);
+ }
+}