Ignite-642: adds more unit tests.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0ae6a947
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0ae6a947
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0ae6a947

Branch: refs/heads/ignite-642
Commit: 0ae6a947b5c4235d6ca2062184e9d5e5898d9aed
Parents: 071e562
Author: Vladisav Jelisavcic <[email protected]>
Authored: Fri Mar 18 15:15:33 2016 +0100
Committer: vladisav <[email protected]>
Committed: Fri Mar 18 15:15:33 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCondition.java |  93 ++-
 .../main/java/org/apache/ignite/IgniteLock.java |  80 ++-
 .../datastructures/DataStructuresProcessor.java |   2 +-
 .../datastructures/GridCacheLockEx.java         |   2 +-
 .../datastructures/GridCacheLockImpl.java       |  45 +-
 ...eAbstractDataStructuresFailoverSelfTest.java |  73 +-
 .../IgniteClientDataStructuresAbstractTest.java |   2 +-
 .../IgniteLockAbstractSelfTest.java             | 670 ++++++++++++++++++-
 8 files changed, 890 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java
index 4207222..f8dc65c 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java
@@ -22,7 +22,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 
 /**
- * This interface provides a rich API for working with conditions associated 
with distributed reentrant locks.
+ * This interface provides a rich API for working with condition objects
+ * associated with distributed reentrant locks.
  * <p>
  * <h1 class="header">Functionality</h1>
  * IgniteCondition provides functionality similar to {@code 
java.util.concurrent.locks.Condition}.
@@ -51,16 +52,16 @@ public interface IgniteCondition extends Condition {
      * {@code Condition}; or
      * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
      * current thread, and interruption of thread suspension is supported; or
-     * <li> Some other node in grid fails, and lock is created in 
non-failoverSafe mode; or
-     * <li> Local node is stopped; or
+     * <li>Some other node in grid fails, and lock is created in 
non-failoverSafe mode; or
+     * <li>Local node is stopped; or
      * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
      * </ul>
      *
      * <p>If lock is not broken (because of failure of lock owner node)
      * in non-failoverSafe mode and local node is alive,
      * before this method can return the current thread must
-     * re-acquire the lock associated with this condition. When the
-     * thread returns it is <em>guaranteed</em> to hold this lock.
+     * re-acquire the lock associated with this condition. In all other cases
+     * when the thread returns it is <em>guaranteed</em> to hold this lock.
      *
      * <p>If the current thread:
      * <ul>
@@ -84,8 +85,45 @@ public interface IgniteCondition extends Condition {
      */
     @Override void await() throws IgniteInterruptedException;
 
-    /** {@inheritDoc} */
-    @Override void awaitUninterruptibly();
+    /**
+     * Causes the current thread to wait until it is signalled.
+     *
+     * <p>The lock associated with this condition is atomically
+     * released and the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until <em>one</em> of five things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #signal} method for this
+     * {@code Condition} and the current thread happens to be chosen as the
+     * thread to be awakened; or
+     * <li>Some other thread invokes the {@link #signalAll} method for this
+     * {@code Condition}; or
+     * <li>Some other node in grid fails, and lock is created in 
non-failoverSafe mode; or
+     * <li>Local node is stopped; or
+     * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
+     * </ul>
+     *
+     * <p>If lock is not broken (because of failure of lock owner node)
+     * in non-failoverSafe mode and local node is alive,
+     * before this method can return the current thread must
+     * re-acquire the lock associated with this condition. In all other cases,
+     * when the thread returns it is <em>guaranteed</em> to hold this lock.
+     *
+     * <p>If the current thread's interrupted status is set when it enters
+     * this method, or it is {@linkplain Thread#interrupt interrupted}
+     * while waiting, it will continue to wait until signalled. When it finally
+     * returns from this method its interrupted status will still
+     * be set.
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>The current thread is assumed to hold the lock associated with this
+     * {@code Condition} when this method is called. If not, an {@link 
IllegalMonitorStateException}
+     * will be thrown.
+     *
+     * @throws IgniteInterruptedException if the node stopped, or
+     *         node owning the lock failed in non-failoversafe mode
+     */
+    @Override void awaitUninterruptibly() throws IgniteInterruptedException;
 
     /**
      * Causes the current thread to wait until it is signalled or interrupted,
@@ -254,13 +292,44 @@ public interface IgniteCondition extends Condition {
      * @return {@code false} if the deadline has elapsed upon return, else
      *         {@code true}
      * @throws IgniteInterruptedException if the current thread is interrupted
-     *         (and interruption of thread suspension is supported)
+     *         (and interruption of thread suspension is supported) or node 
stopped or
+     *         node owning the lock failed in non-failoversafe mode
      */
     @Override boolean awaitUntil(Date deadline) throws 
IgniteInterruptedException;
 
-    /** {@inheritDoc} */
-    @Override void signal();
+    /**
+     * Wakes up one waiting thread.
+     *
+     * <p>If any threads are waiting on this condition then one
+     * is selected for waking up. That thread must then re-acquire the
+     * lock before returning from {@code await}.
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>The current thread is assumed to hold the lock associated with this
+     * {@code Condition} when this method is called. If not, an {@link 
IllegalMonitorStateException}
+     * will be thrown.
+     *
+     * @throws IgniteInterruptedException if node is stopped or
+     *         node owning the lock failed in non-failoversafe mode
+     */
+    @Override void signal() throws IgniteInterruptedException;
 
-    /** {@inheritDoc} */
-    @Override void signalAll();
+    /**
+     * Wakes up all waiting threads.
+     *
+     * <p>If any threads are waiting on this condition then they are
+     * all woken up. Each thread must re-acquire the lock before it can
+     * return from {@code await}.
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>The current thread is assumed to hold the lock associated with this
+     * {@code Condition} when this method is called. If not, an {@link 
IllegalMonitorStateException}
+     * will be thrown.
+     *
+     * @throws IgniteInterruptedException if node is stopped or
+     *         node owning the lock failed in non-failoversafe mode
+     */
+    @Override void signalAll() throws IgniteInterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/main/java/org/apache/ignite/IgniteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteLock.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteLock.java
index 8e0da5d..e5aeb27 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteLock.java
@@ -40,8 +40,31 @@ public interface IgniteLock extends Lock, Closeable {
      */
     public String name();
 
-    /** {@inheritDoc} */
-    @Override public void lock();
+    /**
+     * Acquires the distributed reentrant lock.
+     *
+     * <p>Acquires the lock if it is not held by another thread and returns
+     * immediately, setting the lock hold count to one.
+     *
+     * <p>If the current thread already holds this lock then the hold count
+     * is incremented by one and the method returns immediately.
+     *
+     * <p>If the lock is held by another thread then the
+     * current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until one of four things happens:
+     *
+     * <ul>
+     *
+     * <li>The lock is acquired by the current thread; or
+     *
+     * <li>Lock is broken (any node failed while owning this lock), and lock 
is created in
+     * non-failoverSafe mode.
+     *
+     * <li>Local node is stopped.
+     *
+     * @throws IgniteInterruptedException if the node is stopped or broken in 
non-failoverSafe mode
+     */
+    void lock() throws IgniteInterruptedException;
 
     /**
      * Acquires the lock unless the current thread is
@@ -100,8 +123,40 @@ public interface IgniteLock extends Lock, Closeable {
      */
     @Override public void lockInterruptibly() throws 
IgniteInterruptedException;
 
-    /** {@inheritDoc} */
-    @Override public boolean tryLock();
+    /**
+     * Acquires the lock only if it is free at the time of invocation.
+     *
+     * <p>Acquires the lock if it is available and returns immediately
+     * with the value {@code true}.
+     * If the lock is not available then this method will return
+     * immediately with the value {@code false}.
+     *
+     * <p>A typical usage idiom for this method would be:
+     *  <pre> {@code
+     * Lock lock = ...;
+     * if (lock.tryLock()) {
+     *   try {
+     *     // manipulate protected state
+     *   } finally {
+     *     lock.unlock();
+     *   }
+     * } else {
+     *   // perform alternative actions
+     * }}</pre>
+     *
+     * This usage ensures that the lock is unlocked if it was acquired, and
+     * doesn't try to unlock if the lock was not acquired.
+     *
+     * If node is stopped, or any node failed while owning the lock in 
non-failoverSafe mode,
+     * then {@link IgniteInterruptedException} is thrown.
+     *
+     * @return {@code true} if the lock was acquired and
+     *         {@code false} otherwise
+     *
+     * @throws IgniteInterruptedException if node is stopped or,
+     *          lock is already broken in non-failover safe mode
+     */
+    @Override public boolean tryLock() throws IgniteInterruptedException;
 
     /**
      * Acquires the lock if it is not held by another thread within the given
@@ -176,8 +231,18 @@ public interface IgniteLock extends Lock, Closeable {
      */
     @Override public boolean tryLock(long timeout, TimeUnit unit) throws 
IgniteInterruptedException;
 
-    /** {@inheritDoc} */
-    @Override public void unlock();
+    /**
+     * Releases the lock.
+     *
+     * If lock is not owned by current thread, then an {@link
+     * IllegalMonitorStateException} is thrown.
+     * If lock is already broken prior to invocation of this method, and
+     * lock is created in non-failover safe mode, then {@link 
IgniteInterruptedException} is thrown.
+     *
+     * @throws IllegalMonitorStateException if not owned by current thread
+     * @throws IgniteInterruptedException if node is stopped or lock is 
already broken
+     */
+    void unlock() throws IgniteInterruptedException;
 
     /**
      * Returns a {@link Condition} instance for use with this
@@ -318,9 +383,6 @@ public interface IgniteLock extends Lock, Closeable {
      */
     public boolean isBroken();
 
-    /** {@inheritDoc} */
-    @Override public String toString();
-
     /**
      * Gets status of reentrant lock.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 6d02b1f..396f23d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -269,7 +269,7 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
         for (GridCacheRemovable ds : dsMap.values()) {
             if (ds instanceof GridCacheLockEx)
-                ((GridCacheLockEx)ds).stop();
+                ((GridCacheLockEx)ds).onStop();
         }
 
         if (initLatch.getCount() > 0) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java
index 8d6e085..a54b85b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java
@@ -48,6 +48,6 @@ public interface GridCacheLockEx extends IgniteLock, 
GridCacheRemovable {
     /**
      * Callback to notify local reentrant lock instance on node stop.
      */
-    public void stop();
+    public void onStop();
 
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
index 03fe568..6c80ca1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
@@ -232,9 +232,16 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
 
         /** Check if lock is in correct state (i.e. not broken in 
non-failoversafe mode),
          * if not throw  {@linkplain IgniteInterruptedException} */
-        private void validate(){
-            if(Thread.interrupted() || interruptAll){
-                throw new IgniteInterruptedException("Lock broken in 
non-failoversafe mode.");
+        private void validate(final boolean checkInterrupt){
+            // Interrupted flag is not always cleared
+            // (e.g. lock() doesn't throw exception and doesn't clear 
interrupted)
+            // but should be cleared if this method is called after lock 
breakage or node stop.
+            // If interruptAll is set, exception is thrown anyway.
+            boolean clearInterrupt = checkInterrupt || interruptAll;
+
+            if((clearInterrupt && Thread.interrupted()) || interruptAll){
+                throw new IgniteInterruptedException("Lock broken (possible 
reason: node stopped" +
+                    " or node owning lock failed while in non-failoversafe 
mode).");
             }
         }
 
@@ -431,7 +438,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
         /**
          * This method is used for synchronizing the reentrant lock state 
across all nodes.
          */
-        protected boolean compareAndSetGlobalState(final int expVal, final int 
newVal, long newThreadID) {
+        protected boolean compareAndSetGlobalState(final int expVal, final int 
newVal, final long newThreadID) {
             try {
                 return CU.outTx(
                     retryTopologySafe(new Callable<Boolean>() {
@@ -450,6 +457,8 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
 
                                     val.setThreadId(newThreadID);
 
+                                    val.setSignals(null);
+
                                     lockView.put(key, val);
 
                                     tx.commit();
@@ -668,7 +677,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
 
                     object.await();
 
-                    sync.validate();
+                    sync.validate(true);
                 }
                 catch (InterruptedException e) {
                     throw new IgniteInterruptedException(e);
@@ -689,6 +698,8 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
                     lastCondition = this.name;
 
                     object.awaitUninterruptibly();
+
+                    sync.validate(false);
                 }
                 finally {
                     ctx.kernalContext().gateway().readUnlock();
@@ -707,7 +718,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
 
                     long result =  object.awaitNanos(nanosTimeout);
 
-                    sync.validate();
+                    sync.validate(true);
 
                     return result;
                 }
@@ -731,7 +742,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
 
                     boolean result = object.await(time, unit);
 
-                    sync.validate();
+                    sync.validate(true);
 
                     return result;
                 }
@@ -755,7 +766,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
 
                     boolean result = object.awaitUntil(deadline);
 
-                    sync.validate();
+                    sync.validate(true);
 
                     return result;
                 }
@@ -775,7 +786,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
                     if (!isHeldExclusively())
                         throw new IllegalMonitorStateException();
 
-                    validate();
+                    validate(false);
 
                     addOutgoingSignal(name);
                 }
@@ -792,7 +803,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
                     if (!isHeldExclusively())
                         throw new IllegalMonitorStateException();
 
-                    sync.validate();
+                    sync.validate(false);
 
                     addOutgoingSignalAll(name);
                 }
@@ -928,7 +939,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
     }
 
     /** {@inheritDoc} */
-    @Override public void stop() {
+    @Override public void onStop() {
         if (sync == null) {
             interruptAll = true;
 
@@ -957,7 +968,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
 
             sync.lock();
 
-            sync.validate();
+            sync.validate(false);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -976,7 +987,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
 
             sync.acquireInterruptibly(1);
 
-            sync.validate();
+            sync.validate(true);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -998,7 +1009,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
 
             boolean result = sync.nonfairTryAcquire(1);
 
-            sync.validate();
+            sync.validate(false);
 
             return result;
         }
@@ -1019,7 +1030,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
 
             boolean result = sync.tryAcquireNanos(1, unit.toNanos(timeout));
 
-            sync.validate();
+            sync.validate(true);
 
             return result;
         }
@@ -1042,7 +1053,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
             initializeReentrantLock();
 
             // Validate before release.
-            sync.validate();
+            sync.validate(false);
 
             sync.release(1);
         }
@@ -1067,7 +1078,7 @@ public final class GridCacheLockImpl implements 
GridCacheLockEx, Externalizable
 
             IgniteCondition result = sync.newCondition(name);
 
-            sync.validate();
+            sync.validate(false);
 
             return result;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index f626411..a875597 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.IgniteAtomicLong;
 import org.apache.ignite.IgniteAtomicReference;
 import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteAtomicStamped;
+import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
@@ -543,6 +544,74 @@ public abstract class 
GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
     /**
      * @throws Exception If failed.
      */
+    public void testReentrantLockFailsWhenServersLeft() throws Exception {
+        client = true;
+
+        Ignite client = startGrid(gridCount());
+
+        Ignite server = grid(0);
+
+        // Initialize lock.
+        IgniteLock srvLock = server.reentrantLock("lock", true, true);
+
+        IgniteSemaphore semaphore = server.semaphore("sync", 0, true, true);
+
+        IgniteCompute compute = client.compute().withAsync();
+
+        compute.apply(new IgniteClosure<Ignite, Object>() {
+            @Override public Object apply(Ignite ignite) {
+                final IgniteLock l = ignite.reentrantLock("lock", true, true);
+
+                l.lock();
+
+                assertTrue(l.isHeldByCurrentThread());
+
+                l.unlock();
+
+                assertFalse(l.isHeldByCurrentThread());
+
+                // Signal the server to go down.
+                ignite.semaphore("sync", 0, true, true).release();
+
+                boolean isExceptionThrown = false;
+
+                try {
+                    // Wait for the server to go down.
+                    Thread.sleep(1000);
+
+                    l.lock();
+
+                    fail("Exception must be thrown.");
+                }
+                catch (InterruptedException e) {
+                    fail("Interrupted exception not expected here.");
+                }
+                catch (IgniteInterruptedException e) {
+                    isExceptionThrown = true;
+                }
+                finally {
+                    assertTrue(isExceptionThrown);
+
+                    assertFalse(l.isHeldByCurrentThread());
+                }
+                return null;
+            }
+        }, client);
+
+        // Wait for the lock on client to be acquired then released.
+        semaphore.acquire();
+
+        for (int i = 0; i < gridCount(); i++)
+            stopGrid(i);
+
+        compute.future().get();
+
+        client.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testReentrantLockConstantTopologyChangeFailoverSafe() throws 
Exception {
         doTestReentrantLock(new 
ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true);
     }
@@ -571,11 +640,11 @@ public abstract class 
GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
     /**
      * @throws Exception If failed.
      */
-    private void doTestReentrantLock(ConstantTopologyChangeWorker topWorker, 
boolean failoverSafe) throws Exception {
+    private void doTestReentrantLock(ConstantTopologyChangeWorker topWorker, 
final boolean failoverSafe) throws Exception {
         try (IgniteLock lock = grid(0).reentrantLock(STRUCTURE_NAME, 
failoverSafe, true)) {
             IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new 
IgniteClosure<Ignite, Object>() {
                 @Override public Object apply(Ignite ignite) {
-                    IgniteLock l = ignite.reentrantLock(STRUCTURE_NAME, 
failoverSafe, false);
+                    final IgniteLock l = ignite.reentrantLock(STRUCTURE_NAME, 
failoverSafe, false);
 
                     IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Void>() {
                         @Override public Void call() throws Exception {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
index 06b11a1..2b23389 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
@@ -351,7 +351,7 @@ public abstract class 
IgniteClientDataStructuresAbstractTest extends GridCommonA
 
             assertFalse(lock.isLocked());
 
-            Semaphore semaphore = new Semaphore(0);
+            final Semaphore semaphore = new Semaphore(0);
 
             IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Object>() {
                 @Override public Object call() throws Exception {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ae6a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java
index da8ab34..f276ff1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java
@@ -24,17 +24,19 @@ import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCondition;
+import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.lang.IgniteCallable;
@@ -96,46 +98,52 @@ public abstract class IgniteLockAbstractSelfTest extends 
IgniteAtomicsAbstractTe
      * @param failoverSafe Failover safe flag.
      * @throws Exception
      */
-    private void checkFailover(boolean failoverSafe) throws Exception {
+    private void checkFailover(final boolean failoverSafe) throws Exception {
         IgniteEx g = startGrid(NODES_CNT + 1);
 
         // For vars locality.
         {
             // Ensure not exists.
-            assert g.semaphore("sem", 2, failoverSafe, false) == null;
+            assert g.reentrantLock("lock", failoverSafe, false) == null;
 
-            IgniteSemaphore sem = g.semaphore(
-                "sem",
-                2,
-                failoverSafe,
-                true);
+            IgniteLock lock  = g.reentrantLock("lock", failoverSafe, true);
 
-            sem.acquire(2);
+            lock.lock();
 
-            assert !sem.tryAcquire();
-            assertEquals(
-                0,
-                sem.availablePermits());
+            assert lock.tryLock();
+
+            assertEquals(2, lock.getHoldCount());
         }
 
         Ignite g0 = grid(0);
 
-        final IgniteSemaphore sem0 = g0.semaphore(
-            "sem",
-            -10,
-            false,
-            false);
+        final IgniteLock lock0 = g0.reentrantLock("lock", false, false);
+
+        assert !lock0.tryLock();
 
-        assert !sem0.tryAcquire();
-        assertEquals(0, sem0.availablePermits());
+        assertEquals(0, lock0.getHoldCount());
 
         IgniteInternalFuture<?> fut = multithreadedAsync(
             new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    sem0.acquire();
+                    try {
+                        lock0.lock();
 
-                    info("Acquired in separate thread.");
+                        info("Acquired in separate thread.");
 
+                        // Lock is acquired silently in failoverSafe mode.
+                        if (failoverSafe) {
+                            lock0.unlock();
+
+                            info("Released lock in separate thread.");
+                        }
+                    }
+                    catch (IgniteInterruptedException e) {
+                        if (!failoverSafe)
+                            info("Ignored expected exception: " + e);
+                        else
+                            throw e;
+                    }
                     return null;
                 }
             },
@@ -145,17 +153,9 @@ public abstract class IgniteLockAbstractSelfTest extends 
IgniteAtomicsAbstractTe
 
         g.close();
 
-        try {
-            fut.get(500);
-        }
-        catch (IgniteCheckedException e) {
-            if (!failoverSafe && e.hasCause(InterruptedException.class))
-                info("Ignored expected exception: " + e);
-            else
-                throw e;
-        }
+        fut.get(500);
 
-        sem0.close();
+        lock0.close();
     }
 
     /**
@@ -234,7 +234,7 @@ public abstract class IgniteLockAbstractSelfTest extends 
IgniteAtomicsAbstractTe
         // Ensure there are no hangs.
         fut.get();
 
-        // Test operations on removed semaphore.
+        // Test operations on removed lock.
         lock1.close();
 
         checkRemovedReentrantLock(lock1);
@@ -413,6 +413,608 @@ public abstract class IgniteLockAbstractSelfTest extends 
IgniteAtomicsAbstractTe
             fut.get(30_000);
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLockInterruptibly() throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 2;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>();
+
+        lock0.lock();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    boolean isInterrupted = false;
+
+                    try {
+                        lock0.lockInterruptibly();
+                    }
+                    catch (IgniteInterruptedException e) {
+                        assertFalse(Thread.currentThread().isInterrupted());
+
+                        isInterrupted = true;
+                    }
+                    finally {
+                        System.out.println(Thread.currentThread());
+
+                        // Assert that thread was interrupted.
+                        assertTrue(isInterrupted);
+
+                        // Assert that locked is still owned by main thread.
+                        assertTrue(lock0.isLocked());
+
+                        // Assert that this thread doesn't own the lock.
+                        assertFalse(lock0.isHeldByCurrentThread());
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        // Wait for all threads to attempt to acquire lock.
+        while (startedThreads.size() != totalThreads) {
+            Thread.sleep(1000);
+        }
+
+        for (Thread t : startedThreads)
+            t.interrupt();
+
+        fut.get();
+
+        lock0.unlock();
+
+        assertFalse(lock0.isLocked());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLock() throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 2;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>();
+
+        lock0.lock();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    boolean isInterrupted = false;
+
+                    try {
+                        lock0.lock();
+                    }
+                    catch (IgniteInterruptedException e) {
+                        isInterrupted = true;
+
+                        fail("Lock() method is uninterruptible.");
+                    }
+                    finally {
+                        // Assert that thread was not interrupted.
+                        assertFalse(isInterrupted);
+
+                        // Assert that interrupted flag is set and clear it in 
order to call unlock().
+                        assertTrue(Thread.interrupted());
+
+                        // Assert that lock is still owned by this thread.
+                        assertTrue(lock0.isLocked());
+
+                        // Assert that this thread does own the lock.
+                        assertTrue(lock0.isHeldByCurrentThread());
+
+                        // Release lock.
+                        lock0.unlock();
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        // Wait for all threads to attempt to acquire lock.
+        while (startedThreads.size() != totalThreads) {
+            Thread.sleep(500);
+        }
+
+        for (Thread t : startedThreads)
+            t.interrupt();
+
+        lock0.unlock();
+
+        fut.get();
+
+        assertFalse(lock0.isLocked());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTryLock() throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 2;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>();
+
+        lock0.lock();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    boolean isInterrupted = false;
+
+                    boolean locked = false;
+
+                    try {
+                        locked = lock0.tryLock();
+                    }
+                    catch (IgniteInterruptedException e) {
+                        isInterrupted = true;
+
+                        fail("tryLock() method is uninterruptible.");
+                    }
+                    finally {
+                        // Assert that thread was not interrupted.
+                        assertFalse(isInterrupted);
+
+                        // Assert that lock is locked.
+                        assertTrue(lock0.isLocked());
+
+                        // Assert that this thread does own the lock.
+                        assertEquals(locked, lock0.isHeldByCurrentThread());
+
+                        // Release lock.
+                        if (locked)
+                            lock0.unlock();
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        // Wait for all threads to attempt to acquire lock.
+        while (startedThreads.size() != totalThreads) {
+            Thread.sleep(500);
+        }
+
+        for (Thread t : startedThreads)
+            t.interrupt();
+
+        fut.get();
+
+        lock0.unlock();
+
+        assertFalse(lock0.isLocked());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTryLockTimed() throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 2;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>();
+
+        lock0.lock();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    boolean isInterrupted = false;
+
+                    boolean locked = false;
+
+                    try {
+                        locked = lock0.tryLock(100, TimeUnit.MILLISECONDS);
+                    }
+                    catch (IgniteInterruptedException e) {
+                        isInterrupted = true;
+                    }
+                    finally {
+                        // Assert that thread was not interrupted.
+                        assertFalse(isInterrupted);
+
+                        // Assert that tryLock returned false.
+                        assertFalse(locked);
+
+                        // Assert that lock is still owned by main thread.
+                        assertTrue(lock0.isLocked());
+
+                        // Assert that this thread doesn't own the lock.
+                        assertFalse(lock0.isHeldByCurrentThread());
+
+                        // Release lock.
+                        if (locked)
+                            lock0.unlock();
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        fut.get();
+
+        lock0.unlock();
+
+        assertFalse(lock0.isLocked());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConditionAwaitUninterruptibly() throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 2;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    boolean isInterrupted = false;
+
+                    lock0.lock();
+
+                    IgniteCondition cond = lock0.getOrCreateCondition("cond");
+
+                    try {
+                        cond.awaitUninterruptibly();
+                    }
+                    catch (IgniteInterruptedException e) {
+                        isInterrupted = true;
+                    }
+                    finally {
+                        // Assert that thread was not interrupted.
+                        assertFalse(isInterrupted);
+
+                        // Assert that lock is still locked.
+                        assertTrue(lock0.isLocked());
+
+                        // Assert that this thread does own the lock.
+                        assertTrue(lock0.isHeldByCurrentThread());
+
+                        // Clear interrupt flag.
+                        assertTrue(Thread.interrupted());
+
+                        // Release lock.
+                        if (lock0.isHeldByCurrentThread())
+                            lock0.unlock();
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        // Wait for all threads to attempt to acquire lock.
+        while (startedThreads.size() != totalThreads) {
+            Thread.sleep(500);
+        }
+
+        lock0.lock();
+
+        for (Thread t : startedThreads) {
+            t.interrupt();
+
+            lock0.getOrCreateCondition("cond").signal();
+        }
+
+        lock0.unlock();
+
+        fut.get();
+
+        assertFalse(lock0.isLocked());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConditionInterruptAwait() throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 2;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    boolean isInterrupted = false;
+
+                    lock0.lock();
+
+                    IgniteCondition cond = lock0.getOrCreateCondition("cond");
+
+                    try {
+                        cond.await();
+                    }
+                    catch (IgniteInterruptedException e) {
+                        isInterrupted = true;
+                    }
+                    finally {
+                        // Assert that thread was interrupted.
+                        assertTrue(isInterrupted);
+
+                        // Assert that lock is still locked.
+                        assertTrue(lock0.isLocked());
+
+                        // Assert that this thread does own the lock.
+                        assertTrue(lock0.isHeldByCurrentThread());
+
+                        // Release lock.
+                        if (lock0.isHeldByCurrentThread())
+                            lock0.unlock();
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        // Wait for all threads to attempt to acquire lock.
+        while (startedThreads.size() != totalThreads) {
+            Thread.sleep(500);
+        }
+
+        for (Thread t : startedThreads)
+            t.interrupt();
+
+        fut.get();
+
+        assertFalse(lock0.isLocked());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testHasQueuedThreads() throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 5;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>();
+
+        final Set<Thread> finishedThreads = new 
GridConcurrentHashSet<Thread>();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    lock0.lock();
+
+                    // Wait until every thread tries to lock.
+                    do {
+                        Thread.sleep(1000);
+                    }
+                    while (startedThreads.size() != totalThreads);
+
+                    try {
+                        info("Acquired in separate thread. ");
+
+                        assertTrue(lock0.isHeldByCurrentThread());
+
+                        
assertFalse(lock0.hasQueuedThread(Thread.currentThread()));
+
+                        finishedThreads.add(Thread.currentThread());
+
+                        if (startedThreads.size() != finishedThreads.size()) {
+                            assertTrue(lock0.hasQueuedThreads());
+                        }
+
+                        for (Thread t : startedThreads) {
+                            assertTrue(lock0.hasQueuedThread(t) != 
finishedThreads.contains(t));
+                        }
+                    }
+                    finally {
+                        lock0.unlock();
+
+                        assertFalse(lock0.isHeldByCurrentThread());
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        fut.get();
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testHasConditionQueuedThreads() throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 5;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<Thread>();
+
+        final Set<Thread> finishedThreads = new 
GridConcurrentHashSet<Thread>();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    IgniteCondition cond = lock0.getOrCreateCondition("cond");
+
+                    lock0.lock();
+
+                    startedThreads.add(Thread.currentThread());
+
+                    // Wait until every thread tries to lock.
+                    do {
+                        cond.await();
+
+                        Thread.sleep(1000);
+                    }
+                    while (startedThreads.size() != totalThreads);
+
+                    try {
+                        info("Acquired in separate thread. Number of threads 
waiting on condition: "
+                            + lock0.getWaitQueueLength(cond));
+
+                        assertTrue(lock0.isHeldByCurrentThread());
+
+                        
assertFalse(lock0.hasQueuedThread(Thread.currentThread()));
+
+                        finishedThreads.add(Thread.currentThread());
+
+                        if (startedThreads.size() != finishedThreads.size()) {
+                            assertTrue(lock0.hasWaiters(cond));
+                        }
+
+                        for (Thread t : startedThreads) {
+                            if (!finishedThreads.contains(t))
+                                assertTrue(lock0.hasWaiters(cond));
+                        }
+
+                        assertTrue(lock0.getWaitQueueLength(cond) == 
(startedThreads.size() - finishedThreads.size()));
+                    }
+                    finally {
+                        cond.signal();
+
+                        lock0.unlock();
+
+                        assertFalse(lock0.isHeldByCurrentThread());
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        IgniteCondition cond = lock0.getOrCreateCondition("cond");
+
+        lock0.lock();
+
+        try {
+            // Wait until all threads are waiting on condition.
+            while (lock0.getWaitQueueLength(cond) != totalThreads) {
+                lock0.unlock();
+
+                Thread.sleep(1000);
+
+                lock0.lock();
+            }
+
+            // Signal once to get things started.
+            cond.signal();
+        }
+        finally {
+            lock0.unlock();
+        }
+
+        fut.get();
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         // No-op.

Reply via email to