ignite-642: Implemented.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b4e49bde Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b4e49bde Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b4e49bde Branch: refs/heads/ignite-642 Commit: b4e49bde29d6b5feb8d3b4f6b0d157c786c9ba98 Parents: 46b6a76 Author: Vladisav Jelisavcic <[email protected]> Authored: Fri Feb 26 16:11:16 2016 +0100 Committer: vladisav <[email protected]> Committed: Fri Feb 26 16:11:16 2016 +0100 ---------------------------------------------------------------------- .../IgniteReentrantLockExample.java | 294 +++++ .../ignite/examples/CacheExamplesSelfTest.java | 8 + .../src/main/java/org/apache/ignite/Ignite.java | 16 + .../java/org/apache/ignite/IgniteCondition.java | 339 ++++++ .../org/apache/ignite/IgniteReentrantLock.java | 434 +++++++ .../apache/ignite/internal/IgniteKernal.java | 20 + .../datastructures/DataStructuresProcessor.java | 159 ++- .../GridCacheReentrantLockEx.java | 47 + .../GridCacheReentrantLockImpl.java | 1150 ++++++++++++++++++ .../GridCacheReentrantLockState.java | 298 +++++ .../resources/META-INF/classnames.properties | 2 + .../IgniteClientReconnectAtomicsTest.java | 54 + ...eAbstractDataStructuresFailoverSelfTest.java | 85 +- .../IgniteClientDataStructuresAbstractTest.java | 70 ++ .../IgniteDataStructureUniqueNameTest.java | 13 +- .../IgniteReentrantLockAbstractSelfTest.java | 428 +++++++ .../local/IgniteLocalReentrantLockSelfTest.java | 110 ++ .../IgnitePartitionedReentrantLockSelfTest.java | 33 + .../IgniteReplicatedReentrantLockSelfTest.java | 33 + .../cache/GridCacheDataStructuresLoadTest.java | 53 + .../ignite/testframework/junits/IgniteMock.java | 9 + .../junits/multijvm/IgniteProcessProxy.java | 7 + .../org/apache/ignite/IgniteSpringBean.java | 10 + 23 files changed, 3663 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteReentrantLockExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteReentrantLockExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteReentrantLockExample.java new file mode 100644 index 0000000..c87ce49 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteReentrantLockExample.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.datastructures; + +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCondition; +import org.apache.ignite.IgniteReentrantLock; +import org.apache.ignite.Ignition; +import org.apache.ignite.examples.ExampleNodeStartup; +import org.apache.ignite.lang.IgniteRunnable; + +/** + * This example demonstrates cache based reentrant lock. 
+ * <p> + * Remote nodes should always be started with special configuration + * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code + * examples/config/example-ignite.xml} configuration. + */ +public class IgniteReentrantLockExample { + /** Number of items for each producer/consumer to produce/consume. */ + private static final int OPS_COUNT = 100; + + /** Number of producers. */ + private static final int NUM_PRODUCERS = 5; + + /** Number of consumers. */ + private static final int NUM_CONSUMERS = 5; + + /** Cache name. */ + private static final String CACHE_NAME = "cache"; + + /** Name of the global resource. */ + private static final String QUEUE_ID = "queue"; + + /** Name of the synchronization variable. */ + private static final String SYNC_NAME = "done"; + + /** Name of the condition object. */ + private static final String NOT_FULL = "notFull"; + + /** Name of the condition object. */ + private static final String NOT_EMPTY = "notEmpty"; + + /** + * Executes example. + * + * @param args Command line arguments, none required. + */ + public static void main(String[] args) { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Cache atomic reentrant lock example started."); + + // Make name of reentrant lock. + final String reentrantLockName = UUID.randomUUID().toString(); + + // Initialize lock. + IgniteReentrantLock lock = ignite.reentrantLock(reentrantLockName, true, true); + + // Init distributed cache. + IgniteCache<String, Integer> cache = ignite.getOrCreateCache(CACHE_NAME); + + // Init shared variable. + cache.put(QUEUE_ID, 0); + + // Shared variable indicating number of jobs left to be completed. + cache.put(SYNC_NAME, NUM_PRODUCERS + NUM_CONSUMERS); + + // Start consumers on all cluster nodes. 
+ for (int i = 0; i < NUM_CONSUMERS; i++) + ignite.compute().withAsync().run(new Consumer(reentrantLockName)); + + // Start producers on all cluster nodes. + for (int i = 0; i < NUM_PRODUCERS; i++) + ignite.compute().withAsync().run(new Producer(reentrantLockName)); + + System.out.println("Master node is waiting for all other nodes to finish..."); + + // Wait for everyone to finish. + try { + lock.lock(); + + IgniteCondition notDone = lock.newCondition(SYNC_NAME); + + int count = cache.get(SYNC_NAME); + + while(count > 0) { + notDone.await(); + + count = cache.get(SYNC_NAME); + } + } + finally { + lock.unlock(); + + } + } + + System.out.flush(); + System.out.println(); + System.out.println("Finished reentrant lock example..."); + System.out.println("Check all nodes for output (this node is also part of the cluster)."); + } + + /** + * Base closure which holds the name of the reentrant lock. + */ + private abstract static class ReentrantLockExampleClosure implements IgniteRunnable { + /** Reentrant lock name. */ + protected final String reentrantLockName; + + /** + * @param reentrantLockName Reentrant lock name. + */ + ReentrantLockExampleClosure(String reentrantLockName) { + this.reentrantLockName = reentrantLockName; + } + + } + + /** + * Closure which simulates producer. + */ + private static class Producer extends ReentrantLockExampleClosure { + /** + * @param reentrantLockName Reentrant lock name. + */ + public Producer(String reentrantLockName) { + super(reentrantLockName); + } + + /** {@inheritDoc} */ + @Override public void run() { + System.out.println("Producer started. "); + + IgniteReentrantLock lock = Ignition.ignite().reentrantLock(reentrantLockName, true, true); + + // Condition to wait on when queue is full. + IgniteCondition notFull = lock.newCondition(NOT_FULL); + + // Condition to wait on when queue is empty. + IgniteCondition notEmpty = lock.newCondition(NOT_EMPTY); + + // Signaled when job is done. 
+ IgniteCondition done = lock.newCondition(SYNC_NAME); + + IgniteCache<String, Integer> cache = Ignition.ignite().cache(CACHE_NAME); + + for (int i = 0; i < OPS_COUNT; i++) { + try { + lock.lock(); + + int val = cache.get(QUEUE_ID); + + while(val >= 100){ + System.out.println("Queue is full. Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + + " paused."); + + notFull.await(); + + val = cache.get(QUEUE_ID); + } + + val++; + + System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + + ", available=" + val + ']'); + + cache.put(QUEUE_ID, val); + + notEmpty.signalAll(); + } + finally { + lock.unlock(); + + } + } + + System.out.println("Producer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']'); + + try { + lock.lock(); + + int count = cache.get(SYNC_NAME); + + count--; + + cache.put(SYNC_NAME, count); + + // Signals the master thread. + done.signal(); + }finally { + lock.unlock(); + } + } + } + + /** + * Closure which simulates consumer. + */ + private static class Consumer extends ReentrantLockExampleClosure { + /** + * @param reentrantLockName Reentrant lock name. + */ + public Consumer(String reentrantLockName) { + super(reentrantLockName); + } + + /** {@inheritDoc} */ + @Override public void run() { + System.out.println("Consumer started. "); + + Ignite g = Ignition.ignite(); + + IgniteReentrantLock lock = g.reentrantLock(reentrantLockName, true, true); + + // Condition to wait on when queue is full. + IgniteCondition notFull = lock.newCondition(NOT_FULL); + + // Condition to wait on when queue is empty. + IgniteCondition notEmpty = lock.newCondition(NOT_EMPTY); + + // Signaled when job is done. + IgniteCondition done = lock.newCondition(SYNC_NAME); + + IgniteCache<String, Integer> cache = g.cache(CACHE_NAME); + + for (int i = 0; i < OPS_COUNT; i++) { + try { + lock.lock(); + + int val = cache.get(QUEUE_ID); + + while (val <= 0) { + System.out.println("Queue is empty. 
Consumer [nodeId=" + + Ignition.ignite().cluster().localNode().id() + " paused."); + + notEmpty.await(); + + val = cache.get(QUEUE_ID); + } + + val--; + + System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + + ", available=" + val + ']'); + + cache.put(QUEUE_ID, val); + + notFull.signalAll(); + } + finally { + lock.unlock(); + } + } + + System.out.println("Consumer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']'); + + try { + lock.lock(); + + int count = cache.get(SYNC_NAME); + + count--; + + cache.put(SYNC_NAME, count); + + // Signals the master thread. + done.signal(); + }finally { + lock.unlock(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java index 39c2ea6..fea3627 100644 --- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java +++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.examples.datastructures.IgniteAtomicSequenceExample; import org.apache.ignite.examples.datastructures.IgniteAtomicStampedExample; import org.apache.ignite.examples.datastructures.IgniteCountDownLatchExample; import org.apache.ignite.examples.datastructures.IgniteQueueExample; +import org.apache.ignite.examples.datastructures.IgniteReentrantLockExample; import org.apache.ignite.examples.datastructures.IgniteSemaphoreExample; import org.apache.ignite.examples.datastructures.IgniteSetExample; import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest; @@ -99,6 +100,13 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest { /** * @throws Exception If failed. 
+ */ + public void testCacheReentrantLockExample() throws Exception { + IgniteReentrantLockExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ public void testCacheQueueExample() throws Exception { IgniteQueueExample.main(EMPTY_ARGS); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index 5703744..1838fd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -466,6 +466,22 @@ public interface Ignite extends AutoCloseable { throws IgniteException; /** + * Gets or creates reentrant lock. If reentrant lock is not found in cache and {@code create} flag + * is {@code true}, it is created using provided name. + * + * @param name Name of the lock. + * @param failoverSafe {@code True} to create failover safe lock which means that + * if any node leaves topology, a lock already acquired by that node is silently released + * and becomes available for alive nodes to acquire. If flag is {@code false} then + * all threads waiting to acquire lock get interrupted. + * @param create Boolean flag indicating whether data structure should be created if it does not exist. + * @return ReentrantLock for the given name. + * @throws IgniteException If reentrant lock could not be fetched or created. + */ + public IgniteReentrantLock reentrantLock(String name, boolean failoverSafe, boolean create) + throws IgniteException; + + /** * Will get a named queue from cache and create one if it has not been created yet and {@code cfg} is not * {@code null}. * If queue is present already, queue properties will not be changed. 
Use http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java new file mode 100644 index 0000000..e79270d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + + +public interface IgniteCondition { + + /** + * Name of ignite condition. + * + * @return Name of ignite condition. + */ + public String name(); + + /** + * Causes the current thread to wait until it is signalled or + * {@linkplain Thread#interrupt interrupted}. 
+ * + * <p>The lock associated with this {@code IgniteCondition} is atomically + * released and the current thread becomes disabled for thread scheduling + * purposes and lies dormant until <em>one</em> of four things happens: + * <ul> + * <li>Some other thread invokes the {@link #signal} method for this + * {@code Condition} and the current thread happens to be chosen as the + * thread to be awakened; or + * <li>Some other thread invokes the {@link #signalAll} method for this + * {@code Condition}; or + * <li>Some other thread {@linkplain Thread#interrupt interrupts} the + * current thread, and interruption of thread suspension is supported; or + * <li>A "<em>spurious wakeup</em>" occurs. + * </ul> + * + * <p>In all cases, before this method can return the current thread must + * re-acquire the lock associated with this condition. When the + * thread returns it is <em>guaranteed</em> to hold this lock. + * + * <p>If the current thread: + * <ul> + * <li>has its interrupted status set on entry to this method; or + * <li>is {@linkplain Thread#interrupt interrupted} while waiting + * and interruption of thread suspension is supported, + * </ul> + * then {@link IgniteInterruptedException} is thrown and the current thread's + * interrupted status is cleared. It is not specified, in the first + * case, whether or not the test for interruption occurs before the lock + * is released. + * + * <p><b>Implementation Considerations</b> + * + * <p>The current thread is assumed to hold the lock associated with this + * {@code Condition} when this method is called. + * It is up to the implementation to determine if this is + * the case and if not, how to respond. Typically, an exception will be + * thrown (such as {@link IllegalMonitorStateException}) and the + * implementation must document that fact. + * + * <p>An implementation can favor responding to an interrupt over normal + * method return in response to a signal. 
In that case the implementation + * must ensure that the signal is redirected to another waiting thread, if + * there is one. + * + * @throws IgniteInterruptedException if the current thread is interrupted + * (and interruption of thread suspension is supported) + */ + void await() throws IgniteInterruptedException; + + /** + * Causes the current thread to wait until it is signalled. + * + * <p>The lock associated with this condition is atomically + * released and the current thread becomes disabled for thread scheduling + * purposes and lies dormant until <em>one</em> of three things happens: + * <ul> + * <li>Some other thread invokes the {@link #signal} method for this + * {@code Condition} and the current thread happens to be chosen as the + * thread to be awakened; or + * <li>Some other thread invokes the {@link #signalAll} method for this + * {@code Condition}; or + * <li>A "<em>spurious wakeup</em>" occurs. + * </ul> + * + * <p>In all cases, before this method can return the current thread must + * re-acquire the lock associated with this condition. When the + * thread returns it is <em>guaranteed</em> to hold this lock. + * + * <p>If the current thread's interrupted status is set when it enters + * this method, or it is {@linkplain Thread#interrupt interrupted} + * while waiting, it will continue to wait until signalled. When it finally + * returns from this method its interrupted status will still + * be set. + * + * <p><b>Implementation Considerations</b> + * + * <p>The current thread is assumed to hold the lock associated with this + * {@code Condition} when this method is called. + * It is up to the implementation to determine if this is + * the case and if not, how to respond. Typically, an exception will be + * thrown (such as {@link IllegalMonitorStateException}) and the + * implementation must document that fact. 
+ */ + void awaitUninterruptibly(); + + /** + * Causes the current thread to wait until it is signalled or interrupted, + * or the specified waiting time elapses. + * + * <p>The lock associated with this condition is atomically + * released and the current thread becomes disabled for thread scheduling + * purposes and lies dormant until <em>one</em> of five things happens: + * <ul> + * <li>Some other thread invokes the {@link #signal} method for this + * {@code Condition} and the current thread happens to be chosen as the + * thread to be awakened; or + * <li>Some other thread invokes the {@link #signalAll} method for this + * {@code Condition}; or + * <li>Some other thread {@linkplain Thread#interrupt interrupts} the + * current thread, and interruption of thread suspension is supported; or + * <li>The specified waiting time elapses; or + * <li>A "<em>spurious wakeup</em>" occurs. + * </ul> + * + * <p>In all cases, before this method can return the current thread must + * re-acquire the lock associated with this condition. When the + * thread returns it is <em>guaranteed</em> to hold this lock. + * + * <p>If the current thread: + * <ul> + * <li>has its interrupted status set on entry to this method; or + * <li>is {@linkplain Thread#interrupt interrupted} while waiting + * and interruption of thread suspension is supported, + * </ul> + * then {@link IgniteInterruptedException} is thrown and the current thread's + * interrupted status is cleared. It is not specified, in the first + * case, whether or not the test for interruption occurs before the lock + * is released. + * + * <p>The method returns an estimate of the number of nanoseconds + * remaining to wait given the supplied {@code nanosTimeout} + * value upon return, or a value less than or equal to zero if it + * timed out. This value can be used to determine whether and how + * long to re-wait in cases where the wait returns but an awaited + * condition still does not hold. 
Typical uses of this method take + * the following form: + * + * <pre> {@code + * boolean aMethod(long timeout, TimeUnit unit) { + * long nanos = unit.toNanos(timeout); + * lock.lock(); + * try { + * while (!conditionBeingWaitedFor()) { + * if (nanos <= 0L) + * return false; + * nanos = theCondition.awaitNanos(nanos); + * } + * // ... + * } finally { + * lock.unlock(); + * } + * }}</pre> + * + * <p>Design note: This method requires a nanosecond argument so + * as to avoid truncation errors in reporting remaining times. + * Such precision loss would make it difficult for programmers to + * ensure that total waiting times are not systematically shorter + * than specified when re-waits occur. + * + * <p><b>Implementation Considerations</b> + * + * <p>The current thread is assumed to hold the lock associated with this + * {@code Condition} when this method is called. + * It is up to the implementation to determine if this is + * the case and if not, how to respond. Typically, an exception will be + * thrown (such as {@link IllegalMonitorStateException}) and the + * implementation must document that fact. + * + * <p>An implementation can favor responding to an interrupt over normal + * method return in response to a signal, or over indicating the elapse + * of the specified waiting time. In either case the implementation + * must ensure that the signal is redirected to another waiting thread, if + * there is one. + * + * @param nanosTimeout the maximum time to wait, in nanoseconds + * @return an estimate of the {@code nanosTimeout} value minus + * the time spent waiting upon return from this method. + * A positive value may be used as the argument to a + * subsequent call to this method to finish waiting out + * the desired time. A value less than or equal to zero + * indicates that no time remains. 
+ * @throws IgniteInterruptedException if the current thread is interrupted + * (and interruption of thread suspension is supported) + */ + long awaitNanos(long nanosTimeout) throws IgniteInterruptedException; + + /** + * Causes the current thread to wait until it is signalled or interrupted, + * or the specified waiting time elapses. This method is behaviorally + * equivalent to: + * <pre> {@code awaitNanos(unit.toNanos(time)) > 0}</pre> + * + * @param time the maximum time to wait + * @param unit the time unit of the {@code time} argument + * @return {@code false} if the waiting time detectably elapsed + * before return from the method, else {@code true} + * @throws IgniteInterruptedException if the current thread is interrupted + * (and interruption of thread suspension is supported) + */ + boolean await(long time, TimeUnit unit) throws IgniteInterruptedException; + + /** + * Causes the current thread to wait until it is signalled or interrupted, + * or the specified deadline elapses. + * + * <p>The lock associated with this condition is atomically + * released and the current thread becomes disabled for thread scheduling + * purposes and lies dormant until <em>one</em> of five things happens: + * <ul> + * <li>Some other thread invokes the {@link #signal} method for this + * {@code Condition} and the current thread happens to be chosen as the + * thread to be awakened; or + * <li>Some other thread invokes the {@link #signalAll} method for this + * {@code Condition}; or + * <li>Some other thread {@linkplain Thread#interrupt interrupts} the + * current thread, and interruption of thread suspension is supported; or + * <li>The specified deadline elapses; or + * <li>A "<em>spurious wakeup</em>" occurs. + * </ul> + * + * <p>In all cases, before this method can return the current thread must + * re-acquire the lock associated with this condition. When the + * thread returns it is <em>guaranteed</em> to hold this lock. 
+ * + * + * <p>If the current thread: + * <ul> + * <li>has its interrupted status set on entry to this method; or + * <li>is {@linkplain Thread#interrupt interrupted} while waiting + * and interruption of thread suspension is supported, + * </ul> + * then {@link IgniteInterruptedException} is thrown and the current thread's + * interrupted status is cleared. It is not specified, in the first + * case, whether or not the test for interruption occurs before the lock + * is released. + * + * + * <p>The return value indicates whether the deadline has elapsed, + * which can be used as follows: + * <pre> {@code + * boolean aMethod(Date deadline) { + * boolean stillWaiting = true; + * lock.lock(); + * try { + * while (!conditionBeingWaitedFor()) { + * if (!stillWaiting) + * return false; + * stillWaiting = theCondition.awaitUntil(deadline); + * } + * // ... + * } finally { + * lock.unlock(); + * } + * }}</pre> + * + * <p><b>Implementation Considerations</b> + * + * <p>The current thread is assumed to hold the lock associated with this + * {@code Condition} when this method is called. + * It is up to the implementation to determine if this is + * the case and if not, how to respond. Typically, an exception will be + * thrown (such as {@link IllegalMonitorStateException}) and the + * implementation must document that fact. + * + * <p>An implementation can favor responding to an interrupt over normal + * method return in response to a signal, or over indicating the passing + * of the specified deadline. In either case the implementation + * must ensure that the signal is redirected to another waiting thread, if + * there is one. 
+ * + * @param deadline the absolute time to wait until + * @return {@code false} if the deadline has elapsed upon return, else + * {@code true} + * @throws IgniteInterruptedException if the current thread is interrupted + * (and interruption of thread suspension is supported) + */ + boolean awaitUntil(Date deadline) throws IgniteInterruptedException; + + /** + * Wakes up one waiting thread. + * + * <p>If any threads are waiting on this condition then one + * is selected for waking up. That thread must then re-acquire the + * lock before returning from {@code await}. + * + * <p><b>Implementation Considerations</b> + * + * <p>An implementation may (and typically does) require that the + * current thread hold the lock associated with this {@code + * Condition} when this method is called. Implementations must + * document this precondition and any actions taken if the lock is + * not held. Typically, an exception such as {@link + * IllegalMonitorStateException} will be thrown. + */ + void signal(); + + /** + * Wakes up all waiting threads. + * + * <p>If any threads are waiting on this condition then they are + * all woken up. Each thread must re-acquire the lock before it can + * return from {@code await}. + * + * <p><b>Implementation Considerations</b> + * + * <p>An implementation may (and typically does) require that the + * current thread hold the lock associated with this {@code + * Condition} when this method is called. Implementations must + * document this precondition and any actions taken if the lock is + * not held. Typically, an exception such as {@link + * IllegalMonitorStateException} will be thrown. 
+ */ + void signalAll(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/IgniteReentrantLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteReentrantLock.java b/modules/core/src/main/java/org/apache/ignite/IgniteReentrantLock.java new file mode 100644 index 0000000..d209b51 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/IgniteReentrantLock.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite; + +import java.io.Closeable; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +/** + * This interface provides a rich API for working with distributed reentrant lock. + * <p> + * <h1 class="header">Functionality</h1> + * Distributed reentrant lock provides functionality similar to {@code java.util.concurrent.ReentrantLock}. 
+ * <h1 class="header">Creating Distributed ReentrantLock</h1> + * Instance of cache reentrant lock can be created by calling the following method: + * {@link Ignite#reentrantLock(String, boolean, boolean)}. + */ +public interface IgniteReentrantLock extends Closeable { + + /** + * Name of atomic reentrant lock. + * + * @return Name of atomic reentrant lock. + */ + public String name(); + + /** + * Acquires the lock. + * + * <p>Acquires the lock if it is not held by another thread and returns + * immediately, setting the lock hold count to one. + * + * <p>If the current thread already holds the lock then the hold + * count is incremented by one and the method returns immediately. + * + * <p>If the lock is held by another thread then the + * current thread becomes disabled for thread scheduling + * purposes and lies dormant until the lock has been acquired, + * at which time the lock hold count is set to one. + */ + public void lock(); + + /** + * Acquires the lock unless the current thread is + * {@linkplain Thread#interrupt interrupted}. + * + * <p>Acquires the lock if it is not held by another thread and returns + * immediately, setting the lock hold count to one. + * + * <p>If the current thread already holds this lock then the hold count + * is incremented by one and the method returns immediately. + * + * <p>If the lock is held by another thread then the + * current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of two things happens: + * + * <ul> + * + * <li>The lock is acquired by the current thread; or + * + * <li>Some other thread {@linkplain Thread#interrupt interrupts} the + * current thread. + * + * </ul> + * + * <p>If the lock is acquired by the current thread then the lock hold + * count is set to one. 
+ * + * <p>If the current thread: + * + * <ul> + * + * <li>has its interrupted status set on entry to this method; or + * + * <li>is {@linkplain Thread#interrupt interrupted} while acquiring + * the lock, + * + * </ul> + * + * then {@link IgniteInterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + * <p>In this implementation, as this method is an explicit + * interruption point, preference is given to responding to the + * interrupt over normal or reentrant acquisition of the lock. + * + * @throws IgniteInterruptedException if the current thread is interrupted + */ + public void lockInterruptibly() throws IgniteInterruptedException; + + /** + * Acquires the lock only if it is not held by another thread at the time + * of invocation. + * + * <p>Acquires the lock if it is not held by another thread and + * returns immediately with the value {@code true}, setting the + * lock hold count to one. + * + * <p>If the current thread already holds this lock then the hold + * count is incremented by one and the method returns {@code true}. + * + * <p>If the lock is held by another thread then this method will return + * immediately with the value {@code false}. + * + * @return {@code true} if the lock was free and was acquired by the + * current thread, or the lock was already held by the current + * thread; and {@code false} otherwise + */ + public boolean tryLock(); + + /** + * Acquires the lock if it is not held by another thread within the given + * waiting time and the current thread has not been + * {@linkplain Thread#interrupt interrupted}. + * + * <p>Acquires the lock if it is not held by another thread and returns + * immediately with the value {@code true}, setting the lock hold count + * to one. + * + * <p>If the current thread + * already holds this lock then the hold count is incremented by one and + * the method returns {@code true}. 
+ * + * <p>If the lock is held by another thread then the + * current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of three things happens: + * + * <ul> + * + * <li>The lock is acquired by the current thread; or + * + * <li>Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread; or + * + * <li>The specified waiting time elapses + * + * </ul> + * + * <p>If the lock is acquired then the value {@code true} is returned and + * the lock hold count is set to one. + * + * <p>If the current thread: + * + * <ul> + * + * <li>has its interrupted status set on entry to this method; or + * + * <li>is {@linkplain Thread#interrupt interrupted} while + * acquiring the lock, + * + * </ul> + * then {@link IgniteInterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + * <p>If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * <p>In this implementation, as this method is an explicit + * interruption point, preference is given to responding to the + * interrupt over normal or reentrant acquisition of the lock, and + * over reporting the elapse of the waiting time. + * + * @param timeout the time to wait for the lock + * @param unit the time unit of the timeout argument + * @return {@code true} if the lock was free and was acquired by the + * current thread, or the lock was already held by the current + * thread; and {@code false} if the waiting time elapsed before + * the lock could be acquired + * @throws IgniteInterruptedException if the current thread is interrupted + * @throws NullPointerException if the time unit is null + */ + public boolean tryLock(long timeout, TimeUnit unit) throws IgniteInterruptedException; + + /** + * Attempts to release this lock. + * + * <p>If the current thread is the holder of this lock then the hold + * count is decremented. 
If the hold count is now zero then the lock + * is released. If the current thread is not the holder of this + * lock then {@link IllegalMonitorStateException} is thrown. + * + * @throws IllegalMonitorStateException if the current thread does not + * hold this lock + */ + public void unlock(); + + /** + * Returns a {@link Condition} instance for use with this + * {@link Lock} instance. + * + * <p>The returned {@link Condition} instance supports the same + * usages as do the {@link Object} monitor methods ({@link + * Object#wait() wait}, {@link Object#notify notify}, and {@link + * Object#notifyAll notifyAll}) when used with the built-in + * monitor lock. + * + * <ul> + * + * <li>If this lock is not held when any of the {@link Condition} + * {@linkplain Condition#await() waiting} or {@linkplain + * Condition#signal signalling} methods are called, then an {@link + * IllegalMonitorStateException} is thrown. + * + * <li>When the condition {@linkplain Condition#await() waiting} + * methods are called the lock is released and, before they + * return, the lock is reacquired and the lock hold count restored + * to what it was when the method was called. + * + * <li>If a thread is {@linkplain Thread#interrupt interrupted} + * while waiting then the wait will terminate, an {@link + * IgniteInterruptedException} will be thrown, and the thread's + * interrupted status will be cleared. + * + * <li> Waiting threads are signalled in FIFO order. + * + * </ul> + * + * @return the Condition object + */ + public IgniteCondition newCondition(String name); + + /** + * Queries the number of holds on this lock by the current thread. + * + * <p>A thread has a hold on a lock for each lock action that is not + * matched by an unlock action. + * + * <p>The hold count information is typically only used for testing and + * debugging purposes. 
For example, if a certain section of code should + * not be entered with the lock already held then we can assert that + * fact: + * + * <pre> {@code + * class X { + * ReentrantLock lock = new ReentrantLock(); + * // ... + * public void m() { + * assert lock.getHoldCount() == 0; + * lock.lock(); + * try { + * // ... method body + * } finally { + * lock.unlock(); + * } + * } + * }}</pre> + * + * @return the number of holds on this lock by the current thread, + * or zero if this lock is not held by the current thread + */ + public int getHoldCount(); + + /** + * Queries if this lock is held by the current thread. + * + * <p>Analogous to the {@link Thread#holdsLock(Object)} method for + * built-in monitor locks, this method is typically used for + * debugging and testing. For example, a method that should only be + * called while a lock is held can assert that this is the case: + * + * <pre> {@code + * class X { + * ReentrantLock lock = new ReentrantLock(); + * // ... + * + * public void m() { + * assert lock.isHeldByCurrentThread(); + * // ... method body + * } + * }}</pre> + * + * <p>It can also be used to ensure that a reentrant lock is used + * in a non-reentrant manner, for example: + * + * <pre> {@code + * class X { + * ReentrantLock lock = new ReentrantLock(); + * // ... + * + * public void m() { + * assert !lock.isHeldByCurrentThread(); + * lock.lock(); + * try { + * // ... method body + * } finally { + * lock.unlock(); + * } + * } + * }}</pre> + * + * @return {@code true} if current thread holds this lock and + * {@code false} otherwise + */ + public boolean isHeldByCurrentThread(); + + /** + * Queries if this lock is held by any thread. This method is + * designed for use in monitoring of the system state, + * not for synchronization control. + * + * @return {@code true} if any thread holds this lock and + * {@code false} otherwise + */ + public boolean isLocked(); + + /** + * Queries whether any threads are waiting to acquire this lock. 
Note that + * because cancellations may occur at any time, a {@code true} + * return does not guarantee that any other thread will ever + * acquire this lock. This method is designed primarily for use in + * monitoring of the system state. + * + * @return {@code true} if there may be other threads waiting to + * acquire the lock + */ + public boolean hasQueuedThreads(); + + /** + * Queries whether the given thread is waiting to acquire this + * lock. Note that because cancellations may occur at any time, a + * {@code true} return does not guarantee that this thread + * will ever acquire this lock. This method is designed primarily for use + * in monitoring of the system state. + * + * @param thread the thread + * @return {@code true} if the given thread is queued waiting for this lock + * @throws NullPointerException if the thread is null + */ + public boolean hasQueuedThread(Thread thread); + + /** + * Returns an estimate of the number of nodes waiting to + * acquire this lock. The value is only an estimate because the number of + * nodes may change dynamically while this method traverses + * internal data structures. This method is designed for use in + * monitoring of the system state, not for synchronization + * control. + * + * @return the estimated number of nodes waiting for this lock + */ + public int getQueueLength(); + + /** + * Queries whether any threads on this node are waiting on the given condition + * associated with this lock. Note that because timeouts and + * interrupts may occur at any time, a {@code true} return does + * not guarantee that a future {@code signal} will awaken any + * threads. This method is designed primarily for use in + * monitoring of the system state. 
+ * + * @param condition the condition + * @return {@code true} if there are any waiting threads on this node + * @throws IllegalMonitorStateException if this lock is not held + * @throws IllegalArgumentException if the given condition is + * not associated with this lock + * @throws NullPointerException if the condition is null + */ + public boolean hasWaiters(IgniteCondition condition); + + /** + * Returns an estimate of the number of threads on this node that are waiting on the + * given condition associated with this lock. Note that because + * timeouts and interrupts may occur at any time, the estimate + * serves only as an upper bound on the actual number of waiters. + * This method is designed for use in monitoring of the system + * state, not for synchronization control. + * + * @param condition the condition + * @return the estimated number of waiting threads on this node + * @throws IgniteIllegalStateException if this lock is not held + * @throws IllegalArgumentException if the given condition is + * not associated with this lock + * @throws NullPointerException if the condition is null + */ + public int getWaitQueueLength(IgniteCondition condition); + + /** + * Returns {@code true} if this lock is safe to use after node failure. + * If not, IgniteInterruptedException is thrown on every other node after node failure. + * + * @return {@code true} if this reentrant lock has failoverSafe set true + */ + public boolean isFailoverSafe(); + + /** + * Returns true if any node that owned the lock failed before releasing the lock. + * + * @return true if any node failed while owning the lock. + */ + public boolean isBroken(); + + /** + * Returns a string identifying this lock, as well as its lock state. + * The state, in brackets, includes either the String {@code "Unlocked"} + * or the String {@code "Locked by"} followed by the + * {@linkplain UUID} of the owning node and {@linkplain Thread#getName name} + * of the owning thread.
+ * + * @return a string identifying this lock, as well as its lock state + */ + public String toString(); + + /** + * Gets status of reentrant lock. + * + * @return {@code true} if reentrant lock was removed from cache, {@code false} in other case. + */ + public boolean removed(); + + /** + * Removes reentrant lock. + * + * @throws IgniteException If operation failed. + */ + @Override public void close(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 5d8daf6..21dde23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -64,6 +64,7 @@ import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteReentrantLock; import org.apache.ignite.IgniteScheduler; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteServices; @@ -2993,6 +2994,25 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** {@inheritDoc} */ + @Nullable @Override public IgniteReentrantLock reentrantLock( + String name, + boolean failoverSafe, + boolean create + ) { + guard(); + + try { + return ctx.dataStructures().reentrantLock(name, failoverSafe, create); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Nullable @Override public <T> IgniteQueue<T> queue(String name, int cap, CollectionConfiguration cfg) 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 98848ee..73aaddf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -45,6 +45,7 @@ import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteReentrantLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSet; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; @@ -95,6 +96,7 @@ import static org.apache.ignite.internal.processors.datastructures.DataStructure import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE; +import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.REENTRANT_LOCK; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -135,6 +137,9 @@ 
public final class DataStructuresProcessor extends GridProcessorAdapter { /** Cache contains only {@code GridCacheSemaphoreState}. */ private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView; + /** Cache contains only {@code GridCacheReentrantLockState}. */ + private IgniteInternalCache<GridCacheInternalKey, GridCacheReentrantLockState> reentrantLockView; + /** Cache contains only {@code GridCacheAtomicReferenceValue}. */ private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicReferenceValue> atomicRefView; @@ -177,7 +182,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { ctx.event().addLocalEventListener( new GridLocalEventListener() { @Override public void onEvent(final Event evt) { - // This may require cache operation to exectue, + // This may require cache operation to execute, // therefore cannot use event notification thread. ctx.closure().callLocalSafe( new Callable<Object>() { @@ -189,6 +194,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { for (GridCacheRemovable ds : dsMap.values()) { if (ds instanceof GridCacheSemaphoreEx) ((GridCacheSemaphoreEx)ds).onNodeRemoved(leftNodeId); + else if(ds instanceof GridCacheReentrantLockEx) + ((GridCacheReentrantLockEx)ds).onNodeRemoved(leftNodeId); } return null; @@ -224,6 +231,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { semView = atomicsCache; + reentrantLockView = atomicsCache; + atomicLongView = atomicsCache; atomicRefView = atomicsCache; @@ -1183,12 +1192,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { // Check correctness type of removable object. 
GridCacheCountDownLatchValue val = - cast(dsView.get(key), GridCacheCountDownLatchValue.class); + cast(dsView.get(key), GridCacheCountDownLatchValue.class); if (val != null) { if (val.get() > 0) { throw new IgniteCheckedException("Failed to remove count down latch " + - "with non-zero count: " + val.get()); + "with non-zero count: " + val.get()); } dsView.remove(key); @@ -1326,6 +1335,123 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** + * Gets or creates reentrant lock. If reentrant lock is not found in cache, + * it is created using the provided name and {@code failoverSafe} flag. + * + * @param name Name of the reentrant lock. + * @param create If {@code true} reentrant lock will be created in case it is not in cache. + * @return ReentrantLock for the given name or {@code null} if it is not found and + * {@code create} is false. + * @throws IgniteCheckedException If operation failed. + */ + public IgniteReentrantLock reentrantLock(final String name, final boolean failoverSafe, final boolean create) + throws IgniteCheckedException { + A.notNull(name, "name"); + + awaitInitialization(); + + checkAtomicsConfiguration(); + + startQuery(); + + return getAtomic(new IgniteOutClosureX<IgniteReentrantLock>() { + @Override public IgniteReentrantLock applyx() throws IgniteCheckedException { + GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + + dsCacheCtx.gate().enter(); + + try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheReentrantLockState val = cast(dsView.get(key), GridCacheReentrantLockState.class); + + // Check that reentrant lock hasn't been created in other thread yet.
+ GridCacheReentrantLockEx reentrantLock = cast(dsMap.get(key), GridCacheReentrantLockEx.class); + + if (reentrantLock != null) { + assert val != null; + + return reentrantLock; + } + + if (val == null && !create) + return null; + + if (val == null) { + val = new GridCacheReentrantLockState(0, dsCacheCtx.nodeId(), 0, failoverSafe); + + dsView.put(key, val); + } + + GridCacheReentrantLockEx reentrantLock0 = new GridCacheReentrantLockImpl( + name, + key, + reentrantLockView, + dsCacheCtx); + + dsMap.put(key, reentrantLock0); + + tx.commit(); + + return reentrantLock0; + } + catch (Error | Exception e) { + dsMap.remove(key); + + U.error(log, "Failed to create reentrant lock: " + name, e); + + throw e; + } + finally { + dsCacheCtx.gate().leave(); + } + } + }, new DataStructureInfo(name, REENTRANT_LOCK, null), create, GridCacheReentrantLockEx.class); + } + + /** + * Removes reentrant lock from cache. + * + * @param name Name of the reentrant lock. + * @param broken Flag indicating the reentrant lock is broken and should be removed unconditionally. + * @throws IgniteCheckedException If operation failed. + */ + public void removeReentrantLock(final String name, boolean broken) throws IgniteCheckedException { + assert name != null; + assert dsCacheCtx != null; + + awaitInitialization(); + + removeDataStructure(new IgniteOutClosureX<Void>() { + @Override public Void applyx() throws IgniteCheckedException { + GridCacheInternal key = new GridCacheInternalKeyImpl(name); + + dsCacheCtx.gate().enter(); + + try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + // Check correctness type of removable object. + GridCacheReentrantLockState val = cast(dsView.get(key), GridCacheReentrantLockState.class); + + if (val != null) { + if (val.get() > 0 && !broken) + throw new IgniteCheckedException("Failed to remove reentrant lock with blocked threads. 
"); + + dsView.remove(key); + + tx.commit(); + } + else + tx.setRollbackOnly(); + + return null; + } + finally { + dsCacheCtx.gate().leave(); + } + } + }, name, REENTRANT_LOCK, null); + } + + + /** * Remove internal entry by key from cache. * * @param key Internal entry key. @@ -1373,7 +1499,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException { if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) return evt.getValue() instanceof GridCacheCountDownLatchValue || - evt.getValue() instanceof GridCacheSemaphoreState; + evt.getValue() instanceof GridCacheSemaphoreState || + evt.getValue() instanceof GridCacheReentrantLockState; else { assert evt.getEventType() == EventType.REMOVED : evt; @@ -1470,6 +1597,25 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { ", actual=" + sem.getClass() + ", value=" + sem + ']'); } } + else if (val0 instanceof GridCacheReentrantLockState) { + GridCacheInternalKey key = evt.getKey(); + + // Notify reentrant lock on changes. 
+ final GridCacheRemovable reentrantLock = dsMap.get(key); + + GridCacheReentrantLockState val = (GridCacheReentrantLockState)val0; + + if (reentrantLock instanceof GridCacheReentrantLockEx) { + final GridCacheReentrantLockEx lock0 = (GridCacheReentrantLockEx)reentrantLock; + + lock0.onUpdate(val); + } + else if (reentrantLock != null) { + U.error(log, "Failed to cast object " + + "[expected=" + IgniteReentrantLock.class.getSimpleName() + + ", actual=" + reentrantLock.getClass() + ", value=" + reentrantLock + ']'); + } + } } else { @@ -1688,7 +1834,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { SET(IgniteSet.class.getSimpleName()), /** */ - SEMAPHORE(IgniteSemaphore.class.getSimpleName()); + SEMAPHORE(IgniteSemaphore.class.getSimpleName()), + + /** */ + REENTRANT_LOCK(IgniteReentrantLock.class.getSimpleName()); /** */ private static final DataStructureType[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockEx.java new file mode 100644 index 0000000..4a5238b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockEx.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import java.util.UUID; +import org.apache.ignite.IgniteReentrantLock; + +/** + * Grid cache reentrant lock ({@code 'Ex'} stands for external). + */ +public interface GridCacheReentrantLockEx extends IgniteReentrantLock, GridCacheRemovable { + /** + * Get current reentrant lock key. + * + * @return Lock key. + */ + public GridCacheInternalKey key(); + + /** + * Callback to notify reentrant lock on changes. + * + * @param state New reentrant lock state. + */ + public void onUpdate(GridCacheReentrantLockState state); + + /** + * Callback to notify reentrant lock on topology changes. + * + * @param nodeId Id of the node that left the grid. + */ + public void onNodeRemoved(UUID nodeId); +}
