This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new da075b483c IGNITE-22989 Add sync/async read-write lock (#4233)
da075b483c is described below

commit da075b483cdcd2c9933f7be5620217689da2a73b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Aug 16 11:58:48 2024 +0400

    IGNITE-22989 Add sync/async read-write lock (#4233)
---
 .../internal/util/IgniteSpinReadWriteLock.java     |  37 +-
 .../internal/util/VersatileReadWriteLock.java      | 486 +++++++++++++++++
 .../internal/util/IgniteSpinReadWriteLockTest.java |   6 +-
 .../internal/util/VersatileReadWriteLockTest.java  | 599 +++++++++++++++++++++
 4 files changed, 1102 insertions(+), 26 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java
index 9c6ba014da..8ea64e37d6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java
@@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Spin read-write lock.
@@ -28,14 +29,14 @@ import org.apache.ignite.internal.tostring.S;
  * interruption signal).
  *
  * <p>The locks are reentrant (that is, the same thread can acquire the same 
lock a few times in a row and then
- * release them same number of times.
+ * release them the same number of times).
  *
  * <p>Write lock acquire requests are prioritized over read lock acquire 
requests. That is, if both read and write lock
  * acquire requests are received when the write lock is held by someone else, 
then, on its release, the write lock attempt will be served
  * first.
  */
 public class IgniteSpinReadWriteLock {
-    /** Signals that nobody currently owns the read lock. */
+    /** Signals that nobody currently owns the write lock. */
     private static final long NO_OWNER = -1;
 
     /**
@@ -91,7 +92,7 @@ public class IgniteSpinReadWriteLock {
     private volatile int state;
 
     /**
-     * Number of pending write attempts to acquire the write lock. Currently 
it is only used to prioritize write lock attempts over read
+     * Number of pending write attempts to acquire the write lock. It is used 
to prioritize write lock attempts over read
      * lock attempts when the write lock has been released (so, if both an 
attempt to acquire the write lock and an attempt to acquire the
      * read lock are waiting for write lock to be released, a write lock 
attempt will be served first when the release happens).
      */
@@ -105,7 +106,7 @@ public class IgniteSpinReadWriteLock {
 
     /**
      * Acquires the read lock. If the write lock is held by another thread, 
this blocks until the write lock is released (and until all
-     * concurrent write locks are acquired and released, as this class 
pripritizes write lock attempts over read lock attempts).
+     * concurrent write locks are acquired and released, as this class 
prioritizes write lock attempts over read lock attempts).
      */
     @SuppressWarnings("BusyWait")
     public void readLock() {
@@ -168,13 +169,14 @@ public class IgniteSpinReadWriteLock {
     }
 
     private boolean tryAdvanceStateToReadLocked(int curState) {
-        return compareAndSet(STATE_VH, curState, curState + 1);
+        return STATE_VH.compareAndSet(this, curState, curState + 1);
     }
 
     /**
      * Tries to acquire the read lock. No spinwait is used if the lock cannot 
be acquired immediately.
      *
-     * @return {@code true} if acquired, {@code false} if write lock is 
already held by someone else
+     * @return {@code true} if acquired, {@code false} if write lock is 
already held by someone else (or someone is waiting to acquire
+     *     the write lock).
      */
     public boolean tryReadLock() {
         int cnt = readLockEntryCnt.get();
@@ -227,7 +229,7 @@ public class IgniteSpinReadWriteLock {
 
             assert curState > 0;
 
-            if (compareAndSet(STATE_VH, curState, curState - 1)) {
+            if (STATE_VH.compareAndSet(this, curState, curState - 1)) {
                 readLockEntryCnt.set(0);
 
                 return;
@@ -278,14 +280,14 @@ public class IgniteSpinReadWriteLock {
         while (true) {
             int curPendingWriteLocks = pendingWriteLocks;
 
-            if (compareAndSet(PENDING_WLOCKS_VH, curPendingWriteLocks, 
curPendingWriteLocks + 1)) {
+            if (PENDING_WLOCKS_VH.compareAndSet(this, curPendingWriteLocks, 
curPendingWriteLocks + 1)) {
                 break;
             }
         }
     }
 
     private boolean trySwitchStateToWriteLocked() {
-        return compareAndSet(STATE_VH, AVAILABLE, WRITE_LOCKED);
+        return STATE_VH.compareAndSet(this, AVAILABLE, WRITE_LOCKED);
     }
 
     private void decrementPendingWriteLocks() {
@@ -294,7 +296,7 @@ public class IgniteSpinReadWriteLock {
 
             assert curPendingWriteLocks > 0;
 
-            if (compareAndSet(PENDING_WLOCKS_VH, curPendingWriteLocks, 
curPendingWriteLocks - 1)) {
+            if (PENDING_WLOCKS_VH.compareAndSet(this, curPendingWriteLocks, 
curPendingWriteLocks - 1)) {
                 break;
             }
         }
@@ -426,28 +428,17 @@ public class IgniteSpinReadWriteLock {
         // write lock now.
         int update = readLockEntryCnt.get() > 0 ? 1 : AVAILABLE;
 
-        boolean b = compareAndSet(STATE_VH, WRITE_LOCKED, update);
+        boolean b = STATE_VH.compareAndSet(this, WRITE_LOCKED, update);
 
         assert b;
     }
 
-    /**
-     * Returns {@code true} on success.
-     *
-     * @param varHandle VarHandle.
-     * @param expect    Expected.
-     * @param update    Update.
-     * @return {@code True} on success.
-     */
-    private boolean compareAndSet(VarHandle varHandle, int expect, int update) 
{
-        return varHandle.compareAndSet(this, expect, update);
-    }
-
     /**
      * Returns the count of pending write lock requests count. Only used by 
tests, should not be used in production code.
      *
      * @return count of pending requests to get the write lock
      */
+    @TestOnly
     int pendingWriteLocksCount() {
         return pendingWriteLocks;
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
new file mode 100644
index 0000000000..a8829f9e54
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/VersatileReadWriteLock.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * A versatile read-write lock that can be used both in synchronous and 
asynchronous contexts.
+ * Its blocking methods use the spinwait strategy. When they do so, they are 
not interruptible (that is, they do not break their loop on
+ * interruption signal).
+ *
+ * <p>The locks are NOT reentrant (that is, the same thread can NOT acquire 
the same lock a few times without releasing it).
+ *
+ * <p>Write lock acquire requests are prioritized over read lock acquire 
requests. That is, if both read and write lock
+ * acquire requests are received when the write lock is held by someone else, 
then, on its release, the write lock attempt will be served
+ * first.
+ *
+ * <p>Lock owners are not tracked.
+ *
+ * <p>Asynchronous locking methods may complete the futures either in the 
calling thread (if they were able to immediately acquire
+ * the requested lock) or in the supplied pool (if they had to wait for a 
release to happen before being able to satisfy the request).
+ *
+ * <p>Asynchronous locking methods never use spin loops. They do use CAS 
loops, but these are mostly very short.
+ */
+public class VersatileReadWriteLock {
+    /** The highest bit of the state signals that the write lock is acquired. 
*/
+    private static final int WRITE_LOCK_BITS = 1 << 31;
+
+    /** 31 lower bits represent a counter of read locks that are acquired. */
+    private static final int READ_LOCK_BITS = ~WRITE_LOCK_BITS;
+
+    /**
+     * State 0 means that both read and write locks are available for 
acquiring (as no locks are acquired).
+     *
+     * @see #state
+     */
+    private static final int AVAILABLE = 0;
+
+    /** How much time to sleep on each iteration of a spin loop 
(milliseconds). */
+    private static final int SLEEP_MILLIS = 10;
+
+    /** {@link VarHandle} used to access the {@code pendingWriteLocks} field. */
+    private static final VarHandle PENDING_WLOCKS_VH;
+
+    /** {@link VarHandle} used to access the {@code state} field. */
+    private static final VarHandle STATE_VH;
+
+    static {
+        try {
+            STATE_VH = MethodHandles.lookup()
+                .findVarHandle(VersatileReadWriteLock.class, "state", 
int.class);
+
+            PENDING_WLOCKS_VH = MethodHandles.lookup()
+                .findVarHandle(VersatileReadWriteLock.class, 
"pendingWriteLocks", int.class);
+        } catch (ReflectiveOperationException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    /**
+     * Main state of the lock.
+     * <ul>
+     *     <li>The highest bit is for whether the write lock is acquired or 
not.</li>
+     *     <li>The remaining bits store a counter representing the acquired 
read locks.</li>
+     * </ul>
+     */
+    @SuppressWarnings("unused")
+    private volatile int state;
+
+    /**
+     * Number of pending write attempts to acquire the write lock. It is used 
to prioritize write lock attempts over read
+     * lock attempts when the write lock has been released (so, if both an 
attempt to acquire the write lock and an attempt to acquire the
+     * read lock are waiting for write lock to be released, a write lock 
attempt will be served first when the release happens).
+     */
+    @SuppressWarnings("unused")
+    private volatile int pendingWriteLocks;
+
+    /** Futures to be completed when read locks (one per future) are acquired 
after a write lock is released. */
+    private final Set<CompletableFuture<Void>> readLockSolicitors = 
ConcurrentHashMap.newKeySet();
+
+    /** Futures to be completed when a write lock (one per future) is acquired 
after an impeding lock is released. */
+    private final Set<CompletableFuture<Void>> writeLockSolicitors = 
ConcurrentHashMap.newKeySet();
+
+    /** In this pool {@link #readLockSolicitors} and {@link 
#writeLockSolicitors} will be completed. */
+    private final Executor asyncContinuationExecutor;
+
+    /**
+     * Constructor.
+     */
+    public VersatileReadWriteLock(Executor asyncContinuationExecutor) {
+        this.asyncContinuationExecutor = asyncContinuationExecutor;
+    }
+
+    /**
+     * Acquires the read lock. If the write lock is already held, this blocks 
until the write lock is released (and until all
+     * concurrent write locks are acquired and released, as this class 
prioritizes write lock attempts over read lock attempts).
+     */
+    @SuppressWarnings("BusyWait")
+    public void readLock() {
+        boolean interrupted = false;
+
+        while (true) {
+            int curState = state;
+
+            if (writeLockedOrGoingToBe(curState)) {
+                try {
+                    Thread.sleep(SLEEP_MILLIS);
+                } catch (InterruptedException ignored) {
+                    interrupted = true;
+                }
+
+                continue;
+            }
+
+            if (tryAdvanceStateToReadLocked(curState)) {
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
+                }
+
+                break;
+            }
+        }
+    }
+
+    private static int state(boolean writeLocked, int readLocks) {
+        assert readLocks >= 0 : readLocks;
+
+        return (writeLocked ? WRITE_LOCK_BITS : 0) | (readLocks & 
READ_LOCK_BITS);
+    }
+
+    private static boolean writeLocked(int curState) {
+        return (curState & WRITE_LOCK_BITS) != 0;
+    }
+
+    private static int readLocks(int state) {
+        return state & READ_LOCK_BITS;
+    }
+
+    private boolean writeLockedOrGoingToBe(int curState) {
+        return writeLocked(curState) || pendingWriteLocks > 0;
+    }
+
+    private boolean tryAdvanceStateToReadLocked(int curState) {
+        assert !writeLocked(curState);
+
+        int newState = state(false, readLocks(curState) + 1);
+
+        return STATE_VH.compareAndSet(this, curState, newState);
+    }
+
+    /**
+     * Tries to acquire the read lock. No spinwait is used if the lock cannot 
be acquired immediately.
+     *
+     * @return {@code true} if acquired, {@code false} if write lock is 
already held by someone else (or someone is waiting to acquire
+     *     the write lock).
+     */
+    public boolean tryReadLock() {
+        while (true) {
+            int curState = state;
+
+            if (writeLockedOrGoingToBe(curState)) {
+                return false;
+            }
+
+            if (tryAdvanceStateToReadLocked(curState)) {
+                return true;
+            }
+        }
+    }
+
+    /**
+     * Releases the read lock.
+     *
+     * @throws IllegalMonitorStateException thrown if the read lock is not 
acquired by anyone.
+     */
+    public void readUnlock() {
+        while (true) {
+            int curState = state;
+
+            // We allow a write lock to be held here as someone could have 
taken it forcefully.
+            boolean writeLocked = writeLocked(curState);
+            int readLocks = readLocks(curState);
+            if (readLocks < 1) {
+                throw new IllegalMonitorStateException();
+            }
+
+            if (STATE_VH.compareAndSet(this, curState, state(writeLocked, 
readLocks - 1))) {
+                if (readLocks == 1) {
+                    // We released the final read lock.
+                    notifyWriteLockSolicitors();
+                }
+
+                return;
+            }
+        }
+    }
+
+    /**
+     * Acquires the write lock, waiting if needed. The thread will block until 
all other read and write locks are released.
+     */
+    @SuppressWarnings("BusyWait")
+    public void writeLock() {
+        boolean interrupted = false;
+
+        incrementPendingWriteLocks();
+        try {
+            while (!trySwitchStateToWriteLocked()) {
+                try {
+                    Thread.sleep(SLEEP_MILLIS);
+                } catch (InterruptedException ignored) {
+                    interrupted = true;
+                }
+            }
+        } finally {
+            decrementPendingWriteLocks();
+        }
+
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private void incrementPendingWriteLocks() {
+        while (true) {
+            int curPendingWriteLocks = pendingWriteLocks;
+
+            if (PENDING_WLOCKS_VH.compareAndSet(this, curPendingWriteLocks, 
curPendingWriteLocks + 1)) {
+                break;
+            }
+        }
+    }
+
+    private boolean trySwitchStateToWriteLocked() {
+        return STATE_VH.compareAndSet(this, AVAILABLE, state(true, 0));
+    }
+
+    private void decrementPendingWriteLocks() {
+        while (true) {
+            int curPendingWriteLocks = pendingWriteLocks;
+
+            assert curPendingWriteLocks > 0;
+
+            if (PENDING_WLOCKS_VH.compareAndSet(this, curPendingWriteLocks, 
curPendingWriteLocks - 1)) {
+                break;
+            }
+        }
+    }
+
+    /**
+     * Acquires the write lock without sleeping between unsuccessful attempts. 
Instead, the spinwait eats cycles of the core it gets at full
+     * speed. It is non-interruptible, like its {@link #writeLock()} cousin.
+     */
+    @SuppressWarnings("PMD.EmptyControlStatement")
+    public void writeLockBusy() {
+        incrementPendingWriteLocks();
+        try {
+            while (!trySwitchStateToWriteLocked()) {
+                // No-op.
+            }
+        } finally {
+            decrementPendingWriteLocks();
+        }
+    }
+
+    /**
+     * Tries to acquire the write lock. Never blocks: if any lock has already 
been acquired by someone else, returns {@code false}
+     * immediately.
+     *
+     * @return {@code true} if the write lock has been acquired, {@code false} 
otherwise
+     */
+    public boolean tryWriteLock() {
+        return trySwitchStateToWriteLocked();
+    }
+
+    /**
+     * Tries to acquire the write lock with timeout. If it gets the write lock 
before the timeout expires, then returns {@code true}. If the
+     * timeout expires before the lock becomes available, returns {@code 
false}.
+     *
+     * @param timeout Timeout.
+     * @param unit    Unit.
+     * @return {@code true} if the write lock has been acquired in time; 
{@code false} otherwise
+     * @throws InterruptedException If interrupted.
+     */
+    @SuppressWarnings("BusyWait")
+    public boolean tryWriteLock(long timeout, TimeUnit unit) throws 
InterruptedException {
+        incrementPendingWriteLocks();
+        try {
+            long startNanos = System.nanoTime();
+
+            long timeoutNanos = unit.toNanos(timeout);
+
+            while (true) {
+                if (trySwitchStateToWriteLocked()) {
+                    return true;
+                }
+
+                Thread.sleep(SLEEP_MILLIS);
+
+                if (System.nanoTime() - startNanos >= timeoutNanos) {
+                    return false;
+                }
+            }
+        } finally {
+            decrementPendingWriteLocks();
+        }
+    }
+
+    /**
+     * Releases the write lock.
+     *
+     * @throws IllegalMonitorStateException thrown if the write lock is not 
acquired.
+     */
+    public void writeUnlock() {
+        int curState = state;
+        // There could still be some read locks if the write lock was taken 
forcefully.
+        int readLocks = readLocks(curState);
+
+        if (!writeLocked(curState)) {
+            throw new IllegalMonitorStateException();
+        }
+
+        boolean b = STATE_VH.compareAndSet(this, state, state(false, 
readLocks));
+
+        assert b;
+
+        notifyWriteLockSolicitors();
+        notifyReadLockSolicitors();
+    }
+
+    private void notifyWriteLockSolicitors() {
+        if (writeLockSolicitors.isEmpty()) {
+            return;
+        }
+
+        for (Iterator<CompletableFuture<Void>> iterator = 
writeLockSolicitors.iterator(); iterator.hasNext(); ) {
+            CompletableFuture<Void> future = iterator.next();
+
+            if (!tryWriteLock()) {
+                // Someone has already acquired an impeding lock, we're too 
late, let's wait for next opportunity.
+                break;
+            }
+
+            decrementPendingWriteLocks();
+
+            asyncContinuationExecutor.execute(() -> future.complete(null));
+
+            iterator.remove();
+        }
+    }
+
+    private void notifyReadLockSolicitors() {
+        if (readLockSolicitors.isEmpty()) {
+            return;
+        }
+
+        for (Iterator<CompletableFuture<Void>> iterator = 
readLockSolicitors.iterator(); iterator.hasNext(); ) {
+            CompletableFuture<Void> future = iterator.next();
+
+            if (!tryReadLock()) {
+                // Someone has already acquired a write lock, we're too late, 
let's wait for next opportunity.
+                break;
+            }
+
+            asyncContinuationExecutor.execute(() -> {
+                if (!future.complete(null)) {
+                    // The one who added this future has already taken the 
read lock after adding the future; as they have completed the
+                    // future, this is us who needs to unlock the excess.
+                    readUnlock();
+                }
+            });
+
+            iterator.remove();
+        }
+    }
+
+    /**
+     * Executes the provided asynchronous action under protection of a read 
lock: that is, it first obtains a read lock
+     * asynchronously, then executes the action, and then releases the lock.
+     *
+     * @param action Action to execute.
+     * @return Action result.
+     */
+    public <T> CompletableFuture<T> inReadLockAsync(Supplier<? extends 
CompletableFuture<T>> action) {
+        return readLockAsync()
+                .thenCompose(unused -> action.get())
+                .whenComplete((res, ex) -> readUnlock());
+    }
+
+    private CompletableFuture<Void> readLockAsync() {
+        if (tryReadLock()) {
+            return nullCompletedFuture();
+        }
+
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        readLockSolicitors.add(future);
+
+        // Let's check again as the lock might have been released before we 
added the future.
+        if (tryReadLock()) {
+            readLockSolicitors.remove(future);
+
+            if (!future.complete(null)) {
+                // The one who processes the solicitors set has already taken 
the read lock for us; as they have completed the
+                // future, this is us who needs to unlock the excess.
+                readUnlock();
+            }
+        }
+
+        return future;
+    }
+
+    /**
+     * Executes the provided asynchronous action under protection of a write 
lock: that is, it first obtains the write lock
+     * asynchronously, then executes the action, and then releases the lock.
+     *
+     * @param action Action to execute.
+     * @return Action result.
+     */
+    public <T> CompletableFuture<T> inWriteLockAsync(Supplier<? extends 
CompletableFuture<T>> action) {
+        return writeLockAsync()
+                .thenCompose(unused -> action.get())
+                .whenComplete((res, ex) -> writeUnlock());
+    }
+
+    private CompletableFuture<Void> writeLockAsync() {
+        if (tryWriteLock()) {
+            return nullCompletedFuture();
+        }
+
+        incrementPendingWriteLocks();
+
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        writeLockSolicitors.add(future);
+
+        // Let's check again as the lock might have been released before we 
added the future.
+        if (tryWriteLock()) {
+            decrementPendingWriteLocks();
+            writeLockSolicitors.remove(future);
+
+            future.complete(null);
+        }
+
+        return future;
+    }
+
+    /**
+     * Returns the count of pending write lock requests. Only used by 
tests, should not be used in production code.
+     *
+     * @return count of pending requests to get the write lock
+     */
+    @TestOnly
+    int pendingWriteLocksCount() {
+        return pendingWriteLocks;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(VersatileReadWriteLock.class, this);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLockTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLockTest.java
index 23fd0686c0..9d670e2892 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLockTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLockTest.java
@@ -86,7 +86,7 @@ class IgniteSpinReadWriteLockTest {
     private void assertThatWriteLockAcquireAttemptBlocksForever() {
         Future<?> future = executor.submit(lock::writeLock);
 
-        assertThrows(TimeoutException.class, () -> future.get(500, 
TimeUnit.MILLISECONDS));
+        assertThrows(TimeoutException.class, () -> future.get(100, 
TimeUnit.MILLISECONDS));
     }
 
     @Test
@@ -146,7 +146,7 @@ class IgniteSpinReadWriteLockTest {
     private void assertThatReadLockAcquireAttemptBlocksForever() {
         Future<?> readLockAttemptFuture = executor.submit(lock::readLock);
 
-        assertThrows(TimeoutException.class, () -> 
readLockAttemptFuture.get(500, TimeUnit.MILLISECONDS));
+        assertThrows(TimeoutException.class, () -> 
readLockAttemptFuture.get(100, TimeUnit.MILLISECONDS));
     }
 
     @Test
@@ -188,7 +188,7 @@ class IgniteSpinReadWriteLockTest {
     private void assertThatWriteLockAcquireAttemptWithoutSleepsBlocksForever() 
{
         Future<?> future = executor.submit(lock::writeLockBusy);
 
-        assertThrows(TimeoutException.class, () -> future.get(500, 
TimeUnit.MILLISECONDS));
+        assertThrows(TimeoutException.class, () -> future.get(100, 
TimeUnit.MILLISECONDS));
     }
 
     @Test
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
new file mode 100644
index 0000000000..267fbdbbb5
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/VersatileReadWriteLockTest.java
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.anyOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
+
+/**
+ * Tests for {@link VersatileReadWriteLock}.
+ *
+ * <p>A typical test takes a lock and then probes the lock from a thread of {@link #executor}. An acquisition attempt
+ * is treated as "blocking forever" if it does not complete within a short timeout. Locks left behind by a test are
+ * released in {@link #cleanup()}.
+ */
+@Timeout(20)
+class VersatileReadWriteLockTest {
+    private static final IgniteLogger LOG = Loggers.forClass(VersatileReadWriteLockTest.class);
+
+    /** Name prefix of the threads of {@link #asyncContinuationExecutor}; used to recognize those threads in assertions. */
+    private static final String ASYNC_CONTINUATION_THREAD_PREFIX = "ace";
+
+    /** Executor that is handed over to the lock under test. */
+    private final ExecutorService asyncContinuationExecutor = Executors.newCachedThreadPool(
+            new NamedThreadFactory(ASYNC_CONTINUATION_THREAD_PREFIX, LOG)
+    );
+
+    /** The lock under test. */
+    private final VersatileReadWriteLock lock = new VersatileReadWriteLock(asyncContinuationExecutor);
+
+    /** Executor service used to run tasks in threads different from the main test thread. */
+    private final ExecutorService executor = Executors.newCachedThreadPool();
+
+    /**
+     * Cleans up after a test: releases the locks the test left behind and stops both executors.
+     */
+    @AfterEach
+    void cleanup() {
+        try {
+            releaseReadLocks();
+            releaseWriteLocks();
+        } finally {
+            // Stop the executors even if releasing the locks failed unexpectedly, so that threads do not leak between tests.
+            IgniteUtils.shutdownAndAwaitTermination(executor, 3, SECONDS);
+            IgniteUtils.shutdownAndAwaitTermination(asyncContinuationExecutor, 3, SECONDS);
+        }
+    }
+
+    private void releaseReadLocks() {
+        unlockUntilNotHeld(lock::readUnlock);
+    }
+
+    private void releaseWriteLocks() {
+        unlockUntilNotHeld(lock::writeUnlock);
+    }
+
+    /**
+     * Invokes the given unlock action again and again until it throws {@link IllegalMonitorStateException}, which is taken
+     * as the signal that there is nothing left to unlock.
+     *
+     * @param unlock Unlock action to repeat.
+     */
+    private static void unlockUntilNotHeld(Runnable unlock) {
+        while (true) {
+            try {
+                unlock.run();
+            } catch (IllegalMonitorStateException ignored) {
+                // Nothing is held anymore: the lock has been released completely.
+                return;
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(BlockingWriteLockAcquisition.class)
+    void readLockDoesNotAllowWriteLockToBeAcquired(BlockingWriteLockAcquisition acquisition) {
+        lock.readLock();
+
+        assertThatWriteLockAcquireAttemptBlocksForever(acquisition);
+
+        lock.readUnlock();
+    }
+
+    @ParameterizedTest
+    @EnumSource(BlockingWriteLockAcquisition.class)
+    void readLockDoesNotAllowWriteLockToBeAcquiredBySameThread(BlockingWriteLockAcquisition acquisition) {
+        assertThatActionBlocksForever(() -> {
+            lock.readLock();
+            acquisition.acquire(lock);
+        });
+
+        // The read lock was taken on an executor thread; it is released from the test thread here.
+        lock.readUnlock();
+    }
+
+    private void assertThatWriteLockAcquireAttemptBlocksForever(BlockingWriteLockAcquisition acquisition) {
+        assertThatActionBlocksForever(() -> acquisition.acquire(lock));
+    }
+
+    /**
+     * Runs the action on {@link #executor} and asserts that it does not finish within 100 ms.
+     *
+     * @param action Action that is expected to block.
+     */
+    private void assertThatActionBlocksForever(Runnable action) {
+        CompletableFuture<?> future = runAsync(action, executor);
+
+        assertThat(future, willTimeoutIn(100, MILLISECONDS));
+    }
+
+    @Test
+    void readLockDoesNotAllowWriteLockToBeAcquiredWithTimeout() throws Exception {
+        lock.readLock();
+
+        Boolean acquired = callWithTimeout(() -> lock.tryWriteLock(1, MILLISECONDS));
+        assertThat(acquired, is(false));
+
+        lock.readUnlock();
+    }
+
+    @Test
+    void readLockDoesNotAllowWriteLockToBeAcquiredWithTimeoutBySameThread() throws Exception {
+        Boolean acquired = callWithTimeout(() -> {
+            lock.readLock();
+            return lock.tryWriteLock(1, MILLISECONDS);
+        });
+        assertThat(acquired, is(false));
+
+        lock.readUnlock();
+    }
+
+    @Test
+    void readLockAllowsReadLockToBeAcquired() {
+        lock.readLock();
+
+        assertThatReadLockCanBeAcquired();
+    }
+
+    private void assertThatReadLockCanBeAcquired() {
+        runWithTimeout(lock::readLock);
+    }
+
+    /**
+     * Executes the call on {@link #executor} and waits for its result for at most 10 seconds.
+     *
+     * @param call Call to execute.
+     * @return Result of the call.
+     */
+    private <T> T callWithTimeout(Callable<T> call) throws ExecutionException, InterruptedException, TimeoutException {
+        return executor.submit(call).get(10, SECONDS);
+    }
+
+    /**
+     * Executes the runnable on {@link #executor} and asserts that it completes successfully.
+     *
+     * @param runnable Runnable to execute.
+     */
+    private void runWithTimeout(Runnable runnable) {
+        assertThat(runAsync(runnable, executor), willCompleteSuccessfully());
+    }
+
+    @ParameterizedTest
+    @EnumSource(BlockingWriteLockAcquisition.class)
+    void writeLockDoesNotAllowReadLockToBeAcquired(BlockingWriteLockAcquisition acquisition) {
+        acquisition.acquire(lock);
+
+        assertThatReadLockAcquireAttemptBlocksForever();
+
+        lock.writeUnlock();
+    }
+
+    private void assertThatReadLockAcquireAttemptBlocksForever() {
+        assertThatActionBlocksForever(lock::readLock);
+    }
+
+    @ParameterizedTest
+    @EnumSource(BlockingWriteLockAcquisition.class)
+    void writeLockDoesNotAllowReadLockToBeAcquiredBySameThread(BlockingWriteLockAcquisition acquisition) {
+        assertThatActionBlocksForever(() -> {
+            acquisition.acquire(lock);
+            lock.readLock();
+        });
+
+        // The write lock was taken on an executor thread; it is released from the test thread here.
+        lock.writeUnlock();
+    }
+
+    // Arguments of cartesian tests are supplied by the parameter-level @Enum annotations only.
+    @CartesianTest
+    void writeLockDoesNotAllowWriteLockToBeAcquired(
+            @Enum(BlockingWriteLockAcquisition.class) BlockingWriteLockAcquisition firstAttempt,
+            @Enum(BlockingWriteLockAcquisition.class) BlockingWriteLockAcquisition secondAttempt
+    ) {
+        firstAttempt.acquire(lock);
+
+        assertThatWriteLockAcquireAttemptBlocksForever(secondAttempt);
+
+        lock.writeUnlock();
+    }
+
+    @CartesianTest
+    void writeLockDoesNotAllowWriteLockToBeAcquiredBySameThread(
+            @Enum(BlockingWriteLockAcquisition.class) BlockingWriteLockAcquisition firstAttempt,
+            @Enum(BlockingWriteLockAcquisition.class) BlockingWriteLockAcquisition secondAttempt
+    ) {
+        assertThatActionBlocksForever(() -> {
+            firstAttempt.acquire(lock);
+            secondAttempt.acquire(lock);
+        });
+
+        lock.writeUnlock();
+    }
+
+    @Test
+    void readUnlockReleasesTheLock() {
+        lock.readLock();
+        lock.readUnlock();
+
+        runWithTimeout(lock::writeLock);
+    }
+
+    @ParameterizedTest
+    @EnumSource(BlockingWriteLockAcquisition.class)
+    void writeUnlockReleasesTheLock(BlockingWriteLockAcquisition acquisition) {
+        acquisition.acquire(lock);
+        lock.writeUnlock();
+
+        assertThatReadLockCanBeAcquired();
+    }
+
+    @Test
+    void shouldNotAllowInterleavingHoldingReadAndWriteLocks() {
+        lock.writeLock();
+
+        assertFalse(lock.tryReadLock());
+
+        lock.writeUnlock();
+
+        lock.readLock();
+
+        assertFalse(lock.tryWriteLock());
+
+        lock.readUnlock();
+
+        // Test that we can operate with write locks now.
+        lock.writeLock();
+        lock.writeUnlock();
+    }
+
+    @Test
+    void readLockReleasedLessTimesThanAcquiredShouldStillBeTaken() {
+        lock.readLock();
+
+        CompletableFuture<?> future = runAsync(() -> {
+            lock.readLock();
+            lock.readUnlock();
+        }, executor);
+        assertThat(future, willCompleteSuccessfully());
+
+        // Two acquisitions and one release so far: the lock must still be read-locked.
+        assertThatWriteLockAcquireAttemptBlocksForever(BlockingWriteLockAcquisition.WRITE_LOCK);
+
+        lock.readUnlock();
+    }
+
+    @Test
+    void shouldThrowOnReadUnlockingWhenNotReadLocked() {
+        assertThrows(IllegalMonitorStateException.class, lock::readUnlock);
+    }
+
+    @Test
+    void shouldThrowOnWriteUnlockingWhenNotWriteLocked() {
+        assertThrows(IllegalMonitorStateException.class, lock::writeUnlock);
+    }
+
+    @ParameterizedTest
+    @EnumSource(BlockingWriteLockAcquisition.class)
+    void readLockAcquiredWithTryReadLockDoesNotAllowWriteLockToBeAcquired(BlockingWriteLockAcquisition acquisition) {
+        assertTrue(lock.tryReadLock(), "Failed to acquire the read lock");
+
+        assertThatWriteLockAcquireAttemptBlocksForever(acquisition);
+
+        lock.readUnlock();
+    }
+
+    @ParameterizedTest
+    @EnumSource(BlockingWriteLockAcquisition.class)
+    void readLockAcquiredWithTryReadLockDoesNotAllowWriteLockToBeAcquiredBySameThread(BlockingWriteLockAcquisition acquisition) {
+        assertThatActionBlocksForever(() -> {
+            assertTrue(lock.tryReadLock(), "Failed to acquire the read lock");
+            acquisition.acquire(lock);
+        });
+
+        lock.readUnlock();
+    }
+
+    @Test
+    void tryReadLockShouldReturnTrueWhenReadLockWasAcquiredSuccessfully() {
+        assertTrue(lock.tryReadLock());
+    }
+
+    @Test
+    void tryReadLockShouldReturnFalseWhenReadLockCouldNotBeAcquired() throws Exception {
+        lock.writeLock();
+
+        Boolean acquired = callWithTimeout(lock::tryReadLock);
+
+        assertThat(acquired, is(false));
+    }
+
+    // NOTE: despite its name, this test verifies that a READ lock cannot be acquired while the write lock is held.
+    @Test
+    void writeLockAcquiredWithTryWriteLockDoesNotAllowWriteLockToBeAcquired() {
+        assertTrue(lock.tryWriteLock(), "Failed to acquire the write lock");
+
+        assertThatReadLockAcquireAttemptBlocksForever();
+
+        lock.writeUnlock();
+    }
+
+    // NOTE: despite its name, this test verifies that a READ lock cannot be acquired while the write lock is held.
+    @Test
+    void writeLockAcquiredWithTryWriteLockDoesNotAllowWriteLockToBeAcquiredBySameThread() {
+        assertThatActionBlocksForever(() -> {
+            assertTrue(lock.tryWriteLock(), "Failed to acquire the write lock");
+            lock.readLock();
+        });
+
+        lock.writeUnlock();
+    }
+
+    @Test
+    void tryWriteLockShouldReturnTrueWhenWriteLockWasAcquiredSuccessfully() {
+        assertTrue(lock.tryWriteLock());
+    }
+
+    @Test
+    void tryWriteLockShouldReturnFalseWhenWriteLockCouldNotBeAcquired() throws Exception {
+        lock.writeLock();
+
+        Boolean acquired = callWithTimeout(lock::tryWriteLock);
+
+        assertThat(acquired, is(false));
+    }
+
+    @Test
+    void inReadLockAsyncExecutesClosureAfterTakingReadLock() {
+        // If the closure runs under the read lock, an attempt to take the write lock from inside it must fail.
+        assertThat(lock.inReadLockAsync(() -> completedFuture(lock.tryWriteLock())), willBe(false));
+    }
+
+    @Test
+    void inReadLockAsyncReleasesReadLockInTheEnd() {
+        assertThat(lock.inReadLockAsync(CompletableFutures::nullCompletedFuture), willCompleteSuccessfully());
+
+        assertThatNoReadLockIsHeld();
+    }
+
+    @Test
+    void inReadLockAsyncReleasesReadLockInTheEndInCaseOfException() {
+        assertThat(lock.inReadLockAsync(() -> failedFuture(new Exception("Oops"))), willThrow(Exception.class));
+
+        assertThatNoReadLockIsHeld();
+    }
+
+    /** Asserts that the write lock can be taken right now (and releases it immediately). */
+    private void assertThatNoReadLockIsHeld() {
+        assertTrue(lock.tryWriteLock(), "Read lock is still held");
+
+        lock.writeUnlock();
+    }
+
+    /** Asserts that the read lock can be taken right now (and releases it immediately). */
+    private void assertThatNoWriteLockIsHeld() {
+        assertTrue(lock.tryReadLock(), "Write lock is still held");
+
+        lock.readUnlock();
+    }
+
+    @Test
+    void inReadLockAsyncTakesReadLockAfterWriteLockGetsReleased() {
+        lock.writeLock();
+
+        CompletableFuture<Void> future1 = lock.inReadLockAsync(CompletableFutures::nullCompletedFuture);
+        CompletableFuture<Void> future2 = lock.inReadLockAsync(CompletableFutures::nullCompletedFuture);
+        CompletableFuture<Void> future3 = lock.inReadLockAsync(CompletableFutures::nullCompletedFuture);
+
+        // None of the attempts may complete while the write lock is held.
+        assertThat(anyOf(future1, future2, future3), willTimeoutIn(100, MILLISECONDS));
+
+        lock.writeUnlock();
+
+        assertThat(allOf(future1, future2, future3), willCompleteSuccessfully());
+
+        assertThatNoReadLockIsHeld();
+    }
+
+    @Test
+    void inReadLockAsyncRespectsPendingWriteLocks() throws Exception {
+        lock.readLock();
+
+        CompletableFuture<?> writeLockFuture = runAsync(lock::writeLock, executor);
+
+        waitTillWriteLockAcquireAttemptIsInitiated();
+
+        CompletableFuture<Void> readLockAsyncFuture = lock.inReadLockAsync(CompletableFutures::nullCompletedFuture);
+        assertFalse(readLockAsyncFuture.isDone());
+
+        // Letting the write lock to be acquired.
+        lock.readUnlock();
+
+        assertThat(writeLockFuture, willCompleteSuccessfully());
+
+        // The write lock is held now, so the async read lock attempt must still be waiting.
+        assertFalse(waitForCondition(readLockAsyncFuture::isDone, 100));
+
+        lock.writeUnlock();
+    }
+
+    /** Waits (for up to 10 seconds) until the lock reports at least one pending write lock. */
+    private void waitTillWriteLockAcquireAttemptIsInitiated() throws InterruptedException {
+        boolean sawAnAttempt = waitForCondition(
+                () -> lock.pendingWriteLocksCount() > 0, SECONDS.toMillis(10));
+        assertTrue(sawAnAttempt, "Did not see any attempt to acquire write lock");
+    }
+
+    @Test
+    void inReadLockAsyncTakesReadLockInExecutorAfterWriteLockGetsReleased() {
+        lock.writeLock();
+
+        // The continuation is attached while the write lock is still held, before the lock gets released below.
+        AtomicReference<Thread> threadRef = new AtomicReference<>();
+        CompletableFuture<?> future = lock.inReadLockAsync(CompletableFutures::nullCompletedFuture)
+                .whenComplete((res, ex) -> threadRef.set(currentThread()));
+
+        lock.writeUnlock();
+        assertThat(future, willCompleteSuccessfully());
+
+        assertThat(threadRef.get().getName(), startsWith(ASYNC_CONTINUATION_THREAD_PREFIX));
+    }
+
+    @Test
+    void concurrentInReadLockAsyncAndWriteLockWorkCorrectly() {
+        RunnableX readLocker = () -> {
+            for (int i = 0; i < 300; i++) {
+                lock.inReadLockAsync(CompletableFutures::nullCompletedFuture).get(10, SECONDS);
+            }
+        };
+        RunnableX writeLocker = () -> {
+            for (int i = 0; i < 300; i++) {
+                lock.writeLock();
+                lock.writeUnlock();
+            }
+        };
+
+        runRace(10_000, readLocker, writeLocker);
+
+        assertThatNoReadLockIsHeld();
+        assertThatNoWriteLockIsHeld();
+    }
+
+    @Test
+    void inWriteLockAsyncExecutesClosureAfterTakingWriteLock() {
+        // If the closure runs under the write lock, a second attempt to take the write lock from inside it must fail.
+        assertThat(lock.inWriteLockAsync(() -> completedFuture(lock.tryWriteLock())), willBe(false));
+    }
+
+    @Test
+    void inWriteLockAsyncReleasesWriteLockInTheEnd() {
+        assertThat(lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture), willCompleteSuccessfully());
+
+        assertThatNoWriteLockIsHeld();
+    }
+
+    @Test
+    void inWriteLockAsyncReleasesWriteLockInTheEndInCaseOfException() {
+        assertThat(lock.inWriteLockAsync(() -> failedFuture(new Exception("Oops"))), willThrow(Exception.class));
+
+        assertThatNoWriteLockIsHeld();
+    }
+
+    @ParameterizedTest
+    @EnumSource(WriteLockImpeder.class)
+    void inWriteLockAsyncTakesWriteLockAfterImpedingLockGetsReleased(WriteLockImpeder impeder) {
+        impeder.impede(lock);
+
+        CompletableFuture<Void> future = lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture);
+
+        assertThat(future, willTimeoutIn(100, MILLISECONDS));
+
+        impeder.stopImpeding(lock);
+
+        assertThat(future, willCompleteSuccessfully());
+
+        assertThatNoWriteLockIsHeld();
+    }
+
+    @ParameterizedTest
+    @EnumSource(WriteLockImpeder.class)
+    void multipleInWriteLockAsyncAttemptsTakeWriteLockAfterImpedingLocksGetReleased(WriteLockImpeder impeder) {
+        impeder.impede(lock);
+
+        CompletableFuture<Void> future1 = lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture);
+        CompletableFuture<Void> future2 = lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture);
+        CompletableFuture<Void> future3 = lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture);
+
+        // None of the attempts may complete while the impeding lock is held.
+        assertThat(anyOf(future1, future2, future3), willTimeoutIn(100, MILLISECONDS));
+
+        impeder.stopImpeding(lock);
+
+        assertThat(allOf(future1, future2, future3), willCompleteSuccessfully());
+
+        assertThatNoWriteLockIsHeld();
+    }
+
+    @ParameterizedTest
+    @EnumSource(WriteLockImpeder.class)
+    void inWriteLockAsyncTakesWriteLockInExecutorAfterImpedingLockGetsReleased(WriteLockImpeder impeder) {
+        impeder.impede(lock);
+
+        // The continuation is attached while the impeding lock is still held, before it gets released below.
+        AtomicReference<Thread> threadRef = new AtomicReference<>();
+        CompletableFuture<?> future = lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture)
+                .whenComplete((res, ex) -> threadRef.set(currentThread()));
+
+        impeder.stopImpeding(lock);
+        assertThat(future, willCompleteSuccessfully());
+
+        assertThat(threadRef.get().getName(), startsWith(ASYNC_CONTINUATION_THREAD_PREFIX));
+    }
+
+    @Test
+    void inWriteLockAsyncSetsPendingWriteLocks() {
+        lock.readLock();
+
+        // This will wait till read lock is released.
+        lock.inWriteLockAsync(CompletableFutures::nullCompletedFuture);
+
+        // A read lock is already held, yet one more must be refused because of the pending write lock.
+        assertFalse(lock.tryReadLock());
+
+        lock.readUnlock();
+    }
+
+    /** Blocking ways of acquiring the write lock that the parameterized tests iterate over. */
+    private enum BlockingWriteLockAcquisition {
+        WRITE_LOCK {
+            @Override
+            void acquire(VersatileReadWriteLock lock) {
+                lock.writeLock();
+            }
+        },
+        WRITE_LOCK_BUSY {
+            @Override
+            void acquire(VersatileReadWriteLock lock) {
+                lock.writeLockBusy();
+            }
+        };
+
+        /** Acquires the write lock on the given lock in the way this constant stands for. */
+        abstract void acquire(VersatileReadWriteLock lock);
+    }
+
+    /** Kinds of locks that, while held, prevent the write lock from being acquired. */
+    private enum WriteLockImpeder {
+        READ_LOCK {
+            @Override
+            void impede(VersatileReadWriteLock lock) {
+                lock.readLock();
+            }
+
+            @Override
+            void stopImpeding(VersatileReadWriteLock lock) {
+                lock.readUnlock();
+            }
+        },
+        WRITE_LOCK {
+            @Override
+            void impede(VersatileReadWriteLock lock) {
+                lock.writeLock();
+            }
+
+            @Override
+            void stopImpeding(VersatileReadWriteLock lock) {
+                lock.writeUnlock();
+            }
+        };
+
+        /** Takes the impeding lock. */
+        abstract void impede(VersatileReadWriteLock lock);
+
+        /** Releases the impeding lock taken by {@link #impede(VersatileReadWriteLock)}. */
+        abstract void stopImpeding(VersatileReadWriteLock lock);
+    }
+}


Reply via email to