ignite-642: Implemented.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b4e49bde
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b4e49bde
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b4e49bde

Branch: refs/heads/ignite-642
Commit: b4e49bde29d6b5feb8d3b4f6b0d157c786c9ba98
Parents: 46b6a76
Author: Vladisav Jelisavcic <[email protected]>
Authored: Fri Feb 26 16:11:16 2016 +0100
Committer: vladisav <[email protected]>
Committed: Fri Feb 26 16:11:16 2016 +0100

----------------------------------------------------------------------
 .../IgniteReentrantLockExample.java             |  294 +++++
 .../ignite/examples/CacheExamplesSelfTest.java  |    8 +
 .../src/main/java/org/apache/ignite/Ignite.java |   16 +
 .../java/org/apache/ignite/IgniteCondition.java |  339 ++++++
 .../org/apache/ignite/IgniteReentrantLock.java  |  434 +++++++
 .../apache/ignite/internal/IgniteKernal.java    |   20 +
 .../datastructures/DataStructuresProcessor.java |  159 ++-
 .../GridCacheReentrantLockEx.java               |   47 +
 .../GridCacheReentrantLockImpl.java             | 1150 ++++++++++++++++++
 .../GridCacheReentrantLockState.java            |  298 +++++
 .../resources/META-INF/classnames.properties    |    2 +
 .../IgniteClientReconnectAtomicsTest.java       |   54 +
 ...eAbstractDataStructuresFailoverSelfTest.java |   85 +-
 .../IgniteClientDataStructuresAbstractTest.java |   70 ++
 .../IgniteDataStructureUniqueNameTest.java      |   13 +-
 .../IgniteReentrantLockAbstractSelfTest.java    |  428 +++++++
 .../local/IgniteLocalReentrantLockSelfTest.java |  110 ++
 .../IgnitePartitionedReentrantLockSelfTest.java |   33 +
 .../IgniteReplicatedReentrantLockSelfTest.java  |   33 +
 .../cache/GridCacheDataStructuresLoadTest.java  |   53 +
 .../ignite/testframework/junits/IgniteMock.java |    9 +
 .../junits/multijvm/IgniteProcessProxy.java     |    7 +
 .../org/apache/ignite/IgniteSpringBean.java     |   10 +
 23 files changed, 3663 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteReentrantLockExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteReentrantLockExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteReentrantLockExample.java
new file mode 100644
index 0000000..c87ce49
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteReentrantLockExample.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datastructures;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCondition;
+import org.apache.ignite.IgniteReentrantLock;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.lang.IgniteRunnable;
+
+/**
+ * This example demonstrates cache based reentrant lock.
+ * <p>
+ * Remote nodes should always be started with special configuration
+ * file which enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which 
will start node with {@code
+ * examples/config/example-ignite.xml} configuration.
+ */
+public class IgniteReentrantLockExample {
+    /** Number of items for each producer/consumer to produce/consume. */
+    private static final int OPS_COUNT = 100;
+
+    /** Number of producers. */
+    private static final int NUM_PRODUCERS = 5;
+
+    /** Number of consumers. */
+    private static final int NUM_CONSUMERS = 5;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Name of the global resource. */
+    private static final String QUEUE_ID = "queue";
+
+    /** Name of the synchronization variable. */
+    private static final String SYNC_NAME = "done";
+
+    /** Name of the condition signaled when the queue is no longer full. */
+    private static final String NOT_FULL = "notFull";
+
+    /** Name of the condition signaled when the queue is no longer empty. */
+    private static final String NOT_EMPTY = "notEmpty";
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     */
+    public static void main(String[] args) {
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache atomic reentrant lock example 
started.");
+
+            // Make name of reentrant lock.
+            final String reentrantLockName = UUID.randomUUID().toString();
+
+            // Initialize lock.
+            IgniteReentrantLock lock = ignite.reentrantLock(reentrantLockName, 
true, true);
+
+            // Init distributed cache.
+            IgniteCache<String, Integer> cache = 
ignite.getOrCreateCache(CACHE_NAME);
+
+            // Init shared variable.
+            cache.put(QUEUE_ID, 0);
+
+            // Shared variable indicating number of jobs left to be completed.
+            cache.put(SYNC_NAME, NUM_PRODUCERS + NUM_CONSUMERS);
+
+            // Start consumers on all cluster nodes.
+            for (int i = 0; i < NUM_CONSUMERS; i++)
+                ignite.compute().withAsync().run(new 
Consumer(reentrantLockName));
+
+            // Start producers on all cluster nodes.
+            for (int i = 0; i < NUM_PRODUCERS; i++)
+                ignite.compute().withAsync().run(new 
Producer(reentrantLockName));
+
+            System.out.println("Master node is waiting for all other nodes to 
finish...");
+
+            // Wait for everyone to finish.
+            try {
+                lock.lock();
+
+                IgniteCondition notDone = lock.newCondition(SYNC_NAME);
+
+                int count = cache.get(SYNC_NAME);
+
+                while(count > 0) {
+                    notDone.await();
+
+                    count = cache.get(SYNC_NAME);
+                }
+            }
+            finally {
+                lock.unlock();
+
+            }
+        }
+
+        System.out.flush();
+        System.out.println();
+        System.out.println("Finished reentrant lock example...");
+        System.out.println("Check all nodes for output (this node is also part 
of the cluster).");
+    }
+
+    /**
+     * Base closure which holds the name of the reentrant lock to use.
+     */
+    private abstract static class ReentrantLockExampleClosure implements 
IgniteRunnable {
+        /** Reentrant lock name. */
+        protected final String reentrantLockName;
+
+        /**
+         * @param reentrantLockName Reentrant lock name.
+         */
+        ReentrantLockExampleClosure(String reentrantLockName) {
+            this.reentrantLockName = reentrantLockName;
+        }
+
+    }
+
+    /**
+     * Closure which simulates producer.
+     */
+    private static class Producer extends ReentrantLockExampleClosure {
+        /**
+         * @param reentrantLockName Reentrant lock name.
+         */
+        public Producer(String reentrantLockName) {
+            super(reentrantLockName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            System.out.println("Producer started. ");
+
+            IgniteReentrantLock lock = 
Ignition.ignite().reentrantLock(reentrantLockName, true, true);
+
+            // Condition to wait on when queue is full.
+            IgniteCondition notFull = lock.newCondition(NOT_FULL);
+
+            // Condition signaled when queue is no longer empty.
+            IgniteCondition notEmpty = lock.newCondition(NOT_EMPTY);
+
+            // Signaled when job is done.
+            IgniteCondition done = lock.newCondition(SYNC_NAME);
+
+            IgniteCache<String, Integer> cache = 
Ignition.ignite().cache(CACHE_NAME);
+
+            for (int i = 0; i < OPS_COUNT; i++) {
+                try {
+                    lock.lock();
+
+                    int val = cache.get(QUEUE_ID);
+
+                    while(val >= 100){
+                        System.out.println("Queue is full. Producer [nodeId=" 
+ Ignition.ignite().cluster().localNode().id() +
+                            " paused.");
+
+                        notFull.await();
+
+                        val = cache.get(QUEUE_ID);
+                    }
+
+                    val++;
+
+                    System.out.println("Producer [nodeId=" + 
Ignition.ignite().cluster().localNode().id() +
+                        ", available=" + val + ']');
+
+                    cache.put(QUEUE_ID, val);
+
+                    notEmpty.signalAll();
+                }
+                finally {
+                    lock.unlock();
+
+                }
+            }
+
+            System.out.println("Producer finished [nodeId=" + 
Ignition.ignite().cluster().localNode().id() + ']');
+
+            try {
+                lock.lock();
+
+                int count = cache.get(SYNC_NAME);
+
+                count--;
+
+                cache.put(SYNC_NAME, count);
+
+                // Signals the master thread.
+                done.signal();
+            }finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Closure which simulates consumer.
+     */
+    private static class Consumer extends ReentrantLockExampleClosure {
+        /**
+         * @param reentrantLockName ReentrantLock name.
+         */
+        public Consumer(String reentrantLockName) {
+            super(reentrantLockName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            System.out.println("Consumer started. ");
+
+            Ignite g = Ignition.ignite();
+
+            IgniteReentrantLock lock = g.reentrantLock(reentrantLockName, 
true, true);
+
+            // Condition signaled when queue is no longer full.
+            IgniteCondition notFull = lock.newCondition(NOT_FULL);
+
+            // Condition to wait on when queue is empty.
+            IgniteCondition notEmpty = lock.newCondition(NOT_EMPTY);
+
+            // Signaled when job is done.
+            IgniteCondition done = lock.newCondition(SYNC_NAME);
+
+            IgniteCache<String, Integer> cache = g.cache(CACHE_NAME);
+
+            for (int i = 0; i < OPS_COUNT; i++) {
+                try {
+                    lock.lock();
+
+                    int val = cache.get(QUEUE_ID);
+
+                    while (val <= 0) {
+                        System.out.println("Queue is empty. Consumer [nodeId=" 
+
+                            Ignition.ignite().cluster().localNode().id() + " 
paused.");
+
+                        notEmpty.await();
+
+                        val = cache.get(QUEUE_ID);
+                    }
+
+                    val--;
+
+                    System.out.println("Consumer [nodeId=" + 
Ignition.ignite().cluster().localNode().id() +
+                        ", available=" + val + ']');
+
+                    cache.put(QUEUE_ID, val);
+
+                    notFull.signalAll();
+                }
+                finally {
+                    lock.unlock();
+                }
+            }
+
+            System.out.println("Consumer finished [nodeId=" + 
Ignition.ignite().cluster().localNode().id() + ']');
+
+            try {
+                lock.lock();
+
+                int count = cache.get(SYNC_NAME);
+
+                count--;
+
+                cache.put(SYNC_NAME, count);
+
+                // Signals the master thread.
+                done.signal();
+            }finally {
+                lock.unlock();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
----------------------------------------------------------------------
diff --git 
a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java 
b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
index 39c2ea6..fea3627 100644
--- 
a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
+++ 
b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
@@ -32,6 +32,7 @@ import 
org.apache.ignite.examples.datastructures.IgniteAtomicSequenceExample;
 import org.apache.ignite.examples.datastructures.IgniteAtomicStampedExample;
 import org.apache.ignite.examples.datastructures.IgniteCountDownLatchExample;
 import org.apache.ignite.examples.datastructures.IgniteQueueExample;
+import org.apache.ignite.examples.datastructures.IgniteReentrantLockExample;
 import org.apache.ignite.examples.datastructures.IgniteSemaphoreExample;
 import org.apache.ignite.examples.datastructures.IgniteSetExample;
 import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest;
@@ -99,6 +100,13 @@ public class CacheExamplesSelfTest extends 
GridAbstractExamplesTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCacheReentrantLockExample() throws Exception {
+        IgniteReentrantLockExample.main(EMPTY_ARGS);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testCacheQueueExample() throws Exception {
         IgniteQueueExample.main(EMPTY_ARGS);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java 
b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 5703744..1838fd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -466,6 +466,22 @@ public interface Ignite extends AutoCloseable {
         throws IgniteException;
 
     /**
+     * Gets or creates reentrant lock. If reentrant lock is not found in cache 
and {@code create} flag
+     * is {@code true}, it is created using provided name.
+     *
+     * @param name Name of the lock.
+     * @param failoverSafe {@code True} to create a failover-safe lock, which 
means that
+     *      if any node leaves the topology, a lock already acquired by that node is 
silently released
+     *      and becomes available for alive nodes to acquire. If the flag is {@code 
false}, then
+     *      all threads waiting to acquire the lock get interrupted.
+     * @param create Boolean flag indicating whether the data structure should be 
created if it does not exist.
+     * @return ReentrantLock for the given name.
+     * @throws IgniteException If reentrant lock could not be fetched or 
created.
+     */
+    public IgniteReentrantLock reentrantLock(String name, boolean 
failoverSafe, boolean create)
+        throws IgniteException;
+
+    /**
      * Will get a named queue from cache and create one if it has not been 
created yet and {@code cfg} is not
      * {@code null}.
      * If queue is present already, queue properties will not be changed. Use

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java
new file mode 100644
index 0000000..e79270d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+
+public interface IgniteCondition {
+
+    /**
+     * Name of ignite condition.
+     *
+     * @return Name of ignite condition.
+     */
+    public String name();
+
+    /**
+     * Causes the current thread to wait until it is signalled or
+     * {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>The lock associated with this {@code IgniteCondition} is atomically
+     * released and the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until <em>one</em> of four things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #signal} method for this
+     * {@code Condition} and the current thread happens to be chosen as the
+     * thread to be awakened; or
+     * <li>Some other thread invokes the {@link #signalAll} method for this
+     * {@code Condition}; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread, and interruption of thread suspension is supported; or
+     * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
+     * </ul>
+     *
+     * <p>In all cases, before this method can return the current thread must
+     * re-acquire the lock associated with this condition. When the
+     * thread returns it is <em>guaranteed</em> to hold this lock.
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * and interruption of thread suspension is supported,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current 
thread's
+     * interrupted status is cleared. It is not specified, in the first
+     * case, whether or not the test for interruption occurs before the lock
+     * is released.
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>The current thread is assumed to hold the lock associated with this
+     * {@code Condition} when this method is called.
+     * It is up to the implementation to determine if this is
+     * the case and if not, how to respond. Typically, an exception will be
+     * thrown (such as {@link IllegalMonitorStateException}) and the
+     * implementation must document that fact.
+     *
+     * <p>An implementation can favor responding to an interrupt over normal
+     * method return in response to a signal. In that case the implementation
+     * must ensure that the signal is redirected to another waiting thread, if
+     * there is one.
+     *
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     *         (and interruption of thread suspension is supported)
+     */
+    void await() throws IgniteInterruptedException;
+
+    /**
+     * Causes the current thread to wait until it is signalled.
+     *
+     * <p>The lock associated with this condition is atomically
+     * released and the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until <em>one</em> of three things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #signal} method for this
+     * {@code Condition} and the current thread happens to be chosen as the
+     * thread to be awakened; or
+     * <li>Some other thread invokes the {@link #signalAll} method for this
+     * {@code Condition}; or
+     * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
+     * </ul>
+     *
+     * <p>In all cases, before this method can return the current thread must
+     * re-acquire the lock associated with this condition. When the
+     * thread returns it is <em>guaranteed</em> to hold this lock.
+     *
+     * <p>If the current thread's interrupted status is set when it enters
+     * this method, or it is {@linkplain Thread#interrupt interrupted}
+     * while waiting, it will continue to wait until signalled. When it finally
+     * returns from this method its interrupted status will still
+     * be set.
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>The current thread is assumed to hold the lock associated with this
+     * {@code Condition} when this method is called.
+     * It is up to the implementation to determine if this is
+     * the case and if not, how to respond. Typically, an exception will be
+     * thrown (such as {@link IllegalMonitorStateException}) and the
+     * implementation must document that fact.
+     */
+    void awaitUninterruptibly();
+
+    /**
+     * Causes the current thread to wait until it is signalled or interrupted,
+     * or the specified waiting time elapses.
+     *
+     * <p>The lock associated with this condition is atomically
+     * released and the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until <em>one</em> of five things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #signal} method for this
+     * {@code Condition} and the current thread happens to be chosen as the
+     * thread to be awakened; or
+     * <li>Some other thread invokes the {@link #signalAll} method for this
+     * {@code Condition}; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread, and interruption of thread suspension is supported; or
+     * <li>The specified waiting time elapses; or
+     * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
+     * </ul>
+     *
+     * <p>In all cases, before this method can return the current thread must
+     * re-acquire the lock associated with this condition. When the
+     * thread returns it is <em>guaranteed</em> to hold this lock.
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * and interruption of thread suspension is supported,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current 
thread's
+     * interrupted status is cleared. It is not specified, in the first
+     * case, whether or not the test for interruption occurs before the lock
+     * is released.
+     *
+     * <p>The method returns an estimate of the number of nanoseconds
+     * remaining to wait given the supplied {@code nanosTimeout}
+     * value upon return, or a value less than or equal to zero if it
+     * timed out. This value can be used to determine whether and how
+     * long to re-wait in cases where the wait returns but an awaited
+     * condition still does not hold. Typical uses of this method take
+     * the following form:
+     *
+     *  <pre> {@code
+     * boolean aMethod(long timeout, TimeUnit unit) {
+     *   long nanos = unit.toNanos(timeout);
+     *   lock.lock();
+     *   try {
+     *     while (!conditionBeingWaitedFor()) {
+     *       if (nanos <= 0L)
+     *         return false;
+     *       nanos = theCondition.awaitNanos(nanos);
+     *     }
+     *     // ...
+     *   } finally {
+     *     lock.unlock();
+     *   }
+     * }}</pre>
+     *
+     * <p>Design note: This method requires a nanosecond argument so
+     * as to avoid truncation errors in reporting remaining times.
+     * Such precision loss would make it difficult for programmers to
+     * ensure that total waiting times are not systematically shorter
+     * than specified when re-waits occur.
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>The current thread is assumed to hold the lock associated with this
+     * {@code Condition} when this method is called.
+     * It is up to the implementation to determine if this is
+     * the case and if not, how to respond. Typically, an exception will be
+     * thrown (such as {@link IllegalMonitorStateException}) and the
+     * implementation must document that fact.
+     *
+     * <p>An implementation can favor responding to an interrupt over normal
+     * method return in response to a signal, or over indicating the elapse
+     * of the specified waiting time. In either case the implementation
+     * must ensure that the signal is redirected to another waiting thread, if
+     * there is one.
+     *
+     * @param nanosTimeout the maximum time to wait, in nanoseconds
+     * @return an estimate of the {@code nanosTimeout} value minus
+     *         the time spent waiting upon return from this method.
+     *         A positive value may be used as the argument to a
+     *         subsequent call to this method to finish waiting out
+     *         the desired time.  A value less than or equal to zero
+     *         indicates that no time remains.
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     *         (and interruption of thread suspension is supported)
+     */
+    long awaitNanos(long nanosTimeout) throws IgniteInterruptedException;
+
+    /**
+     * Causes the current thread to wait until it is signalled or interrupted,
+     * or the specified waiting time elapses. This method is behaviorally
+     * equivalent to:
+     *  <pre> {@code awaitNanos(unit.toNanos(time)) > 0}</pre>
+     *
+     * @param time the maximum time to wait
+     * @param unit the time unit of the {@code time} argument
+     * @return {@code false} if the waiting time detectably elapsed
+     *         before return from the method, else {@code true}
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     *         (and interruption of thread suspension is supported)
+     */
+    boolean await(long time, TimeUnit unit) throws IgniteInterruptedException;
+
+    /**
+     * Causes the current thread to wait until it is signalled or interrupted,
+     * or the specified deadline elapses.
+     *
+     * <p>The lock associated with this condition is atomically
+     * released and the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until <em>one</em> of five things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #signal} method for this
+     * {@code Condition} and the current thread happens to be chosen as the
+     * thread to be awakened; or
+     * <li>Some other thread invokes the {@link #signalAll} method for this
+     * {@code Condition}; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread, and interruption of thread suspension is supported; or
+     * <li>The specified deadline elapses; or
+     * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
+     * </ul>
+     *
+     * <p>In all cases, before this method can return the current thread must
+     * re-acquire the lock associated with this condition. When the
+     * thread returns it is <em>guaranteed</em> to hold this lock.
+     *
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * and interruption of thread suspension is supported,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current 
thread's
+     * interrupted status is cleared. It is not specified, in the first
+     * case, whether or not the test for interruption occurs before the lock
+     * is released.
+     *
+     *
+     * <p>The return value indicates whether the deadline has elapsed,
+     * which can be used as follows:
+     *  <pre> {@code
+     * boolean aMethod(Date deadline) {
+     *   boolean stillWaiting = true;
+     *   lock.lock();
+     *   try {
+     *     while (!conditionBeingWaitedFor()) {
+     *       if (!stillWaiting)
+     *         return false;
+     *       stillWaiting = theCondition.awaitUntil(deadline);
+     *     }
+     *     // ...
+     *   } finally {
+     *     lock.unlock();
+     *   }
+     * }}</pre>
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>The current thread is assumed to hold the lock associated with this
+     * {@code Condition} when this method is called.
+     * It is up to the implementation to determine if this is
+     * the case and if not, how to respond. Typically, an exception will be
+     * thrown (such as {@link IllegalMonitorStateException}) and the
+     * implementation must document that fact.
+     *
+     * <p>An implementation can favor responding to an interrupt over normal
+     * method return in response to a signal, or over indicating the passing
+     * of the specified deadline. In either case the implementation
+     * must ensure that the signal is redirected to another waiting thread, if
+     * there is one.
+     *
+     * @param deadline the absolute time to wait until
+     * @return {@code false} if the deadline has elapsed upon return, else
+     *         {@code true}
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     *         (and interruption of thread suspension is supported)
+     */
+    boolean awaitUntil(Date deadline) throws IgniteInterruptedException;
+
+    /**
+     * Wakes up one waiting thread.
+     *
+     * <p>If any threads are waiting on this condition then one
+     * is selected for waking up. That thread must then re-acquire the
+     * lock before returning from {@code await}.
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>An implementation may (and typically does) require that the
+     * current thread hold the lock associated with this {@code
+     * Condition} when this method is called. Implementations must
+     * document this precondition and any actions taken if the lock is
+     * not held. Typically, an exception such as {@link
+     * IllegalMonitorStateException} will be thrown.
+     */
+    void signal();
+
+    /**
+     * Wakes up all waiting threads.
+     *
+     * <p>If any threads are waiting on this condition then they are
+     * all woken up. Each thread must re-acquire the lock before it can
+     * return from {@code await}.
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>An implementation may (and typically does) require that the
+     * current thread hold the lock associated with this {@code
+     * Condition} when this method is called. Implementations must
+     * document this precondition and any actions taken if the lock is
+     * not held. Typically, an exception such as {@link
+     * IllegalMonitorStateException} will be thrown.
+     */
+    void signalAll();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/IgniteReentrantLock.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteReentrantLock.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteReentrantLock.java
new file mode 100644
index 0000000..d209b51
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteReentrantLock.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import java.io.Closeable;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * This interface provides a rich API for working with distributed reentrant 
lock.
+ * <p>
+ * <h1 class="header">Functionality</h1>
+ * Distributed reentrant lock provides functionality similar to {@code 
java.util.concurrent.locks.ReentrantLock}.
+ * <h1 class="header">Creating Distributed ReentrantLock</h1>
+ * Instance of cache reentrant lock can be created by calling the following 
method:
+ * {@link Ignite#reentrantLock(String, boolean, boolean)}.
+ */
+public interface IgniteReentrantLock extends Closeable {
+
+    /**
+     * Name of the reentrant lock.
+     *
+     * @return Name of the reentrant lock.
+     */
+    public String name();
+
+    /**
+     * Acquires the lock.
+     *
+     * <p>Acquires the lock if it is not held by another thread and returns
+     * immediately, setting the lock hold count to one.
+     *
+     * <p>If the current thread already holds the lock then the hold
+     * count is incremented by one and the method returns immediately.
+     *
+     * <p>If the lock is held by another thread then the
+     * current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until the lock has been acquired,
+     * at which time the lock hold count is set to one.
+     */
+    public void lock();
+
+    /**
+     * Acquires the lock unless the current thread is
+     * {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires the lock if it is not held by another thread and returns
+     * immediately, setting the lock hold count to one.
+     *
+     * <p>If the current thread already holds this lock then the hold count
+     * is incremented by one and the method returns immediately.
+     *
+     * <p>If the lock is held by another thread then the
+     * current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until one of two things happens:
+     *
+     * <ul>
+     *
+     * <li>The lock is acquired by the current thread; or
+     *
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread.
+     *
+     * </ul>
+     *
+     * <p>If the lock is acquired by the current thread then the lock hold
+     * count is set to one.
+     *
+     * <p>If the current thread:
+     *
+     * <ul>
+     *
+     * <li>has its interrupted status set on entry to this method; or
+     *
+     * <li>is {@linkplain Thread#interrupt interrupted} while acquiring
+     * the lock,
+     *
+     * </ul>
+     *
+     * then {@link IgniteInterruptedException} is thrown and the current 
thread's
+     * interrupted status is cleared.
+     *
+     * <p>In this implementation, as this method is an explicit
+     * interruption point, preference is given to responding to the
+     * interrupt over normal or reentrant acquisition of the lock.
+     *
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     */
+    public void lockInterruptibly() throws IgniteInterruptedException;
+
+    /**
+     * Acquires the lock only if it is not held by another thread at the time
+     * of invocation.
+     *
+     * <p>Acquires the lock if it is not held by another thread and
+     * returns immediately with the value {@code true}, setting the
+     * lock hold count to one.
+     *
+     * <p>If the current thread already holds this lock then the hold
+     * count is incremented by one and the method returns {@code true}.
+     *
+     * <p>If the lock is held by another thread then this method will return
+     * immediately with the value {@code false}.
+     *
+     * @return {@code true} if the lock was free and was acquired by the
+     *         current thread, or the lock was already held by the current
+     *         thread; and {@code false} otherwise
+     */
+    public boolean tryLock();
+
+    /**
+     * Acquires the lock if it is not held by another thread within the given
+     * waiting time and the current thread has not been
+     * {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires the lock if it is not held by another thread and returns
+     * immediately with the value {@code true}, setting the lock hold count
+     * to one.
+     *
+     * <p>If the current thread
+     * already holds this lock then the hold count is incremented by one and
+     * the method returns {@code true}.
+     *
+     * <p>If the lock is held by another thread then the
+     * current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until one of three things happens:
+     *
+     * <ul>
+     *
+     * <li>The lock is acquired by the current thread; or
+     *
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread; or
+     *
+     * <li>The specified waiting time elapses
+     *
+     * </ul>
+     *
+     * <p>If the lock is acquired then the value {@code true} is returned and
+     * the lock hold count is set to one.
+     *
+     * <p>If the current thread:
+     *
+     * <ul>
+     *
+     * <li>has its interrupted status set on entry to this method; or
+     *
+     * <li>is {@linkplain Thread#interrupt interrupted} while
+     * acquiring the lock,
+     *
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current 
thread's
+     * interrupted status is cleared.
+     *
+     * <p>If the specified waiting time elapses then the value {@code false}
+     * is returned.  If the time is less than or equal to zero, the method
+     * will not wait at all.
+     *
+     * <p>In this implementation, as this method is an explicit
+     * interruption point, preference is given to responding to the
+     * interrupt over normal or reentrant acquisition of the lock, and
+     * over reporting the elapse of the waiting time.
+     *
+     * @param timeout the time to wait for the lock
+     * @param unit the time unit of the timeout argument
+     * @return {@code true} if the lock was free and was acquired by the
+     *         current thread, or the lock was already held by the current
+     *         thread; and {@code false} if the waiting time elapsed before
+     *         the lock could be acquired
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     * @throws NullPointerException if the time unit is null
+     */
+    public boolean tryLock(long timeout, TimeUnit unit) throws 
IgniteInterruptedException;
+
+    /**
+     * Attempts to release this lock.
+     *
+     * <p>If the current thread is the holder of this lock then the hold
+     * count is decremented.  If the hold count is now zero then the lock
+     * is released.  If the current thread is not the holder of this
+     * lock then {@link IllegalMonitorStateException} is thrown.
+     *
+     * @throws IllegalMonitorStateException if the current thread does not
+     *         hold this lock
+     */
+    public void unlock();
+
+    /**
+     * Returns a {@link Condition} instance for use with this
+     * {@link Lock} instance.
+     *
+     * <p>The returned {@link Condition} instance supports the same
+     * usages as do the {@link Object} monitor methods ({@link
+     * Object#wait() wait}, {@link Object#notify notify}, and {@link
+     * Object#notifyAll notifyAll}) when used with the built-in
+     * monitor lock.
+     *
+     * <ul>
+     *
+     * <li>If this lock is not held when any of the {@link Condition}
+     * {@linkplain Condition#await() waiting} or {@linkplain
+     * Condition#signal signalling} methods are called, then an {@link
+     * IllegalMonitorStateException} is thrown.
+     *
+     * <li>When the condition {@linkplain Condition#await() waiting}
+     * methods are called the lock is released and, before they
+     * return, the lock is reacquired and the lock hold count restored
+     * to what it was when the method was called.
+     *
+     * <li>If a thread is {@linkplain Thread#interrupt interrupted}
+     * while waiting then the wait will terminate, an {@link
+     * IgniteInterruptedException} will be thrown, and the thread's
+     * interrupted status will be cleared.
+     *
+     * <li> Waiting threads are signalled in FIFO order.
+     *
+     * </ul>
+     *
+     * @return the Condition object
+     */
+    public IgniteCondition newCondition(String name);
+
+    /**
+     * Queries the number of holds on this lock by the current thread.
+     *
+     * <p>A thread has a hold on a lock for each lock action that is not
+     * matched by an unlock action.
+     *
+     * <p>The hold count information is typically only used for testing and
+     * debugging purposes. For example, if a certain section of code should
+     * not be entered with the lock already held then we can assert that
+     * fact:
+     *
+     *  <pre> {@code
+     * class X {
+     *   ReentrantLock lock = new ReentrantLock();
+     *   // ...
+     *   public void m() {
+     *     assert lock.getHoldCount() == 0;
+     *     lock.lock();
+     *     try {
+     *       // ... method body
+     *     } finally {
+     *       lock.unlock();
+     *     }
+     *   }
+     * }}</pre>
+     *
+     * @return the number of holds on this lock by the current thread,
+     *         or zero if this lock is not held by the current thread
+     */
+    public int getHoldCount();
+
+    /**
+     * Queries if this lock is held by the current thread.
+     *
+     * <p>Analogous to the {@link Thread#holdsLock(Object)} method for
+     * built-in monitor locks, this method is typically used for
+     * debugging and testing. For example, a method that should only be
+     * called while a lock is held can assert that this is the case:
+     *
+     *  <pre> {@code
+     * class X {
+     *   ReentrantLock lock = new ReentrantLock();
+     *   // ...
+     *
+     *   public void m() {
+     *       assert lock.isHeldByCurrentThread();
+     *       // ... method body
+     *   }
+     * }}</pre>
+     *
+     * <p>It can also be used to ensure that a reentrant lock is used
+     * in a non-reentrant manner, for example:
+     *
+     *  <pre> {@code
+     * class X {
+     *   ReentrantLock lock = new ReentrantLock();
+     *   // ...
+     *
+     *   public void m() {
+     *       assert !lock.isHeldByCurrentThread();
+     *       lock.lock();
+     *       try {
+     *           // ... method body
+     *       } finally {
+     *           lock.unlock();
+     *       }
+     *   }
+     * }}</pre>
+     *
+     * @return {@code true} if current thread holds this lock and
+     *         {@code false} otherwise
+     */
+    public boolean isHeldByCurrentThread();
+
+    /**
+     * Queries if this lock is held by any thread. This method is
+     * designed for use in monitoring of the system state,
+     * not for synchronization control.
+     *
+     * @return {@code true} if any thread holds this lock and
+     *         {@code false} otherwise
+     */
+    public boolean isLocked();
+
+    /**
+     * Queries whether any threads are waiting to acquire this lock. Note that
+     * because cancellations may occur at any time, a {@code true}
+     * return does not guarantee that any other thread will ever
+     * acquire this lock.  This method is designed primarily for use in
+     * monitoring of the system state.
+     *
+     * @return {@code true} if there may be other threads waiting to
+     *         acquire the lock
+     */
+    public boolean hasQueuedThreads();
+
+    /**
+     * Queries whether the given thread is waiting to acquire this
+     * lock. Note that because cancellations may occur at any time, a
+     * {@code true} return does not guarantee that this thread
+     * will ever acquire this lock.  This method is designed primarily for use
+     * in monitoring of the system state.
+     *
+     * @param thread the thread
+     * @return {@code true} if the given thread is queued waiting for this lock
+     * @throws NullPointerException if the thread is null
+     */
+    public boolean hasQueuedThread(Thread thread);
+
+    /**
+     * Returns an estimate of the number of nodes waiting to
+     * acquire this lock.  The value is only an estimate because the number of
+     * nodes may change dynamically while this method traverses
+     * internal data structures.  This method is designed for use in
+     * monitoring of the system state, not for synchronization
+     * control.
+     *
+     * @return the estimated number of nodes waiting for this lock
+     */
+    public int getQueueLength();
+
+    /**
+     * Queries whether any threads on this node are waiting on the given 
condition
+     * associated with this lock. Note that because timeouts and
+     * interrupts may occur at any time, a {@code true} return does
+     * not guarantee that a future {@code signal} will awaken any
+     * threads.  This method is designed primarily for use in
+     * monitoring of the system state.
+     *
+     * @param condition the condition
+     * @return {@code true} if there are any waiting threads on this node
+     * @throws IllegalMonitorStateException if this lock is not held
+     * @throws IllegalArgumentException if the given condition is
+     *         not associated with this lock
+     * @throws NullPointerException if the condition is null
+     */
+    public boolean hasWaiters(IgniteCondition condition);
+
+    /**
+     * Returns an estimate of the number of threads on this node that are waiting 
on the
+     * given condition associated with this lock. Note that because
+     * timeouts and interrupts may occur at any time, the estimate
+     * serves only as an upper bound on the actual number of waiters.
+     * This method is designed for use in monitoring of the system
+     * state, not for synchronization control.
+     *
+     * @param condition the condition
+     * @return the estimated number of waiting threads on this node
+     * @throws IgniteIllegalStateException if this lock is not held
+     * @throws IllegalArgumentException if the given condition is
+     *         not associated with this lock
+     * @throws NullPointerException if the condition is null
+     */
+    public int getWaitQueueLength(IgniteCondition condition);
+
+    /**
+     * Returns {@code true} if this lock is safe to use after node failure.
+     * If not, IgniteInterruptedException is thrown on every other node after 
node failure.
+     *
+     * @return {@code true} if this reentrant lock has failoverSafe set true
+     */
+    public boolean isFailoverSafe();
+
+    /**
+     * Returns true if any node that owned the lock failed before releasing 
the lock.
+     *
+     * @return true if any node failed while owning the lock.
+     */
+    public boolean isBroken();
+
+    /**
+     * Returns a string identifying this lock, as well as its lock state.
+     * The state, in brackets, includes either the String {@code "Unlocked"}
+     * or the String {@code "Locked by"} followed by the
+     * {@linkplain UUID} of the owning node and {@linkplain Thread#getName 
name}
+     * of the owning thread.
+     *
+     * @return a string identifying this lock, as well as its lock state
+     */
+    public String toString();
+
+    /**
+     * Gets status of reentrant lock.
+     *
+     * @return {@code true} if reentrant lock was removed from cache, {@code 
false} in other case.
+     */
+    public boolean removed();
+
+    /**
+     * Removes reentrant lock.
+     *
+     * @throws IgniteException If operation failed.
+     */
+    @Override public void close();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 5d8daf6..21dde23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -64,6 +64,7 @@ import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteReentrantLock;
 import org.apache.ignite.IgniteScheduler;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteServices;
@@ -2993,6 +2994,25 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteReentrantLock reentrantLock(
+        String name,
+        boolean failoverSafe,
+        boolean create
+    ) {
+        guard();
+
+        try {
+            return ctx.dataStructures().reentrantLock(name, failoverSafe, 
create);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public <T> IgniteQueue<T> queue(String name,
         int cap,
         CollectionConfiguration cfg)

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 98848ee..73aaddf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -45,6 +45,7 @@ import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteReentrantLock;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteSet;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -95,6 +96,7 @@ import static 
org.apache.ignite.internal.processors.datastructures.DataStructure
 import static 
org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH;
 import static 
org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE;
 import static 
org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE;
+import static 
org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.REENTRANT_LOCK;
 import static 
org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET;
 import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -135,6 +137,9 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
     /** Cache contains only {@code GridCacheSemaphoreState}. */
     private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> 
semView;
 
+    /** Cache contains only {@code GridCacheReentrantLockState}. */
+    private IgniteInternalCache<GridCacheInternalKey, 
GridCacheReentrantLockState> reentrantLockView;
+
     /** Cache contains only {@code GridCacheAtomicReferenceValue}. */
     private IgniteInternalCache<GridCacheInternalKey, 
GridCacheAtomicReferenceValue> atomicRefView;
 
@@ -177,7 +182,7 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
         ctx.event().addLocalEventListener(
             new GridLocalEventListener() {
                 @Override public void onEvent(final Event evt) {
-                    // This may require cache operation to exectue,
+                    // This may require cache operation to execute,
                     // therefore cannot use event notification thread.
                     ctx.closure().callLocalSafe(
                         new Callable<Object>() {
@@ -189,6 +194,8 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
                                 for (GridCacheRemovable ds : dsMap.values()) {
                                     if (ds instanceof GridCacheSemaphoreEx)
                                         
((GridCacheSemaphoreEx)ds).onNodeRemoved(leftNodeId);
+                                    else if(ds instanceof 
GridCacheReentrantLockEx)
+                                        
((GridCacheReentrantLockEx)ds).onNodeRemoved(leftNodeId);
                                 }
 
                                 return null;
@@ -224,6 +231,8 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
             semView = atomicsCache;
 
+            reentrantLockView = atomicsCache;
+
             atomicLongView = atomicsCache;
 
             atomicRefView = atomicsCache;
@@ -1183,12 +1192,12 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
                 try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
                     // Check correctness type of removable object.
                     GridCacheCountDownLatchValue val =
-                            cast(dsView.get(key), 
GridCacheCountDownLatchValue.class);
+                        cast(dsView.get(key), 
GridCacheCountDownLatchValue.class);
 
                     if (val != null) {
                         if (val.get() > 0) {
                             throw new IgniteCheckedException("Failed to remove 
count down latch " +
-                                    "with non-zero count: " + val.get());
+                                "with non-zero count: " + val.get());
                         }
 
                         dsView.remove(key);
@@ -1326,6 +1335,123 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Gets or creates reentrant lock. If reentrant lock is not found in cache,
+     * it is created using the provided name and {@code failoverSafe} flag.
+     *
+     * @param name Name of the reentrant lock.
+     * @param create If {@code true} reentrant lock will be created in case it 
is not in cache.
+     * @return ReentrantLock for the given name or {@code null} if it is not 
found and
+     *      {@code create} is false.
+     * @throws IgniteCheckedException If operation failed.
+     */
+    public IgniteReentrantLock reentrantLock(final String name, final boolean 
failoverSafe, final boolean create)
+        throws IgniteCheckedException {
+        A.notNull(name, "name");
+
+        awaitInitialization();
+
+        checkAtomicsConfiguration();
+
+        startQuery();
+
+        return getAtomic(new IgniteOutClosureX<IgniteReentrantLock>() {
+            @Override public IgniteReentrantLock applyx() throws 
IgniteCheckedException {
+                GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheReentrantLockState val = cast(dsView.get(key), 
GridCacheReentrantLockState.class);
+
+                    // Check that reentrant lock hasn't been created in other 
thread yet.
+                    GridCacheReentrantLockEx reentrantLock = 
cast(dsMap.get(key), GridCacheReentrantLockEx.class);
+
+                    if (reentrantLock != null) {
+                        assert val != null;
+
+                        return reentrantLock;
+                    }
+
+                    if (val == null && !create)
+                        return null;
+
+                    if (val == null) {
+                        val = new GridCacheReentrantLockState(0, 
dsCacheCtx.nodeId(), 0, failoverSafe);
+
+                        dsView.put(key, val);
+                    }
+
+                    GridCacheReentrantLockEx reentrantLock0 = new 
GridCacheReentrantLockImpl(
+                        name,
+                        key,
+                        reentrantLockView,
+                        dsCacheCtx);
+
+                    dsMap.put(key, reentrantLock0);
+
+                    tx.commit();
+
+                    return reentrantLock0;
+                }
+                catch (Error | Exception e) {
+                    dsMap.remove(key);
+
+                    U.error(log, "Failed to create reentrant lock: " + name, 
e);
+
+                    throw e;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, new DataStructureInfo(name, REENTRANT_LOCK, null), create, 
GridCacheReentrantLockEx.class);
+    }
+
+    /**
+     * Removes reentrant lock from cache.
+     *
+     * @param name Name of the reentrant lock.
+     * @param broken Flag indicating the reentrant lock is broken and should 
be removed unconditionally.
+     * @throws IgniteCheckedException If operation failed.
+     */
+    public void removeReentrantLock(final String name, boolean broken) throws 
IgniteCheckedException {
+        assert name != null;
+        assert dsCacheCtx != null;
+
+        awaitInitialization();
+
+        removeDataStructure(new IgniteOutClosureX<Void>() {
+            @Override public Void applyx() throws IgniteCheckedException {
+                GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    // Check correctness type of removable object.
+                    GridCacheReentrantLockState val = cast(dsView.get(key), 
GridCacheReentrantLockState.class);
+
+                    if (val != null) {
+                        if (val.get() > 0 && !broken)
+                            throw new IgniteCheckedException("Failed to remove 
reentrant lock with blocked threads. ");
+
+                        dsView.remove(key);
+
+                        tx.commit();
+                    }
+                    else
+                        tx.setRollbackOnly();
+
+                    return null;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, name, REENTRANT_LOCK, null);
+    }
+
+
+    /**
      * Remove internal entry by key from cache.
      *
      * @param key Internal entry key.
@@ -1373,7 +1499,8 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
         @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws 
CacheEntryListenerException {
             if (evt.getEventType() == EventType.CREATED || evt.getEventType() 
== EventType.UPDATED)
                 return evt.getValue() instanceof GridCacheCountDownLatchValue 
||
-                    evt.getValue() instanceof GridCacheSemaphoreState;
+                    evt.getValue() instanceof GridCacheSemaphoreState ||
+                    evt.getValue() instanceof GridCacheReentrantLockState;
             else {
                 assert evt.getEventType() == EventType.REMOVED : evt;
 
@@ -1470,6 +1597,25 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
                                     ", actual=" + sem.getClass() + ", value=" 
+ sem + ']');
                         }
                     }
+                    else if (val0 instanceof GridCacheReentrantLockState) {
+                        GridCacheInternalKey key = evt.getKey();
+
+                        // Notify reentrant lock on changes.
+                        final GridCacheRemovable reentrantLock = 
dsMap.get(key);
+
+                        GridCacheReentrantLockState val = 
(GridCacheReentrantLockState)val0;
+
+                        if (reentrantLock instanceof GridCacheReentrantLockEx) 
{
+                            final GridCacheReentrantLockEx lock0 = 
(GridCacheReentrantLockEx)reentrantLock;
+
+                            lock0.onUpdate(val);
+                        }
+                        else if (reentrantLock != null) {
+                            U.error(log, "Failed to cast object " +
+                                "[expected=" + 
IgniteReentrantLock.class.getSimpleName() +
+                                ", actual=" + reentrantLock.getClass() + ", 
value=" + reentrantLock + ']');
+                        }
+                    }
 
                 }
                 else {
@@ -1688,7 +1834,10 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
         SET(IgniteSet.class.getSimpleName()),
 
         /** */
-        SEMAPHORE(IgniteSemaphore.class.getSimpleName());
+        SEMAPHORE(IgniteSemaphore.class.getSimpleName()),
+
+        /** */
+        REENTRANT_LOCK(IgniteReentrantLock.class.getSimpleName());
 
         /** */
         private static final DataStructureType[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockEx.java
new file mode 100644
index 0000000..4a5238b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockEx.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteReentrantLock;
+
+/**
+ * Grid cache reentrant lock ({@code 'Ex'} stands for external).
+ */
+public interface GridCacheReentrantLockEx extends IgniteReentrantLock, 
GridCacheRemovable {
+    /**
+     * Get current reentrant lock key.
+     *
+     * @return Lock key.
+     */
+    public GridCacheInternalKey key();
+
+    /**
+     * Callback to notify reentrant lock on changes.
+     *
+     * @param state New reentrant lock state.
+     */
+    public void onUpdate(GridCacheReentrantLockState state);
+
+    /**
+     * Callback to notify reentrant lock on topology changes.
+     *
+     * @param nodeId Id of the node that left the grid.
+     */
+    public void onNodeRemoved(UUID nodeId);
+}

Reply via email to