IGNITE-642 Implement IgniteReentrantLock data structure
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f8bf93a7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f8bf93a7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f8bf93a7 Branch: refs/heads/ignite-2788 Commit: f8bf93a79d74231f720972ba7fbaf113d0060e2d Parents: 0f6c97e Author: Vladisav Jelisavcic <[email protected]> Authored: Wed Apr 27 18:31:26 2016 +0300 Committer: shtykh_roman <[email protected]> Committed: Fri May 13 16:11:16 2016 +0900 ---------------------------------------------------------------------- .../datastructures/IgniteLockExample.java | 293 ++++ .../ignite/examples/CacheExamplesSelfTest.java | 8 + .../src/main/java/org/apache/ignite/Ignite.java | 17 + .../java/org/apache/ignite/IgniteCondition.java | 338 ++++ .../main/java/org/apache/ignite/IgniteLock.java | 489 ++++++ .../ignite/internal/GridKernalContextImpl.java | 4 +- .../apache/ignite/internal/IgniteKernal.java | 21 + .../datastructures/DataStructuresProcessor.java | 158 +- .../datastructures/GridCacheLockEx.java | 52 + .../datastructures/GridCacheLockImpl.java | 1538 +++++++++++++++++ .../datastructures/GridCacheLockState.java | 353 ++++ .../datastructures/GridCacheSemaphoreImpl.java | 33 + .../resources/META-INF/classnames.properties | 2 + .../IgniteClientReconnectAtomicsTest.java | 58 + ...eAbstractDataStructuresFailoverSelfTest.java | 208 ++- .../IgniteClientDataStructuresAbstractTest.java | 70 + .../IgniteDataStructureUniqueNameTest.java | 13 +- .../IgniteLockAbstractSelfTest.java | 1629 ++++++++++++++++++ .../IgniteSemaphoreAbstractSelfTest.java | 31 + .../local/IgniteLocalLockSelfTest.java | 110 ++ .../IgnitePartitionedLockSelfTest.java | 33 + .../IgniteReplicatedLockSelfTest.java | 33 + .../distributed/GridCacheLockAbstractTest.java | 2 +- .../cache/GridCacheDataStructuresLoadTest.java | 53 + .../ignite/testframework/junits/IgniteMock.java | 10 + .../junits/multijvm/IgniteProcessProxy.java | 7 + 
.../IgniteCacheDataStructuresSelfTestSuite.java | 6 + .../org/apache/ignite/IgniteSpringBean.java | 11 + 28 files changed, 5570 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java new file mode 100644 index 0000000..1f84787 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.examples.datastructures; + +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCondition; +import org.apache.ignite.IgniteLock; +import org.apache.ignite.Ignition; +import org.apache.ignite.examples.ExampleNodeStartup; +import org.apache.ignite.lang.IgniteRunnable; + +/** + * This example demonstrates cache based reentrant lock. + * <p> + * Remote nodes should always be started with special configuration + * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code + * examples/config/example-ignite.xml} configuration. + */ +public class IgniteLockExample { + /** Number of items for each producer/consumer to produce/consume. */ + private static final int OPS_COUNT = 100; + + /** Number of producers. */ + private static final int NUM_PRODUCERS = 5; + + /** Number of consumers. */ + private static final int NUM_CONSUMERS = 5; + + /** Cache name. */ + private static final String CACHE_NAME = "cache"; + + /** Name of the global resource. */ + private static final String QUEUE_ID = "queue"; + + /** Name of the synchronization variable. */ + private static final String SYNC_NAME = "done"; + + /** Name of the condition object. */ + private static final String NOT_FULL = "notFull"; + + /** Name of the condition object. */ + private static final String NOT_EMPTY = "notEmpty"; + + /** + * Executes example. + * + * @param args Command line arguments, none required. + */ + public static void main(String[] args) { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Cache atomic reentrant lock example started."); + + // Make name of reentrant lock. + final String reentrantLockName = UUID.randomUUID().toString(); + + // Initialize lock. 
+ IgniteLock lock = ignite.reentrantLock(reentrantLockName, true, false, true); + + // Init distributed cache. + IgniteCache<String, Integer> cache = ignite.getOrCreateCache(CACHE_NAME); + + // Init shared variable. + cache.put(QUEUE_ID, 0); + + // Shared variable indicating number of jobs left to be completed. + cache.put(SYNC_NAME, NUM_PRODUCERS + NUM_CONSUMERS); + + // Start consumers on all cluster nodes. + for (int i = 0; i < NUM_CONSUMERS; i++) + ignite.compute().withAsync().run(new Consumer(reentrantLockName)); + + // Start producers on all cluster nodes. + for (int i = 0; i < NUM_PRODUCERS; i++) + ignite.compute().withAsync().run(new Producer(reentrantLockName)); + + System.out.println("Master node is waiting for all other nodes to finish..."); + + // Wait for everyone to finish. + try { + lock.lock(); + + IgniteCondition notDone = lock.getOrCreateCondition(SYNC_NAME); + + int count = cache.get(SYNC_NAME); + + while(count > 0) { + notDone.await(); + + count = cache.get(SYNC_NAME); + } + } + finally { + lock.unlock(); + } + } + + System.out.flush(); + System.out.println(); + System.out.println("Finished reentrant lock example..."); + System.out.println("Check all nodes for output (this node is also part of the cluster)."); + } + + /** + * Base closure which holds the name of the reentrant lock. + */ + private abstract static class ReentrantLockExampleClosure implements IgniteRunnable { + /** Reentrant lock name. */ + protected final String reentrantLockName; + + /** + * @param reentrantLockName Reentrant lock name. + */ + ReentrantLockExampleClosure(String reentrantLockName) { + this.reentrantLockName = reentrantLockName; + } + } + + /** + * Closure which simulates producer. + */ + private static class Producer extends ReentrantLockExampleClosure { + /** + * @param reentrantLockName Reentrant lock name. 
+ */ + public Producer(String reentrantLockName) { + super(reentrantLockName); + } + + /** {@inheritDoc} */ + @Override public void run() { + System.out.println("Producer started. "); + + IgniteLock lock = Ignition.ignite().reentrantLock(reentrantLockName, true, false, true); + + // Condition to wait on when queue is full. + IgniteCondition notFull = lock.getOrCreateCondition(NOT_FULL); + + // Condition signaled when queue is no longer empty. + IgniteCondition notEmpty = lock.getOrCreateCondition(NOT_EMPTY); + + // Signaled when job is done. + IgniteCondition done = lock.getOrCreateCondition(SYNC_NAME); + + IgniteCache<String, Integer> cache = Ignition.ignite().cache(CACHE_NAME); + + for (int i = 0; i < OPS_COUNT; i++) { + try { + lock.lock(); + + int val = cache.get(QUEUE_ID); + + while(val >= 100){ + System.out.println("Queue is full. Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + + " paused."); + + notFull.await(); + + val = cache.get(QUEUE_ID); + } + + val++; + + System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + + ", available=" + val + ']'); + + cache.put(QUEUE_ID, val); + + notEmpty.signalAll(); + } + finally { + lock.unlock(); + } + } + + System.out.println("Producer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']'); + + try { + lock.lock(); + + int count = cache.get(SYNC_NAME); + + count--; + + cache.put(SYNC_NAME, count); + + // Signals the master thread. + done.signal(); + } + finally { + lock.unlock(); + } + } + } + + /** + * Closure which simulates consumer. + */ + private static class Consumer extends ReentrantLockExampleClosure { + /** + * @param reentrantLockName Reentrant lock name. + */ + public Consumer(String reentrantLockName) { + super(reentrantLockName); + } + + /** {@inheritDoc} */ + @Override public void run() { + System.out.println("Consumer started. 
"); + + Ignite g = Ignition.ignite(); + + IgniteLock lock = g.reentrantLock(reentrantLockName, true, false, true); + + // Condition signaled when queue is no longer full. + IgniteCondition notFull = lock.getOrCreateCondition(NOT_FULL); + + // Condition to wait on when queue is empty. + IgniteCondition notEmpty = lock.getOrCreateCondition(NOT_EMPTY); + + // Signaled when job is done. + IgniteCondition done = lock.getOrCreateCondition(SYNC_NAME); + + IgniteCache<String, Integer> cache = g.cache(CACHE_NAME); + + for (int i = 0; i < OPS_COUNT; i++) { + try { + lock.lock(); + + int val = cache.get(QUEUE_ID); + + while (val <= 0) { + System.out.println("Queue is empty. Consumer [nodeId=" + + Ignition.ignite().cluster().localNode().id() + " paused."); + + notEmpty.await(); + + val = cache.get(QUEUE_ID); + } + + val--; + + System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + + ", available=" + val + ']'); + + cache.put(QUEUE_ID, val); + + notFull.signalAll(); + } + finally { + lock.unlock(); + } + } + + System.out.println("Consumer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']'); + + try { + lock.lock(); + + int count = cache.get(SYNC_NAME); + + count--; + + cache.put(SYNC_NAME, count); + + // Signals the master thread. 
+ done.signal(); + } + finally { + lock.unlock(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java index 541291b..43b05b5 100644 --- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java +++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java @@ -33,6 +33,7 @@ import org.apache.ignite.examples.datastructures.IgniteAtomicSequenceExample; import org.apache.ignite.examples.datastructures.IgniteAtomicStampedExample; import org.apache.ignite.examples.datastructures.IgniteCountDownLatchExample; import org.apache.ignite.examples.datastructures.IgniteQueueExample; +import org.apache.ignite.examples.datastructures.IgniteLockExample; import org.apache.ignite.examples.datastructures.IgniteSemaphoreExample; import org.apache.ignite.examples.datastructures.IgniteSetExample; import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest; @@ -100,6 +101,13 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest { /** * @throws Exception If failed. */ + public void testCacheLockExample() throws Exception { + IgniteLockExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. 
+ */ public void testCacheQueueExample() throws Exception { IgniteQueueExample.main(EMPTY_ARGS); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index 5703744..b62672e 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -466,6 +466,23 @@ public interface Ignite extends AutoCloseable { throws IgniteException; /** + * Gets or creates reentrant lock. If reentrant lock is not found in cache and {@code create} flag + * is {@code true}, it is created using provided name. + * + * @param name Name of the lock. + * @param failoverSafe {@code True} to create failover safe lock which means that + * if any node leaves topology, all locks already acquired by that node are silently released + * and become available for other nodes to acquire. If flag is {@code false} then + * all threads on other nodes waiting to acquire lock are interrupted. + * @param fair If {@code True}, fair lock will be created. + * @param create Boolean flag indicating whether data structure should be created if it does not exist. + * @return Reentrant lock for the given name. + * @throws IgniteException If reentrant lock could not be fetched or created. + */ + public IgniteLock reentrantLock(String name, boolean failoverSafe, boolean fair, boolean create) + throws IgniteException; + + /** * Will get a named queue from cache and create one if it has not been created yet and {@code cfg} is not * {@code null}. * If queue is present already, queue properties will not be changed. 
Use http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java new file mode 100644 index 0000000..020f23a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite; + +import java.util.Date; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; + +/** + * This interface provides a rich API for working with condition objects + * associated with distributed reentrant locks. + * <p> + * <h1 class="header">Functionality</h1> + * IgniteCondition provides functionality similar to {@code java.util.concurrent.locks.Condition}. + */ +public interface IgniteCondition extends Condition { + /** + * Name of ignite condition. + * + * @return Name of ignite condition. 
+ */ + public String name(); + + /** + * Causes the current thread to wait until it is signalled or + * {@linkplain Thread#interrupt interrupted}. + * + * <p>The lock associated with this {@code IgniteCondition} is atomically + * released and the current thread becomes disabled for thread scheduling + * purposes and lies dormant until <em>one</em> of six things happens: + * <ul> + * <li>Some other thread (on any node) invokes the {@link #signal} method for this + * {@code Condition} and the current thread happens to be chosen as the + * thread to be awakened; or + * <li>Some other thread (on any node) invokes the {@link #signalAll} method for this + * {@code Condition}; or + * <li>Some other thread {@linkplain Thread#interrupt interrupts} the + * current thread, and interruption of thread suspension is supported; or + * <li>Some other node in grid fails, and lock is created in non-failoverSafe mode; or + * <li>Local node is stopped; or + * <li>A "<em>spurious wakeup</em>" occurs. + * </ul> + * + * <p>If lock is not broken (because of failure of lock owner node) + * in non-failoverSafe mode and local node is alive, + * before this method can return the current thread must + * re-acquire the lock associated with this condition. In all other cases + * when the thread returns it is <em>guaranteed</em> to hold this lock. + * + * <p>If the current thread: + * <ul> + * <li>has its interrupted status set on entry to this method; or + * <li>is {@linkplain Thread#interrupt interrupted} while waiting + * and interruption of thread suspension is supported, + * </ul> + * then {@link IgniteInterruptedException} is thrown and the current thread's + * interrupted status is cleared. It is not specified, in the first + * case, whether or not the test for interruption occurs before the lock + * is released. + * + * <p><b>Implementation Considerations</b> + * + * <p>The current thread is assumed to hold the lock associated with this + * {@code Condition} when this method is called. 
If not, an {@link IllegalMonitorStateException} + * will be thrown. + * + * @throws IgniteInterruptedException if the current thread is interrupted + * @throws IgniteException if the node stopped, or + * node owning the lock failed in non-failoversafe mode + */ + @Override void await() throws IgniteInterruptedException, IgniteException; + + /** + * Causes the current thread to wait until it is signalled. + * + * <p>The lock associated with this condition is atomically + * released and the current thread becomes disabled for thread scheduling + * purposes and lies dormant until <em>one</em> of five things happens: + * <ul> + * <li>Some other thread invokes the {@link #signal} method for this + * {@code Condition} and the current thread happens to be chosen as the + * thread to be awakened; or + * <li>Some other thread invokes the {@link #signalAll} method for this + * {@code Condition}; or + * <li>Some other node in grid fails, and lock is created in non-failoverSafe mode; or + * <li>Local node is stopped; or + * <li>A "<em>spurious wakeup</em>" occurs. + * </ul> + * + * <p>If lock is not broken (because of failure of lock owner node) + * in non-failoverSafe mode and local node is alive, + * before this method can return the current thread must + * re-acquire the lock associated with this condition. In all other cases, + * when the thread returns it is <em>guaranteed</em> to hold this lock. + * + * <p>If the current thread's interrupted status is set when it enters + * this method, or it is {@linkplain Thread#interrupt interrupted} + * while waiting, it will continue to wait until signalled. When it finally + * returns from this method its interrupted status will still + * be set. + * + * <p><b>Implementation Considerations</b> + * + * <p>The current thread is assumed to hold the lock associated with this + * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException} + * will be thrown. 
+ * + * @throws IgniteException if the node stopped, or + * node owning the lock failed in non-failoversafe mode + */ + @Override void awaitUninterruptibly() throws IgniteException; + + /** + * Causes the current thread to wait until it is signalled or interrupted, + * or the specified waiting time elapses. + * + * <p>The lock associated with this condition is atomically + * released and the current thread becomes disabled for thread scheduling + * purposes and lies dormant until <em>one</em> of seven things happens: + * <ul> + * <li>Some other thread invokes the {@link #signal} method for this + * {@code Condition} and the current thread happens to be chosen as the + * thread to be awakened; or + * <li>Some other thread invokes the {@link #signalAll} method for this + * {@code Condition}; or + * <li>Some other thread {@linkplain Thread#interrupt interrupts} the + * current thread, and interruption of thread suspension is supported; or + * <li>The specified waiting time elapses; or + * <li>Some other node in grid fails, and lock is created in non-failoverSafe mode; or + * <li>Local node is stopped; or + * <li>A "<em>spurious wakeup</em>" occurs. + * </ul> + * + * <p>If lock is not broken (because of failure of lock owner node) + * in non-failoverSafe mode and local node is alive, + * before this method can return the current thread must + * re-acquire the lock associated with this condition. When the + * thread returns it is <em>guaranteed</em> to hold this lock. + * + * <p>If the current thread: + * <ul> + * <li>has its interrupted status set on entry to this method; or + * <li>is {@linkplain Thread#interrupt interrupted} while waiting + * and interruption of thread suspension is supported, + * </ul> + * then {@link IgniteInterruptedException} is thrown and the current thread's + * interrupted status is cleared. It is not specified, in the first + * case, whether or not the test for interruption occurs before the lock + * is released. 
+ * + * <p>The method returns an estimate of the number of nanoseconds + * remaining to wait given the supplied {@code nanosTimeout} + * value upon return, or a value less than or equal to zero if it + * timed out. This value can be used to determine whether and how + * long to re-wait in cases where the wait returns but an awaited + * condition still does not hold. Typical uses of this method take + * the following form: + * + * <pre> {@code + * boolean aMethod(long timeout, TimeUnit unit) { + * long nanos = unit.toNanos(timeout); + * lock.lock(); + * try { + * while (!conditionBeingWaitedFor()) { + * if (nanos <= 0L) + * return false; + * nanos = theCondition.awaitNanos(nanos); + * } + * // ... + * } finally { + * lock.unlock(); + * } + * }}</pre> + * + * <p>Design note: This method requires a nanosecond argument so + * as to avoid truncation errors in reporting remaining times. + * Such precision loss would make it difficult for programmers to + * ensure that total waiting times are not systematically shorter + * than specified when re-waits occur. + * + * <p><b>Implementation Considerations</b> + * + * <p>The current thread is assumed to hold the lock associated with this + * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException} + * will be thrown. + * + * @param nanosTimeout the maximum time to wait, in nanoseconds + * @return an estimate of the {@code nanosTimeout} value minus + * the time spent waiting upon return from this method. + * A positive value may be used as the argument to a + * subsequent call to this method to finish waiting out + * the desired time. A value less than or equal to zero + * indicates that no time remains. 
+ * @throws IgniteInterruptedException if the current thread is interrupted + * @throws IgniteException if the node stopped, or + * node owning the lock failed in non-failoversafe mode + */ + @Override long awaitNanos(long nanosTimeout) throws IgniteInterruptedException, IgniteException; + + /** + * Causes the current thread to wait until it is signalled or interrupted, + * or the specified waiting time elapses. This method is behaviorally + * equivalent to: + * <pre> {@code awaitNanos(unit.toNanos(time)) > 0}</pre> + * + * @param time the maximum time to wait + * @param unit the time unit of the {@code time} argument + * @return {@code false} if the waiting time detectably elapsed + * before return from the method, else {@code true} + * @throws IgniteInterruptedException if the current thread is interrupted + * @throws IgniteException if the node stopped, or + * node owning the lock failed in non-failoversafe mode + */ + @Override boolean await(long time, TimeUnit unit) throws IgniteInterruptedException, IgniteException; + + /** + * Causes the current thread to wait until it is signalled or interrupted, + * or the specified deadline elapses. 
+ * + * <p>The lock associated with this condition is atomically + * released and the current thread becomes disabled for thread scheduling + * purposes and lies dormant until <em>one</em> of seven things happens: + * <ul> + * <li>Some other thread invokes the {@link #signal} method for this + * {@code Condition} and the current thread happens to be chosen as the + * thread to be awakened; or + * <li>Some other thread invokes the {@link #signalAll} method for this + * {@code Condition}; or + * <li>Some other thread {@linkplain Thread#interrupt interrupts} the + * current thread, and interruption of thread suspension is supported; or + * <li>Some other node in grid fails, and lock is created in non-failoverSafe mode; or + * <li>Local node is stopped; or + * <li>The specified deadline elapses; or + * <li>A "<em>spurious wakeup</em>" occurs. + * </ul> + * + * <p>If lock is not broken (because of failure of lock owner node) + * in non-failoverSafe mode and local node is alive, + * before this method can return the current thread must + * re-acquire the lock associated with this condition. When the + * thread returns it is <em>guaranteed</em> to hold this lock. + * + * <p>If the current thread: + * <ul> + * <li>has its interrupted status set on entry to this method; or + * <li>is {@linkplain Thread#interrupt interrupted} while waiting + * and interruption of thread suspension is supported, + * </ul> + * then {@link IgniteInterruptedException} is thrown and the current thread's + * interrupted status is cleared. It is not specified, in the first + * case, whether or not the test for interruption occurs before the lock + * is released. 
+ * + * <p>The return value indicates whether the deadline has elapsed, + * which can be used as follows: + * <pre> {@code + * boolean aMethod(Date deadline) { + * boolean stillWaiting = true; + * lock.lock(); + * try { + * while (!conditionBeingWaitedFor()) { + * if (!stillWaiting) + * return false; + * stillWaiting = theCondition.awaitUntil(deadline); + * } + * // ... + * } finally { + * lock.unlock(); + * } + * }}</pre> + * + * <p><b>Implementation Considerations</b> + * + * <p>The current thread is assumed to hold the lock associated with this + * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException} + * will be thrown. + * + * @param deadline the absolute time to wait until + * @return {@code false} if the deadline has elapsed upon return, else + * {@code true} + * @throws IgniteInterruptedException if the current thread is interrupted + * (and interruption of thread suspension is supported) + * @throws IgniteException if the node stopped, or + * node owning the lock failed in non-failoversafe mode + */ + @Override boolean awaitUntil(Date deadline) throws IgniteInterruptedException, IgniteException; + + /** + * Wakes up one waiting thread. + * + * <p>If any threads are waiting on this condition then one + * is selected for waking up. That thread must then re-acquire the + * lock before returning from {@code await}. + * + * <p><b>Implementation Considerations</b> + * + * <p>The current thread is assumed to hold the lock associated with this + * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException} + * will be thrown. + * + * @throws IgniteException if node is stopped or + * node owning the lock failed in non-failoversafe mode + */ + @Override void signal() throws IgniteException; + + /** + * Wakes up all waiting threads. + * + * <p>If any threads are waiting on this condition then they are + * all woken up. 
Each thread must re-acquire the lock before it can + * return from {@code await}. + * + * <p><b>Implementation Considerations</b> + * + * <p>The current thread is assumed to hold the lock associated with this + * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException} + * will be thrown. + * + * @throws IgniteException if node is stopped or + * node owning the lock failed in non-failoversafe mode + */ + @Override void signalAll() throws IgniteException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/java/org/apache/ignite/IgniteLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteLock.java b/modules/core/src/main/java/org/apache/ignite/IgniteLock.java new file mode 100644 index 0000000..2b3ad3d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/IgniteLock.java @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite; + +import java.io.Closeable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +/** + * This interface provides a rich API for working with distributed reentrant locks. + * <p> + * <h1 class="header">Functionality</h1> + * Distributed reentrant lock provides functionality similar to {@code java.util.concurrent.locks.ReentrantLock}. + * <h1 class="header">Creating Distributed ReentrantLock</h1> + * Instance of cache reentrant lock can be created by calling the following method: + * {@link Ignite#reentrantLock(String, boolean, boolean, boolean)}. + * <h1 class="header">Protection from failover</h1> + * Ignite lock can automatically recover from node failure. + * <ul> + * <li>If failoverSafe flag is set to true upon creation, + * in case a node owning the lock fails, lock will be automatically released and become available for threads on other + * nodes to acquire. No exception will be thrown. + * <li>If failoverSafe flag is set to false upon creation, + * in case a node owning the lock fails, {@code IgniteException} will be thrown on every other node attempting to + * perform any operation on this lock. No automatic recovery will be attempted, + * and lock will be marked as broken (i.e. unusable), which can be checked using the method #isBroken(). + * Broken lock cannot be reused again. + * </ul> + * + * <h1 class="header">Implementation issues</h1> + * Ignite lock comes in two flavours: fair and non-fair. Non-fair lock assumes no ordering should be imposed + * on acquiring threads; in case of contention, threads from all nodes compete for the lock once the lock is released. + * In most cases this is the desired behaviour. However, in some cases, using the non-fair lock can lead to uneven load + * distribution among nodes. + * Fair lock solves this issue by imposing strict FIFO ordering policy at a cost of an additional transaction. 
+ * This ordering does not guarantee fairness of thread scheduling (similar to {@code java.util.concurrent.locks.ReentrantLock}). + * Thus, one of many threads on any node using a fair lock may obtain it multiple times in succession while other + * active threads are not progressing and not currently holding the lock. Also note that the untimed tryLock method + * does not honor the fairness setting. It will succeed if the lock is available even if other threads are waiting. + * + * </p> + * As a rule of thumb, whenever there is a reasonable time window between successive calls to release and acquire + * the lock, non-fair lock should be preferred: + * + * <pre> {@code + * while(someCondition){ + * // do anything + * lock.lock(); + * try{ + * // ... + * } + * finally { + * lock.unlock(); + * } + * } + * }</pre> + * + * If successive calls to release/acquire follow each other immediately, + * e.g. + * + * <pre> {@code + * while(someCondition){ + * lock.lock(); + * try { + * // do something + * } + * finally { + * lock.unlock(); + * } + * } + * }</pre> + * + * using the fair lock is reasonable in order to allow even distribution of load among nodes + * (although overall throughput may be lower due to increased overhead). + * + */ +public interface IgniteLock extends Lock, Closeable { + /** + * Name of atomic reentrant lock. + * + * @return Name of atomic reentrant lock. + */ + public String name(); + + /** + * Acquires the distributed reentrant lock. + * + * <p>Acquires the lock if it is not held by another thread and returns + * immediately, setting the lock hold count to one. + * + * <p>If the current thread already holds this lock then the hold count + * is incremented by one and the method returns immediately. 
+ * + * <p>If the lock is held by another thread then the + * current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of three things happens: + * + * <ul> + * + * <li>The lock is acquired by the current thread; or + * + * <li>Lock is broken (any node failed while owning this lock), and lock is created in + * non-failoverSafe mode. + * + * <li>Local node is stopped. + * </ul> + * @throws IgniteException if the local node is stopped, or the lock is broken in non-failoverSafe mode + */ + void lock() throws IgniteException; + + /** + * Acquires the lock unless the current thread is + * {@linkplain Thread#interrupt interrupted}. + * + * <p>Acquires the lock if it is not held by another thread and returns + * immediately, setting the lock hold count to one. + * + * <p>If the current thread already holds this lock then the hold count + * is incremented by one and the method returns immediately. + * + * <p>If the lock is held by another thread then the + * current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of four things happens: + * + * <ul> + * + * <li>The lock is acquired by the current thread; or + * + * <li>Some other thread {@linkplain Thread#interrupt interrupts} the + * current thread. + * + * <li>Lock is broken (any node failed while owning this lock), and lock is created in + * non-failoverSafe mode. + * + * <li>Local node is stopped. + * + * </ul> + * + * <p>If the lock is acquired by the current thread then the lock hold + * count is set to one. + * + * <p>If the current thread: + * + * <ul> + * + * <li>has its interrupted status set on entry to this method; or + * + * <li>is {@linkplain Thread#interrupt interrupted} while acquiring + * the lock; or + * + * then {@link IgniteInterruptedException} is thrown and the current thread's + * interrupted status is cleared.
+ * + * </ul> + * + * <p>{@link IgniteException} is thrown in case: + * + * <ul> + * + * <li>the lock is broken before or during the attempt to acquire this lock; or + * + * <li>local node is stopped, + * + * </ul> + * + * <p>In this implementation, as this method is an explicit + * interruption point, preference is given to responding to the + * interrupt over normal or reentrant acquisition of the lock. + * + * @throws IgniteInterruptedException if the current thread is interrupted + * @throws IgniteException if the lock is broken in non-failoverSafe mode (any node failed while owning this lock), + * or local node is stopped + */ + @Override public void lockInterruptibly() throws IgniteInterruptedException, IgniteException; + + /** + * Acquires the lock only if it is free at the time of invocation. + * + * <p>Acquires the lock if it is available and returns immediately + * with the value {@code true}. + * If the lock is not available then this method will return + * immediately with the value {@code false}. + * + * <p>A typical usage idiom for this method would be: + * <pre> {@code + * Lock lock = ...; + * if (lock.tryLock()) { + * try { + * // manipulate protected state + * } finally { + * lock.unlock(); + * } + * } else { + * // perform alternative actions + * }}</pre> + * + * This usage ensures that the lock is unlocked if it was acquired, and + * doesn't try to unlock if the lock was not acquired. + * + * If node is stopped, or any node failed while owning the lock in non-failoverSafe mode, + * then {@link IgniteException} is thrown. 
+ * + * @return {@code true} if the lock was acquired and + * {@code false} otherwise + * + * @throws IgniteException if node is stopped, or lock is already broken in non-failover safe mode + */ + @Override public boolean tryLock() throws IgniteException; + + /** + * Acquires the lock if it is not held by another thread within the given + * waiting time and the current thread has not been + * {@linkplain Thread#interrupt interrupted}. + * + * <p>Acquires the lock if it is not held by another thread and returns + * immediately with the value {@code true}, setting the lock hold count + * to one. + * + * <p>If the current thread + * already holds this lock then the hold count is incremented by one and + * the method returns {@code true}. + * + * <p>If the lock is held by another thread then the + * current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of five things happens: + * + * <ul> + * + * <li>The lock is acquired by the current thread; or + * + * <li>Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread; or + * + * <li>Lock is broken (any node failed while owning this lock), and lock is created in + * non-failoverSafe mode. + * + * <li>Local node is stopped. + * + * <li>The specified waiting time elapses + * + * </ul> + * + * <p>If the lock is acquired then the value {@code true} is returned and + * the lock hold count is set to one. + * + * <p>If the current thread: + * + * <ul> + * + * <li>has its interrupted status set on entry to this method; or + * + * <li>is {@linkplain Thread#interrupt interrupted} while + * acquiring the lock; or + * + * </ul> + * then {@link IgniteInterruptedException} is thrown and the current thread's + * interrupted status is cleared. 
+ * + * <p>{@link IgniteException} is thrown in case: + * + * <ul> + * + * <li>the lock is broken before or during the attempt to acquire this lock; or + * + * <li>local node is stopped, + * + * </ul> + * + * <p>If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * <p>In this implementation, as this method is an explicit + * interruption point, preference is given to responding to the + * interrupt over normal or reentrant acquisition of the lock, and + * over reporting the elapse of the waiting time. + * + * @param timeout the time to wait for the lock + * @param unit the time unit of the timeout argument + * @return {@code true} if the lock was free and was acquired by the + * current thread, or the lock was already held by the current + * thread; and {@code false} if the waiting time elapsed before + * the lock could be acquired + * @throws IgniteInterruptedException if the current thread is interrupted + * @throws IgniteException if node is stopped, or lock is already broken in non-failover safe mode + * @throws NullPointerException if the time unit is null + */ + @Override public boolean tryLock(long timeout, TimeUnit unit) throws IgniteInterruptedException, IgniteException; + + /** + * Releases the lock. + * + * If lock is not owned by current thread, then an {@link + * IllegalMonitorStateException} is thrown. + * If lock is already broken prior to invocation of this method, and + * lock is created in non-failover safe mode, then {@link IgniteException} is thrown. + * + * @throws IllegalMonitorStateException if not owned by current thread + * @throws IgniteException if node is stopped, or lock is already broken in non-failover safe mode + */ + void unlock() throws IgniteException; + + /** + * Returns a {@link Condition} instance for use with this + * {@link IgniteLock} instance.
+ * + * <ul> + * + * <li>If this lock is not held when any of the {@link Condition} + * {@linkplain Condition#await() waiting} or {@linkplain + * Condition#signal signalling} methods are called, then an {@link + * IllegalMonitorStateException} is thrown. + * + * <li>When the condition {@linkplain Condition#await() waiting} + * methods are called the lock is released and, before they + * return, the lock is reacquired and the lock hold count restored + * to what it was when the method was called. + * + * <li>If a thread is {@linkplain Thread#interrupt interrupted} + * while waiting then the wait will terminate, an {@link + * IgniteInterruptedException} will be thrown, and the thread's + * interrupted status will be cleared. + * + * <li> Waiting threads are signalled in FIFO order. + * + * </ul> + * + * @param name Name of the distributed condition object + * + * @return the Condition object + * @throws IgniteException if the lock is not initialized or already removed + */ + public IgniteCondition getOrCreateCondition(String name) throws IgniteException; + + /** + * This method is not supported in IgniteLock. + * Any invocation of this method will result in {@link UnsupportedOperationException}. + * The correct way to obtain a Condition object is through the method {@link IgniteLock#getOrCreateCondition(String)}. + * + */ + @Override public Condition newCondition(); + + /** + * Queries the number of holds on this lock by the current thread. + * + * @return the number of holds on this lock by the current thread, + * or zero if this lock is not held by the current thread + * @throws IgniteException if the lock is not initialized or already removed + */ + public int getHoldCount() throws IgniteException; + + /** + * Queries if this lock is held by the current thread.
+ * + * @return {@code true} if current thread holds this lock and + * {@code false} otherwise + * @throws IgniteException if the lock is not initialized or already removed + */ + public boolean isHeldByCurrentThread() throws IgniteException; + + /** + * Queries if this lock is held by any thread on any node. This method is + * designed for use in monitoring of the system state, + * not for synchronization control. + * + * @return {@code true} if any thread on this or any other node holds this lock and + * {@code false} otherwise + * @throws IgniteException if the lock is not initialized or already removed + */ + public boolean isLocked() throws IgniteException; + + /** + * Queries whether any threads on this node are waiting to acquire this lock. Note that + * because cancellations may occur at any time, a {@code true} + * return does not guarantee that any other thread will ever + * acquire this lock. This method is designed primarily for use in + * monitoring of the system state. + * + * @return {@code true} if there may be other threads on this node waiting to + * acquire the lock + * @throws IgniteException if the lock is not initialized or already removed + */ + public boolean hasQueuedThreads() throws IgniteException; + + /** + * Queries whether the given thread is waiting to acquire this + * lock. Note that because cancellations may occur at any time, a + * {@code true} return does not guarantee that this thread + * will ever acquire this lock. This method is designed primarily for use + * in monitoring of the system state. + * + * @param thread the thread + * @return {@code true} if the given thread is queued waiting for this lock + * @throws NullPointerException if the thread is null + * @throws IgniteException if the lock is not initialized or already removed + */ + public boolean hasQueuedThread(Thread thread) throws IgniteException; + + /** + * Queries whether any threads on this node are waiting on the given condition + * associated with this lock. 
Note that because timeouts and + * interrupts may occur at any time, a {@code true} return does + * not guarantee that a future {@code signal} will awaken any + * threads. This method is designed primarily for use in + * monitoring of the system state. + * + * @param condition the condition + * @return {@code true} if there are any waiting threads on this node + * @throws IllegalMonitorStateException if this lock is not held + * @throws IllegalArgumentException if the given condition is + * not associated with this lock + * @throws NullPointerException if the condition is null + * @throws IgniteException if the lock is not initialized or already removed + */ + public boolean hasWaiters(IgniteCondition condition) throws IgniteException; + + /** + * Returns an estimate of the number of threads on this node that are waiting on the + * given condition associated with this lock. Note that because + * timeouts and interrupts may occur at any time, the estimate + * serves only as an upper bound on the actual number of waiters. + * This method is designed for use in monitoring of the system + * state, not for synchronization control. + * + * @param condition the condition + * @return the estimated number of waiting threads on this node + * @throws IllegalMonitorStateException if this lock is not held + * @throws IllegalArgumentException if the given condition is + * not associated with this lock + * @throws NullPointerException if the condition is null + * @throws IgniteException if the lock is not initialized or already removed + */ + public int getWaitQueueLength(IgniteCondition condition) throws IgniteException; + + /** + * Returns {@code true} if this lock is safe to use after node failure. + * If not, {@link IgniteException} is thrown on every other node after the node owning the lock fails.
+ * + * @return {@code true} if this reentrant lock has failoverSafe set true + * @throws IgniteException if the lock is not initialized or already removed + */ + public boolean isFailoverSafe(); + + /** + * Returns {@code true} if this lock is fair. Fairness flag can only be set on lock creation. + * + * @return {@code true} if this reentrant lock has fairness flag set true. + * @throws IgniteException if the lock is not initialized or already removed + */ + public boolean isFair(); + + /** + * Returns {@code true} if any node that owned the lock failed before releasing the lock. + * + * @return {@code true} if any node failed while owning the lock since the lock on this node was initialized. + * @throws IgniteException if the lock is not initialized or already removed + */ + public boolean isBroken() throws IgniteException; + + /** + * Gets status of reentrant lock. + * + * @return {@code true} if reentrant lock was removed from cache, {@code false} otherwise. + */ + public boolean removed(); + + /** + * Removes reentrant lock. + * + * @throws IgniteException If operation failed.
+ */ + @Override public void close(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 79d67df..1ff4543 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -876,7 +876,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - stash.set(U.readString(in)); + U.readString(in); // Read for compatibility only. See #readResolve(). } /** @@ -887,7 +887,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable */ protected Object readResolve() throws ObjectStreamException { try { - return IgnitionEx.gridx(stash.get()).context(); + return IgnitionEx.localIgnite().context(); } catch (IllegalStateException e) { throw U.withCause(new InvalidObjectException(e.getMessage()), e); http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 5094415..0f180b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -64,6 +64,7 @@ import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.IgniteLogger; import 
org.apache.ignite.IgniteMessaging; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteScheduler; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteServices; @@ -3036,6 +3037,26 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** {@inheritDoc} */ + @Nullable @Override public IgniteLock reentrantLock( + String name, + boolean failoverSafe, + boolean fair, + boolean create + ) { + guard(); + + try { + return ctx.dataStructures().reentrantLock(name, failoverSafe, fair, create); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Nullable @Override public <T> IgniteQueue<T> queue(String name, int cap, CollectionConfiguration cfg) http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 10aa71e..1cad22f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -43,6 +43,7 @@ import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteQueue; import org.apache.ignite.IgniteSemaphore; @@ -95,6 +96,7 @@ import static 
org.apache.ignite.internal.processors.datastructures.DataStructure import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE; +import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.REENTRANT_LOCK; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -135,6 +137,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** Cache contains only {@code GridCacheSemaphoreState}. */ private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView; + /** Cache contains only {@code GridCacheLockState}. */ + private IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> reentrantLockView; + /** Cache contains only {@code GridCacheAtomicReferenceValue}. */ private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicReferenceValue> atomicRefView; @@ -177,7 +182,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { ctx.event().addLocalEventListener( new GridLocalEventListener() { @Override public void onEvent(final Event evt) { - // This may require cache operation to exectue, + // This may require cache operation to execute, // therefore cannot use event notification thread. 
ctx.closure().callLocalSafe( new Callable<Object>() { @@ -189,6 +194,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { for (GridCacheRemovable ds : dsMap.values()) { if (ds instanceof GridCacheSemaphoreEx) ((GridCacheSemaphoreEx)ds).onNodeRemoved(leftNodeId); + else if (ds instanceof GridCacheLockEx) + ((GridCacheLockEx)ds).onNodeRemoved(leftNodeId); } return null; @@ -224,6 +231,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { semView = atomicsCache; + reentrantLockView = atomicsCache; + atomicLongView = atomicsCache; atomicRefView = atomicsCache; @@ -262,6 +271,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { for (GridCacheRemovable ds : dsMap.values()) { if (ds instanceof GridCacheSemaphoreEx) ((GridCacheSemaphoreEx)ds).stop(); + + if (ds instanceof GridCacheLockEx) + ((GridCacheLockEx)ds).onStop(); } if (initLatch.getCount() > 0) { @@ -1332,6 +1344,124 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** + * Gets or creates reentrant lock. If reentrant lock is not found in cache, + * it is created using provided name, failover mode, and fairness mode parameters. + * + * @param name Name of the reentrant lock. + * @param failoverSafe Flag indicating behaviour in case of failure. + * @param fair Flag indicating fairness policy of this lock. + * @param create If {@code true} reentrant lock will be created in case it is not in cache. + * @return ReentrantLock for the given name or {@code null} if it is not found and + * {@code create} is false. + * @throws IgniteCheckedException If operation failed. 
+ */ + public IgniteLock reentrantLock(final String name, final boolean failoverSafe, final boolean fair, final boolean create) + throws IgniteCheckedException { + A.notNull(name, "name"); + + awaitInitialization(); + + checkAtomicsConfiguration(); + + startQuery(); + + return getAtomic(new IgniteOutClosureX<IgniteLock>() { + @Override public IgniteLock applyx() throws IgniteCheckedException { + GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + + dsCacheCtx.gate().enter(); + + try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class); + + // Check that reentrant lock hasn't been created in other thread yet. + GridCacheLockEx reentrantLock = cast(dsMap.get(key), GridCacheLockEx.class); + + if (reentrantLock != null) { + assert val != null; + + return reentrantLock; + } + + if (val == null && !create) + return null; + + if (val == null) { + val = new GridCacheLockState(0, dsCacheCtx.nodeId(), 0, failoverSafe, fair); + + dsView.put(key, val); + } + + GridCacheLockEx reentrantLock0 = new GridCacheLockImpl( + name, + key, + reentrantLockView, + dsCacheCtx); + + dsMap.put(key, reentrantLock0); + + tx.commit(); + + return reentrantLock0; + } + catch (Error | Exception e) { + dsMap.remove(key); + + U.error(log, "Failed to create reentrant lock: " + name, e); + + throw e; + } + finally { + dsCacheCtx.gate().leave(); + } + } + }, new DataStructureInfo(name, REENTRANT_LOCK, null), create, GridCacheLockEx.class); + } + + /** + * Removes reentrant lock from cache. + * + * @param name Name of the reentrant lock. + * @param broken Flag indicating the reentrant lock is broken and should be removed unconditionally. + * @throws IgniteCheckedException If operation failed. 
+ */ + public void removeReentrantLock(final String name, final boolean broken) throws IgniteCheckedException { + assert name != null; + assert dsCacheCtx != null; + + awaitInitialization(); + + removeDataStructure(new IgniteOutClosureX<Void>() { + @Override public Void applyx() throws IgniteCheckedException { + GridCacheInternal key = new GridCacheInternalKeyImpl(name); + + dsCacheCtx.gate().enter(); + + try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + // Check correctness type of removable object. + GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class); + + if (val != null) { + if (val.get() > 0 && !broken) + throw new IgniteCheckedException("Failed to remove reentrant lock with blocked threads. "); + + dsView.remove(key); + + tx.commit(); + } + else + tx.setRollbackOnly(); + + return null; + } + finally { + dsCacheCtx.gate().leave(); + } + } + }, name, REENTRANT_LOCK, null); + } + + /** * Remove internal entry by key from cache. * * @param key Internal entry key. @@ -1379,7 +1509,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException { if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) return evt.getValue() instanceof GridCacheCountDownLatchValue || - evt.getValue() instanceof GridCacheSemaphoreState; + evt.getValue() instanceof GridCacheSemaphoreState || + evt.getValue() instanceof GridCacheLockState; else { assert evt.getEventType() == EventType.REMOVED : evt; @@ -1476,7 +1607,25 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { ", actual=" + sem.getClass() + ", value=" + sem + ']'); } } + else if (val0 instanceof GridCacheLockState) { + GridCacheInternalKey key = evt.getKey(); + + // Notify reentrant lock on changes. 
+ final GridCacheRemovable reentrantLock = dsMap.get(key); + + GridCacheLockState val = (GridCacheLockState)val0; + + if (reentrantLock instanceof GridCacheLockEx) { + final GridCacheLockEx lock0 = (GridCacheLockEx)reentrantLock; + lock0.onUpdate(val); + } + else if (reentrantLock != null) { + U.error(log, "Failed to cast object " + + "[expected=" + IgniteLock.class.getSimpleName() + + ", actual=" + reentrantLock.getClass() + ", value=" + reentrantLock + ']'); + } + } } else { assert evt.getEventType() == EventType.REMOVED : evt; @@ -1694,7 +1843,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { SET(IgniteSet.class.getSimpleName()), /** */ - SEMAPHORE(IgniteSemaphore.class.getSimpleName()); + SEMAPHORE(IgniteSemaphore.class.getSimpleName()), + + /** */ + REENTRANT_LOCK(IgniteLock.class.getSimpleName()); /** */ private static final DataStructureType[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java new file mode 100644 index 0000000..0887345 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import java.util.UUID; +import org.apache.ignite.IgniteLock; + +/** + * Grid cache reentrant lock ({@code 'Ex'} stands for external). + */ +public interface GridCacheLockEx extends IgniteLock, GridCacheRemovable { + /** + * Gets the key of this reentrant lock. + * + * @return Lock key. + */ + public GridCacheInternalKey key(); + + /** + * Callback to notify reentrant lock on changes. + * + * @param state New reentrant lock state. + */ + public void onUpdate(GridCacheLockState state); + + /** + * Callback to notify reentrant lock on topology changes. + * + * @param nodeId Id of the node that left the grid. + */ + public void onNodeRemoved(UUID nodeId); + + /** + * Callback to notify local reentrant lock instance on node stop. + */ + public void onStop(); +}
