Ignite-642: adds more unit tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0ae6a947 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0ae6a947 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0ae6a947 Branch: refs/heads/ignite-642 Commit: 0ae6a947b5c4235d6ca2062184e9d5e5898d9aed Parents: 071e562 Author: Vladisav Jelisavcic <[email protected]> Authored: Fri Mar 18 15:15:33 2016 +0100 Committer: vladisav <[email protected]> Committed: Fri Mar 18 15:15:33 2016 +0100 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCondition.java | 93 ++- .../main/java/org/apache/ignite/IgniteLock.java | 80 ++- .../datastructures/DataStructuresProcessor.java | 2 +- .../datastructures/GridCacheLockEx.java | 2 +- .../datastructures/GridCacheLockImpl.java | 45 +- ...eAbstractDataStructuresFailoverSelfTest.java | 73 +- .../IgniteClientDataStructuresAbstractTest.java | 2 +- .../IgniteLockAbstractSelfTest.java | 670 ++++++++++++++++++- 8 files changed, 890 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java index 4207222..f8dc65c 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java @@ -22,7 +22,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; /** - * This interface provides a rich API for working with conditions associated with distributed reentrant locks. 
+ * This interface provides a rich API for working with condition objects + * associated with distributed reentrant locks. * <p> * <h1 class="header">Functionality</h1> * IgniteCondition provides functionality similar to {@code java.util.concurrent.locks.Condition}. @@ -51,16 +52,16 @@ public interface IgniteCondition extends Condition { * {@code Condition}; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} the * current thread, and interruption of thread suspension is supported; or - * <li> Some other node in grid fails, and lock is created in non-failoverSafe mode; or - * <li> Local node is stopped; or + * <li>Some other node in grid fails, and lock is created in non-failoverSafe mode; or + * <li>Local node is stopped; or * <li>A "<em>spurious wakeup</em>" occurs. * </ul> * * <p>If lock is not broken (because of failure of lock owner node) * in non-failoverSafe mode and local node is alive, * before this method can return the current thread must - * re-acquire the lock associated with this condition. When the - * thread returns it is <em>guaranteed</em> to hold this lock. + * re-acquire the lock associated with this condition. In all other cases + * when the thread returns it is <em>guaranteed</em> to hold this lock. * * <p>If the current thread: * <ul> @@ -84,8 +85,45 @@ public interface IgniteCondition extends Condition { */ @Override void await() throws IgniteInterruptedException; - /** {@inheritDoc} */ - @Override void awaitUninterruptibly(); + /** + * Causes the current thread to wait until it is signalled. 
+ * + * <p>The lock associated with this condition is atomically + * released and the current thread becomes disabled for thread scheduling + * purposes and lies dormant until <em>one</em> of five things happens: + * <ul> + * <li>Some other thread invokes the {@link #signal} method for this + * {@code Condition} and the current thread happens to be chosen as the + * thread to be awakened; or + * <li>Some other thread invokes the {@link #signalAll} method for this + * {@code Condition}; or + * <li>Some other node in grid fails, and lock is created in non-failoverSafe mode; or + * <li>Local node is stopped; or + * <li>A "<em>spurious wakeup</em>" occurs. + * </ul> + * + * <p>If lock is not broken (because of failure of lock owner node) + * in non-failoverSafe mode and local node is alive, + * before this method can return the current thread must + * re-acquire the lock associated with this condition. In all other cases, + * when the thread returns it is <em>guaranteed</em> to hold this lock. + * + * <p>If the current thread's interrupted status is set when it enters + * this method, or it is {@linkplain Thread#interrupt interrupted} + * while waiting, it will continue to wait until signalled. When it finally + * returns from this method its interrupted status will still + * be set. + * + * <p><b>Implementation Considerations</b> + * + * <p>The current thread is assumed to hold the lock associated with this + * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException} + * will be thrown. 
+ * + * @throws IgniteInterruptedException if the node stopped, or + * node owning the lock failed in non-failoversafe mode + */ + @Override void awaitUninterruptibly() throws IgniteInterruptedException; /** * Causes the current thread to wait until it is signalled or interrupted, @@ -254,13 +292,44 @@ public interface IgniteCondition extends Condition { * @return {@code false} if the deadline has elapsed upon return, else * {@code true} * @throws IgniteInterruptedException if the current thread is interrupted - * (and interruption of thread suspension is supported) + * (and interruption of thread suspension is supported) or node stopped or + * node owning the lock failed in non-failoversafe mode */ @Override boolean awaitUntil(Date deadline) throws IgniteInterruptedException; - /** {@inheritDoc} */ - @Override void signal(); + /** + * Wakes up one waiting thread. + * + * <p>If any threads are waiting on this condition then one + * is selected for waking up. That thread must then re-acquire the + * lock before returning from {@code await}. + * + * <p><b>Implementation Considerations</b> + * + * <p>The current thread is assumed to hold the lock associated with this + * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException} + * will be thrown. + * + * @throws IgniteInterruptedException if node is stopped or + * node owning the lock failed in non-failoversafe mode + */ + @Override void signal() throws IgniteInterruptedException; - /** {@inheritDoc} */ - @Override void signalAll(); + /** + * Wakes up all waiting threads. + * + * <p>If any threads are waiting on this condition then they are + * all woken up. Each thread must re-acquire the lock before it can + * return from {@code await}. + * + * <p><b>Implementation Considerations</b> + * + * <p>The current thread is assumed to hold the lock associated with this + * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException} + * will be thrown. 
+ * + * @throws IgniteInterruptedException if node is stopped or + * node owning the lock failed in non-failoversafe mode + */ + @Override void signalAll() throws IgniteInterruptedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/main/java/org/apache/ignite/IgniteLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteLock.java b/modules/core/src/main/java/org/apache/ignite/IgniteLock.java index 8e0da5d..e5aeb27 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteLock.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteLock.java @@ -40,8 +40,31 @@ public interface IgniteLock extends Lock, Closeable { */ public String name(); - /** {@inheritDoc} */ - @Override public void lock(); + /** + * Acquires the distributed reentrant lock. + * + * <p>Acquires the lock if it is not held by another thread and returns + * immediately, setting the lock hold count to one. + * + * <p>If the current thread already holds this lock then the hold count + * is incremented by one and the method returns immediately. + * + * <p>If the lock is held by another thread then the + * current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of three things happens: + * + * <ul> + * + * <li>The lock is acquired by the current thread; or + * + * <li>Lock is broken (any node failed while owning this lock), and lock is created in + * non-failoverSafe mode; or + * + * <li>Local node is stopped. 
+ * + * @throws IgniteInterruptedException if the node is stopped or the lock is broken in non-failoverSafe mode + */ + void lock() throws IgniteInterruptedException; /** * Acquires the lock unless the current thread is @@ -100,8 +123,40 @@ public interface IgniteLock extends Lock, Closeable { */ @Override public void lockInterruptibly() throws IgniteInterruptedException; - /** {@inheritDoc} */ - @Override public boolean tryLock(); + /** + * Acquires the lock only if it is free at the time of invocation. + * + * <p>Acquires the lock if it is available and returns immediately + * with the value {@code true}. + * If the lock is not available then this method will return + * immediately with the value {@code false}. + * + * <p>A typical usage idiom for this method would be: + * <pre> {@code + * Lock lock = ...; + * if (lock.tryLock()) { + * try { + * // manipulate protected state + * } finally { + * lock.unlock(); + * } + * } else { + * // perform alternative actions + * }}</pre> + * + * This usage ensures that the lock is unlocked if it was acquired, and + * doesn't try to unlock if the lock was not acquired. + * + * If node is stopped, or any node failed while owning the lock in non-failoverSafe mode, + * then {@link IgniteInterruptedException} is thrown. + * + * @return {@code true} if the lock was acquired and + * {@code false} otherwise + * + * @throws IgniteInterruptedException if node is stopped, or + * lock is already broken in non-failover safe mode + */ + @Override public boolean tryLock() throws IgniteInterruptedException; /** * Acquires the lock if it is not held by another thread within the given @@ -176,8 +231,18 @@ public interface IgniteLock extends Lock, Closeable { */ @Override public boolean tryLock(long timeout, TimeUnit unit) throws IgniteInterruptedException; - /** {@inheritDoc} */ - @Override public void unlock(); + /** + * Releases the lock. + * + * If lock is not owned by current thread, then an {@link + * IllegalMonitorStateException} is thrown. 
+ * If lock is already broken prior to invocation of this method, and + * lock is created in non-failover safe mode, then {@link IgniteInterruptedException} is thrown. + * + * @throws IllegalMonitorStateException if not owned by current thread + * @throws IgniteInterruptedException if node is stopped or lock is already broken + */ + void unlock() throws IgniteInterruptedException; /** * Returns a {@link Condition} instance for use with this @@ -318,9 +383,6 @@ public interface IgniteLock extends Lock, Closeable { */ public boolean isBroken(); - /** {@inheritDoc} */ - @Override public String toString(); - /** * Gets status of reentrant lock. * http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 6d02b1f..396f23d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -269,7 +269,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { for (GridCacheRemovable ds : dsMap.values()) { if (ds instanceof GridCacheLockEx) - ((GridCacheLockEx)ds).stop(); + ((GridCacheLockEx)ds).onStop(); } if (initLatch.getCount() > 0) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java index 8d6e085..a54b85b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java @@ -48,6 +48,6 @@ public interface GridCacheLockEx extends IgniteLock, GridCacheRemovable { /** * Callback to notify local reentrant lock instance on node stop. */ - public void stop(); + public void onStop(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java index 03fe568..6c80ca1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java @@ -232,9 +232,16 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable /** Check if lock is in correct state (i.e. not broken in non-failoversafe mode), * if not throw {@linkplain IgniteInterruptedException} */ - private void validate(){ - if(Thread.interrupted() || interruptAll){ - throw new IgniteInterruptedException("Lock broken in non-failoversafe mode."); + private void validate(final boolean checkInterrupt){ + // Interrupted flag is not always cleared + // (e.g. 
lock() doesn't throw exception and doesn't clear interrupted) + // but should be cleared if this method is called after lock breakage or node stop. + // If interruptAll is set, exception is thrown anyway. + boolean clearInterrupt = checkInterrupt || interruptAll; + + if((clearInterrupt && Thread.interrupted()) || interruptAll){ + throw new IgniteInterruptedException("Lock broken (possible reason: node stopped" + + " or node owning lock failed while in non-failoversafe mode)."); } } @@ -431,7 +438,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable /** * This method is used for synchronizing the reentrant lock state across all nodes. */ - protected boolean compareAndSetGlobalState(final int expVal, final int newVal, long newThreadID) { + protected boolean compareAndSetGlobalState(final int expVal, final int newVal, final long newThreadID) { try { return CU.outTx( retryTopologySafe(new Callable<Boolean>() { @@ -450,6 +457,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable val.setThreadId(newThreadID); + val.setSignals(null); + lockView.put(key, val); tx.commit(); @@ -668,7 +677,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable object.await(); - sync.validate(); + sync.validate(true); } catch (InterruptedException e) { throw new IgniteInterruptedException(e); @@ -689,6 +698,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable lastCondition = this.name; object.awaitUninterruptibly(); + + sync.validate(false); } finally { ctx.kernalContext().gateway().readUnlock(); @@ -707,7 +718,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable long result = object.awaitNanos(nanosTimeout); - sync.validate(); + sync.validate(true); return result; } @@ -731,7 +742,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable boolean result = object.await(time, unit); - sync.validate(); + 
sync.validate(true); return result; } @@ -755,7 +766,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable boolean result = object.awaitUntil(deadline); - sync.validate(); + sync.validate(true); return result; } @@ -775,7 +786,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable if (!isHeldExclusively()) throw new IllegalMonitorStateException(); - validate(); + validate(false); addOutgoingSignal(name); } @@ -792,7 +803,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable if (!isHeldExclusively()) throw new IllegalMonitorStateException(); - sync.validate(); + sync.validate(false); addOutgoingSignalAll(name); } @@ -928,7 +939,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable } /** {@inheritDoc} */ - @Override public void stop() { + @Override public void onStop() { if (sync == null) { interruptAll = true; @@ -957,7 +968,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable sync.lock(); - sync.validate(); + sync.validate(false); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -976,7 +987,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable sync.acquireInterruptibly(1); - sync.validate(); + sync.validate(true); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -998,7 +1009,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable boolean result = sync.nonfairTryAcquire(1); - sync.validate(); + sync.validate(false); return result; } @@ -1019,7 +1030,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable boolean result = sync.tryAcquireNanos(1, unit.toNanos(timeout)); - sync.validate(); + sync.validate(true); return result; } @@ -1042,7 +1053,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable initializeReentrantLock(); // Validate before release. 
- sync.validate(); + sync.validate(false); sync.release(1); } @@ -1067,7 +1078,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable IgniteCondition result = sync.newCondition(name); - sync.validate(); + sync.validate(false); return result; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index f626411..a875597 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -33,6 +33,7 @@ import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteAtomicReference; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteAtomicStamped; +import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; @@ -543,6 +544,74 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** * @throws Exception If failed. */ + public void testReentrantLockFailsWhenServersLeft() throws Exception { + client = true; + + Ignite client = startGrid(gridCount()); + + Ignite server = grid(0); + + // Initialize lock. 
+ IgniteLock srvLock = server.reentrantLock("lock", true, true); + + IgniteSemaphore semaphore = server.semaphore("sync", 0, true, true); + + IgniteCompute compute = client.compute().withAsync(); + + compute.apply(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + final IgniteLock l = ignite.reentrantLock("lock", true, true); + + l.lock(); + + assertTrue(l.isHeldByCurrentThread()); + + l.unlock(); + + assertFalse(l.isHeldByCurrentThread()); + + // Signal the server to go down. + ignite.semaphore("sync", 0, true, true).release(); + + boolean isExceptionThrown = false; + + try { + // Wait for the server to go down. + Thread.sleep(1000); + + l.lock(); + + fail("Exception must be thrown."); + } + catch (InterruptedException e) { + fail("Interrupted exception not expected here."); + } + catch (IgniteInterruptedException e) { + isExceptionThrown = true; + } + finally { + assertTrue(isExceptionThrown); + + assertFalse(l.isHeldByCurrentThread()); + } + return null; + } + }, client); + + // Wait for the lock on client to be acquired then released. + semaphore.acquire(); + + for (int i = 0; i < gridCount(); i++) + stopGrid(i); + + compute.future().get(); + + client.close(); + } + + /** + * @throws Exception If failed. + */ public void testReentrantLockConstantTopologyChangeFailoverSafe() throws Exception { doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true); } @@ -571,11 +640,11 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** * @throws Exception If failed. 
*/ - private void doTestReentrantLock(ConstantTopologyChangeWorker topWorker, boolean failoverSafe) throws Exception { + private void doTestReentrantLock(ConstantTopologyChangeWorker topWorker, final boolean failoverSafe) throws Exception { try (IgniteLock lock = grid(0).reentrantLock(STRUCTURE_NAME, failoverSafe, true)) { IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { @Override public Object apply(Ignite ignite) { - IgniteLock l = ignite.reentrantLock(STRUCTURE_NAME, failoverSafe, false); + final IgniteLock l = ignite.reentrantLock(STRUCTURE_NAME, failoverSafe, false); IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java index 06b11a1..2b23389 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java @@ -351,7 +351,7 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA assertFalse(lock.isLocked()); - Semaphore semaphore = new Semaphore(0); + final Semaphore semaphore = new Semaphore(0); IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { 
http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java index da8ab34..f276ff1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java @@ -24,17 +24,19 @@ import java.io.ObjectOutput; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteCondition; +import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.lang.IgniteCallable; @@ -96,46 +98,52 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe * @param failoverSafe Failover safe flag. 
* @throws Exception */ - private void checkFailover(boolean failoverSafe) throws Exception { + private void checkFailover(final boolean failoverSafe) throws Exception { IgniteEx g = startGrid(NODES_CNT + 1); // For vars locality. { // Ensure not exists. - assert g.semaphore("sem", 2, failoverSafe, false) == null; + assert g.reentrantLock("lock", failoverSafe, false) == null; - IgniteSemaphore sem = g.semaphore( - "sem", - 2, - failoverSafe, - true); + IgniteLock lock = g.reentrantLock("lock", failoverSafe, true); - sem.acquire(2); + lock.lock(); - assert !sem.tryAcquire(); - assertEquals( - 0, - sem.availablePermits()); + assert lock.tryLock(); + + assertEquals(2, lock.getHoldCount()); } Ignite g0 = grid(0); - final IgniteSemaphore sem0 = g0.semaphore( - "sem", - -10, - false, - false); + final IgniteLock lock0 = g0.reentrantLock("lock", false, false); + + assert !lock0.tryLock(); - assert !sem0.tryAcquire(); - assertEquals(0, sem0.availablePermits()); + assertEquals(0, lock0.getHoldCount()); IgniteInternalFuture<?> fut = multithreadedAsync( new Callable<Object>() { @Override public Object call() throws Exception { - sem0.acquire(); + try { + lock0.lock(); - info("Acquired in separate thread."); + info("Acquired in separate thread."); + // Lock is acquired silently in failoverSafe mode. 
+ if (failoverSafe) { + lock0.unlock(); + + info("Released lock in separate thread."); + } + } + catch (IgniteInterruptedException e) { + if (!failoverSafe) + info("Ignored expected exception: " + e); + else + throw e; + } return null; } }, @@ -145,17 +153,9 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe g.close(); - try { - fut.get(500); - } - catch (IgniteCheckedException e) { - if (!failoverSafe && e.hasCause(InterruptedException.class)) - info("Ignored expected exception: " + e); - else - throw e; - } + fut.get(500); - sem0.close(); + lock0.close(); } /** @@ -234,7 +234,7 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe // Ensure there are no hangs. fut.get(); - // Test operations on removed semaphore. + // Test operations on removed lock. lock1.close(); checkRemovedReentrantLock(lock1); @@ -413,6 +413,608 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe fut.get(30_000); } + /** + * @throws Exception If failed. + */ + public void testLockInterruptibly() throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true); + + assertEquals(0, lock0.getHoldCount()); + + assertFalse(lock0.hasQueuedThreads()); + + final int totalThreads = 2; + + final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>(); + + lock0.lock(); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + assertFalse(lock0.isHeldByCurrentThread()); + + startedThreads.add(Thread.currentThread()); + + boolean isInterrupted = false; + + try { + lock0.lockInterruptibly(); + } + catch (IgniteInterruptedException e) { + assertFalse(Thread.currentThread().isInterrupted()); + + isInterrupted = true; + } + finally { + System.out.println(Thread.currentThread()); + + // Assert that thread was interrupted. + assertTrue(isInterrupted); + + // Assert that locked is still owned by main thread. 
+ assertTrue(lock0.isLocked()); + + // Assert that this thread doesn't own the lock. + assertFalse(lock0.isHeldByCurrentThread()); + } + + return null; + } + }, totalThreads); + + // Wait for all threads to attempt to acquire lock. + while (startedThreads.size() != totalThreads) { + Thread.sleep(1000); + } + + for (Thread t : startedThreads) + t.interrupt(); + + fut.get(); + + lock0.unlock(); + + assertFalse(lock0.isLocked()); + + for (Thread t : startedThreads) + assertFalse(lock0.hasQueuedThread(t)); + + lock0.close(); + } + + /** + * @throws Exception If failed. + */ + public void testLock() throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true); + + assertEquals(0, lock0.getHoldCount()); + + assertFalse(lock0.hasQueuedThreads()); + + final int totalThreads = 2; + + final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>(); + + lock0.lock(); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + assertFalse(lock0.isHeldByCurrentThread()); + + startedThreads.add(Thread.currentThread()); + + boolean isInterrupted = false; + + try { + lock0.lock(); + } + catch (IgniteInterruptedException e) { + isInterrupted = true; + + fail("Lock() method is uninterruptible."); + } + finally { + // Assert that thread was not interrupted. + assertFalse(isInterrupted); + + // Assert that interrupted flag is set and clear it in order to call unlock(). + assertTrue(Thread.interrupted()); + + // Assert that lock is still owned by this thread. + assertTrue(lock0.isLocked()); + + // Assert that this thread does own the lock. + assertTrue(lock0.isHeldByCurrentThread()); + + // Release lock. + lock0.unlock(); + } + + return null; + } + }, totalThreads); + + // Wait for all threads to attempt to acquire lock. 
+ while (startedThreads.size() != totalThreads) { + Thread.sleep(500); + } + + for (Thread t : startedThreads) + t.interrupt(); + + lock0.unlock(); + + fut.get(); + + assertFalse(lock0.isLocked()); + + for (Thread t : startedThreads) + assertFalse(lock0.hasQueuedThread(t)); + + lock0.close(); + } + + /** + * @throws Exception If failed. + */ + public void testTryLock() throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true); + + assertEquals(0, lock0.getHoldCount()); + + assertFalse(lock0.hasQueuedThreads()); + + final int totalThreads = 2; + + final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>(); + + lock0.lock(); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + assertFalse(lock0.isHeldByCurrentThread()); + + startedThreads.add(Thread.currentThread()); + + boolean isInterrupted = false; + + boolean locked = false; + + try { + locked = lock0.tryLock(); + } + catch (IgniteInterruptedException e) { + isInterrupted = true; + + fail("tryLock() method is uninterruptible."); + } + finally { + // Assert that thread was not interrupted. + assertFalse(isInterrupted); + + // Assert that lock is locked. + assertTrue(lock0.isLocked()); + + // Assert that this thread does own the lock. + assertEquals(locked, lock0.isHeldByCurrentThread()); + + // Release lock. + if (locked) + lock0.unlock(); + } + + return null; + } + }, totalThreads); + + // Wait for all threads to attempt to acquire lock. + while (startedThreads.size() != totalThreads) { + Thread.sleep(500); + } + + for (Thread t : startedThreads) + t.interrupt(); + + fut.get(); + + lock0.unlock(); + + assertFalse(lock0.isLocked()); + + for (Thread t : startedThreads) + assertFalse(lock0.hasQueuedThread(t)); + + lock0.close(); + } + + /** + * @throws Exception If failed. 
+     */
+    public void testTryLockTimed() throws Exception {
+        // Scenario: the main thread owns the lock for the whole test while two workers call the
+        // timed tryLock(100 ms). Each worker must get 'false' back, must not see an
+        // IgniteInterruptedException, and must not own the lock afterwards.
+        final IgniteLock lock = grid(0).reentrantLock("lock", true, true);
+
+        assertEquals(0, lock.getHoldCount());
+
+        assertFalse(lock.hasQueuedThreads());
+
+        final int workerCnt = 2;
+
+        // Every worker records itself here; used for the final queue checks.
+        final Set<Thread> workers = new GridConcurrentHashSet<Thread>();
+
+        // Taken before the workers start and released only after they have all finished.
+        lock.lock();
+
+        final Callable<Object> attempt = new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                assertFalse(lock.isHeldByCurrentThread());
+
+                workers.add(Thread.currentThread());
+
+                boolean interrupted = false;
+
+                boolean acquired = false;
+
+                try {
+                    acquired = lock.tryLock(100, TimeUnit.MILLISECONDS);
+                }
+                catch (IgniteInterruptedException e) {
+                    interrupted = true;
+                }
+                finally {
+                    // Nobody interrupts the workers in this test.
+                    assertFalse(interrupted);
+
+                    // The timed attempt must have given up.
+                    assertFalse(acquired);
+
+                    // The lock is still held (by the main thread).
+                    assertTrue(lock.isLocked());
+
+                    // ... and certainly not by this worker.
+                    assertFalse(lock.isHeldByCurrentThread());
+
+                    // Clean up if the lock was acquired after all.
+                    if (acquired) {
+                        lock.unlock();
+                    }
+                }
+
+                return null;
+            }
+        };
+
+        IgniteInternalFuture<?> done = multithreadedAsync(attempt, workerCnt);
+
+        done.get();
+
+        lock.unlock();
+
+        assertFalse(lock.isLocked());
+
+        for (Thread worker : workers) {
+            assertFalse(lock.hasQueuedThread(worker));
+        }
+
+        lock.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConditionAwaitUninterruptibly() throws Exception {
+        // Scenario: two workers take the lock and park in awaitUninterruptibly() on condition "cond".
+        // The main thread interrupts and signals them. Every worker must return from the wait without
+        // an IgniteInterruptedException, owning the lock and with its interrupt flag set.
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 2;
+
+        // Workers register themselves here so the main thread knows whom to interrupt.
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    boolean isInterrupted = false;
+
+                    lock0.lock();
+
+                    IgniteCondition cond = lock0.getOrCreateCondition("cond");
+
+                    try {
+                        cond.awaitUninterruptibly();
+                    }
+                    catch (IgniteInterruptedException e) {
+                        isInterrupted = true;
+                    }
+                    finally {
+                        // Assert that thread was not interrupted.
+                        assertFalse(isInterrupted);
+
+                        // Assert that lock is still locked.
+                        assertTrue(lock0.isLocked());
+
+                        // Assert that this thread does own the lock.
+                        assertTrue(lock0.isHeldByCurrentThread());
+
+                        // Clear interrupt flag.
+                        assertTrue(Thread.interrupted());
+
+                        // Release lock.
+                        if (lock0.isHeldByCurrentThread())
+                            lock0.unlock();
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        IgniteCondition cond = lock0.getOrCreateCondition("cond");
+
+        // Workers register in startedThreads BEFORE they lock and wait, so the size of that set does
+        // not prove that anybody is waiting yet. Signalling too early would be lost and the workers
+        // would wait forever. Poll the condition's wait queue instead (queried under the lock, as
+        // testHasConditionQueuedThreads does), and signal only once every worker is parked on it.
+        boolean allWaiting = false;
+
+        while (!allWaiting) {
+            lock0.lock();
+
+            try {
+                allWaiting = lock0.getWaitQueueLength(cond) == totalThreads;
+
+                if (allWaiting) {
+                    // Every waiter is registered by now, so startedThreads is complete.
+                    for (Thread t : startedThreads) {
+                        t.interrupt();
+
+                        cond.signal();
+                    }
+                }
+            }
+            finally {
+                // Always release, so that workers can take the lock and reach the wait.
+                lock0.unlock();
+            }
+
+            if (!allWaiting)
+                Thread.sleep(500);
+        }
+
+        fut.get();
+
+        assertFalse(lock0.isLocked());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConditionInterruptAwait() throws Exception {
+        // Scenario: two workers take the lock and wait on condition "cond" with the interruptible
+        // await(). The main thread interrupts them; each worker must see IgniteInterruptedException
+        // and must own the lock again when the wait ends.
+        final IgniteLock lock = grid(0).reentrantLock("lock", true, true);
+
+        assertEquals(0, lock.getHoldCount());
+
+        assertFalse(lock.hasQueuedThreads());
+
+        final int workerCnt = 2;
+
+        // Every worker records itself here so that the main thread can interrupt it.
+        final Set<Thread> workers = new GridConcurrentHashSet<Thread>();
+
+        final Callable<Object> waiter = new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                assertFalse(lock.isHeldByCurrentThread());
+
+                workers.add(Thread.currentThread());
+
+                boolean interrupted = false;
+
+                lock.lock();
+
+                IgniteCondition cond = lock.getOrCreateCondition("cond");
+
+                try {
+                    cond.await();
+                }
+                catch (IgniteInterruptedException e) {
+                    interrupted = true;
+                }
+                finally {
+                    // The wait must have ended with an interruption.
+                    assertTrue(interrupted);
+
+                    // The lock is held ...
+                    assertTrue(lock.isLocked());
+
+                    // ... by this very worker.
+                    assertTrue(lock.isHeldByCurrentThread());
+
+                    // Give the lock back if we own it.
+                    if (lock.isHeldByCurrentThread()) {
+                        lock.unlock();
+                    }
+                }
+
+                return null;
+            }
+        };
+
+        IgniteInternalFuture<?> done = multithreadedAsync(waiter, workerCnt);
+
+        // Block until all workers have registered themselves.
+        while (workers.size() != workerCnt) {
+            Thread.sleep(500);
+        }
+
+        for (Thread worker : workers) {
+            worker.interrupt();
+        }
+
+        done.get();
+
+        assertFalse(lock.isLocked());
+
+        for (Thread worker : workers) {
+            assertFalse(lock.hasQueuedThread(worker));
+        }
+
+        lock.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testHasQueuedThreads() throws Exception {
+        // Scenario: five workers compete for the lock. Whoever holds it checks that it is not queued
+        // itself and that each registered worker is either finished or still queued on the lock.
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 5;
+
+        // Workers that have announced themselves (added right before they call lock()).
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>();
+
+        // Workers that have already held the lock and passed their checks.
+        final Set<Thread> finishedThreads = new GridConcurrentHashSet<Thread>();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    lock0.lock();
+
+                    // Everything after lock() runs inside try/finally, so the lock is released even if
+                    // the sleep below is interrupted; a leaked lock would block the other workers.
+                    try {
+                        // Wait until every thread tries to lock.
+                        do {
+                            Thread.sleep(1000);
+                        }
+                        while (startedThreads.size() != totalThreads);
+
+                        info("Acquired in separate thread. ");
+
+                        assertTrue(lock0.isHeldByCurrentThread());
+
+                        assertFalse(lock0.hasQueuedThread(Thread.currentThread()));
+
+                        finishedThreads.add(Thread.currentThread());
+
+                        // While some workers have not finished, the lock must have queued threads.
+                        if (startedThreads.size() != finishedThreads.size()) {
+                            assertTrue(lock0.hasQueuedThreads());
+                        }
+
+                        // Each worker is either finished or still queued, never both.
+                        for (Thread t : startedThreads) {
+                            assertTrue(lock0.hasQueuedThread(t) != finishedThreads.contains(t));
+                        }
+                    }
+                    finally {
+                        lock0.unlock();
+
+                        assertFalse(lock0.isHeldByCurrentThread());
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        fut.get();
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testHasConditionQueuedThreads() throws Exception {
+        // Scenario: five workers take the lock and wait on condition "cond". The main thread waits
+        // until all of them are in the condition's wait queue and signals once. Each woken worker
+        // checks hasWaiters()/getWaitQueueLength() against the number of workers that have not
+        // finished yet, then signals the condition again before unlocking.
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 5;
+
+        // Workers that have acquired the lock at least once (added while holding it).
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>();
+
+        // Workers that were woken up and have passed their checks.
+        final Set<Thread> finishedThreads = new GridConcurrentHashSet<Thread>();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    IgniteCondition cond = lock0.getOrCreateCondition("cond");
+
+                    lock0.lock();
+
+                    startedThreads.add(Thread.currentThread());
+
+                    // Wait on the condition until signalled; repeat while some worker has not registered yet.
+                    do {
+                        cond.await();
+
+                        Thread.sleep(1000);
+                    }
+                    while (startedThreads.size() != totalThreads);
+
+                    try {
+                        info("Acquired in separate thread. Number of threads waiting on condition: "
+                            + lock0.getWaitQueueLength(cond));
+
+                        assertTrue(lock0.isHeldByCurrentThread());
+
+                        assertFalse(lock0.hasQueuedThread(Thread.currentThread()));
+
+                        finishedThreads.add(Thread.currentThread());
+
+                        // While some workers have not finished, the condition must still have waiters.
+                        if (startedThreads.size() != finishedThreads.size()) {
+                            assertTrue(lock0.hasWaiters(cond));
+                        }
+
+                        for (Thread t : startedThreads) {
+                            if (!finishedThreads.contains(t))
+                                assertTrue(lock0.hasWaiters(cond));
+                        }
+
+                        // Waiter count must equal the number of workers that have not finished.
+                        assertTrue(lock0.getWaitQueueLength(cond) == (startedThreads.size() - finishedThreads.size()));
+                    }
+                    finally {
+                        // Signal the condition again before releasing the lock, to pass the wake-up on.
+                        cond.signal();
+
+                        lock0.unlock();
+
+                        assertFalse(lock0.isHeldByCurrentThread());
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        IgniteCondition cond = lock0.getOrCreateCondition("cond");
+
+        lock0.lock();
+
+        try {
+            // Wait until all threads are waiting on condition.
+            // The lock is released around each sleep so that workers can acquire it and start waiting.
+            while (lock0.getWaitQueueLength(cond) != totalThreads) {
+                lock0.unlock();
+
+                Thread.sleep(1000);
+
+                lock0.lock();
+            }
+
+            // Signal once to get things started.
+            cond.signal();
+        }
+        finally {
+            lock0.unlock();
+        }
+
+        fut.get();
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         // No-op.
