Ignite-642: fix.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e386558a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e386558a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e386558a Branch: refs/heads/ignite-642 Commit: e386558afc8199ed862f81b9a3a3b9009932049f Parents: b4e49bd Author: Vladisav Jelisavcic <[email protected]> Authored: Sun Mar 13 11:35:53 2016 +0100 Committer: vladisav <[email protected]> Committed: Sun Mar 13 11:35:53 2016 +0100 ---------------------------------------------------------------------- .../datastructures/IgniteLockExample.java | 294 +++++ .../IgniteReentrantLockExample.java | 294 ----- .../ignite/examples/CacheExamplesSelfTest.java | 6 +- .../src/main/java/org/apache/ignite/Ignite.java | 8 +- .../java/org/apache/ignite/IgniteCondition.java | 161 +-- .../main/java/org/apache/ignite/IgniteLock.java | 338 +++++ .../org/apache/ignite/IgniteReentrantLock.java | 434 ------ .../apache/ignite/internal/IgniteKernal.java | 4 +- .../datastructures/DataStructuresProcessor.java | 49 +- .../datastructures/GridCacheLockEx.java | 53 + .../datastructures/GridCacheLockImpl.java | 1244 ++++++++++++++++++ .../datastructures/GridCacheLockState.java | 259 ++++ .../GridCacheReentrantLockEx.java | 47 - .../GridCacheReentrantLockImpl.java | 1150 ---------------- .../GridCacheReentrantLockState.java | 298 ----- .../resources/META-INF/classnames.properties | 4 +- .../IgniteClientReconnectAtomicsTest.java | 8 +- ...eAbstractDataStructuresFailoverSelfTest.java | 6 +- .../IgniteClientDataStructuresAbstractTest.java | 6 +- .../IgniteDataStructureUniqueNameTest.java | 4 +- .../IgniteLockAbstractSelfTest.java | 425 ++++++ .../IgniteReentrantLockAbstractSelfTest.java | 428 ------ .../local/IgniteLocalLockSelfTest.java | 110 ++ .../local/IgniteLocalReentrantLockSelfTest.java | 110 -- .../IgnitePartitionedLockSelfTest.java | 33 + .../IgnitePartitionedReentrantLockSelfTest.java | 33 - .../IgniteReplicatedLockSelfTest.java | 33 + .../IgniteReplicatedReentrantLockSelfTest.java | 33 - .../cache/GridCacheDataStructuresLoadTest.java | 6 +- .../ignite/testframework/junits/IgniteMock.java | 4 +- .../junits/multijvm/IgniteProcessProxy.java | 4 +- .../org/apache/ignite/IgniteSpringBean.java | 2 +- 32 files changed, 2887 insertions(+), 3001 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java new file mode 100644 index 0000000..3ff1f02 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.datastructures; + +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCondition; +import org.apache.ignite.IgniteLock; +import org.apache.ignite.Ignition; +import org.apache.ignite.examples.ExampleNodeStartup; +import org.apache.ignite.lang.IgniteRunnable; + +/** + * This example demonstrates cache based reentrant lock. + * <p> + * Remote nodes should always be started with special configuration + * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code + * examples/config/example-ignite.xml} configuration. + */ +public class IgniteLockExample { + /** Number of items for each producer/consumer to produce/consume. */ + private static final int OPS_COUNT = 100; + + /** Number of producers. */ + private static final int NUM_PRODUCERS = 5; + + /** Number of consumers. */ + private static final int NUM_CONSUMERS = 5; + + /** Cache name. */ + private static final String CACHE_NAME = "cache"; + + /** Name of the global resource. */ + private static final String QUEUE_ID = "queue"; + + /** Name of the synchronization variable. */ + private static final String SYNC_NAME = "done"; + + /** Name of the condition object. */ + private static final String NOT_FULL = "notFull"; + + /** Name of the condition object. */ + private static final String NOT_EMPTY = "notEmpty"; + + /** + * Executes example. + * + * @param args Command line arguments, none required. + */ + public static void main(String[] args) { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Cache atomic reentrant lock example started."); + + // Make name of reentrant lock. + final String reentrantLockName = UUID.randomUUID().toString(); + + // Initialize lock. + IgniteLock lock = ignite.reentrantLock(reentrantLockName, true, true); + + // Init distributed cache. + IgniteCache<String, Integer> cache = ignite.getOrCreateCache(CACHE_NAME); + + // Init shared variable. + cache.put(QUEUE_ID, 0); + + // Shared variable indicating number of jobs left to be completed. + cache.put(SYNC_NAME, NUM_PRODUCERS + NUM_CONSUMERS); + + // Start consumers on all cluster nodes. + for (int i = 0; i < NUM_CONSUMERS; i++) + ignite.compute().withAsync().run(new Consumer(reentrantLockName)); + + // Start producers on all cluster nodes. + for (int i = 0; i < NUM_PRODUCERS; i++) + ignite.compute().withAsync().run(new Producer(reentrantLockName)); + + System.out.println("Master node is waiting for all other nodes to finish..."); + + // Wait for everyone to finish. + try { + lock.lock(); + + IgniteCondition notDone = lock.getOrCreateCondition(SYNC_NAME); + + int count = cache.get(SYNC_NAME); + + while(count > 0) { + notDone.await(); + + count = cache.get(SYNC_NAME); + } + } + finally { + lock.unlock(); + + } + } + + System.out.flush(); + System.out.println(); + System.out.println("Finished reentrant lock example..."); + System.out.println("Check all nodes for output (this node is also part of the cluster)."); + } + + /** + * Closure which simply acquires reentrant lock. + */ + private abstract static class ReentrantLockExampleClosure implements IgniteRunnable { + /** Semaphore name. */ + protected final String reentrantLockName; + + /** + * @param reentrantLockName Reentrant lock name. + */ + ReentrantLockExampleClosure(String reentrantLockName) { + this.reentrantLockName = reentrantLockName; + } + + } + + /** + * Closure which simulates producer. + */ + private static class Producer extends ReentrantLockExampleClosure { + /** + * @param reentrantLockName Reentrant lock name. + */ + public Producer(String reentrantLockName) { + super(reentrantLockName); + } + + /** {@inheritDoc} */ + @Override public void run() { + System.out.println("Producer started. "); + + IgniteLock lock = Ignition.ignite().reentrantLock(reentrantLockName, true, true); + + // Condition to wait on when queue is full. + IgniteCondition notFull = lock.getOrCreateCondition(NOT_FULL); + + // Signaled to wait on when queue is empty. + IgniteCondition notEmpty = lock.getOrCreateCondition(NOT_EMPTY); + + // Signaled when job is done. + IgniteCondition done = lock.getOrCreateCondition(SYNC_NAME); + + IgniteCache<String, Integer> cache = Ignition.ignite().cache(CACHE_NAME); + + for (int i = 0; i < OPS_COUNT; i++) { + try { + lock.lock(); + + int val = cache.get(QUEUE_ID); + + while(val >= 100){ + System.out.println("Queue is full. Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + + " paused."); + + notFull.await(); + + val = cache.get(QUEUE_ID); + } + + val++; + + System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + + ", available=" + val + ']'); + + cache.put(QUEUE_ID, val); + + notEmpty.signalAll(); + } + finally { + lock.unlock(); + + } + } + + System.out.println("Producer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']'); + + try { + lock.lock(); + + int count = cache.get(SYNC_NAME); + + count--; + + cache.put(SYNC_NAME, count); + + // Signals the master thread. + done.signal(); + }finally { + lock.unlock(); + } + } + } + + /** + * Closure which simulates consumer. + */ + private static class Consumer extends ReentrantLockExampleClosure { + /** + * @param reentrantLockName ReentrantLock name. + */ + public Consumer(String reentrantLockName) { + super(reentrantLockName); + } + + /** {@inheritDoc} */ + @Override public void run() { + System.out.println("Consumer started. "); + + Ignite g = Ignition.ignite(); + + IgniteLock lock = g.reentrantLock(reentrantLockName, true, true); + + // Condition to wait on when queue is full. + IgniteCondition notFull = lock.getOrCreateCondition(NOT_FULL); + + // Signaled to wait on when queue is empty. + IgniteCondition notEmpty = lock.getOrCreateCondition(NOT_EMPTY); + + // Signaled when job is done. + IgniteCondition done = lock.getOrCreateCondition(SYNC_NAME); + + IgniteCache<String, Integer> cache = g.cache(CACHE_NAME); + + for (int i = 0; i < OPS_COUNT; i++) { + try { + lock.lock(); + + int val = cache.get(QUEUE_ID); + + while (val <= 0) { + System.out.println("Queue is empty. Consumer [nodeId=" + + Ignition.ignite().cluster().localNode().id() + " paused."); + + notEmpty.await(); + + val = cache.get(QUEUE_ID); + } + + val--; + + System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + + ", available=" + val + ']'); + + cache.put(QUEUE_ID, val); + + notFull.signalAll(); + } + finally { + lock.unlock(); + } + } + + System.out.println("Consumer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']'); + + try { + lock.lock(); + + int count = cache.get(SYNC_NAME); + + count--; + + cache.put(SYNC_NAME, count); + + // Signals the master thread. + done.signal(); + }finally { + lock.unlock(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteReentrantLockExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteReentrantLockExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteReentrantLockExample.java deleted file mode 100644 index c87ce49..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteReentrantLockExample.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.datastructures; - -import java.util.UUID; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteCondition; -import org.apache.ignite.IgniteReentrantLock; -import org.apache.ignite.Ignition; -import org.apache.ignite.examples.ExampleNodeStartup; -import org.apache.ignite.lang.IgniteRunnable; - -/** - * This example demonstrates cache based reentrant lock. - * <p> - * Remote nodes should always be started with special configuration - * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. - * <p> - * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code - * examples/config/example-ignite.xml} configuration. - */ -public class IgniteReentrantLockExample { - /** Number of items for each producer/consumer to produce/consume. */ - private static final int OPS_COUNT = 100; - - /** Number of producers. */ - private static final int NUM_PRODUCERS = 5; - - /** Number of consumers. */ - private static final int NUM_CONSUMERS = 5; - - /** Cache name. */ - private static final String CACHE_NAME = "cache"; - - /** Name of the global resource. */ - private static final String QUEUE_ID = "queue"; - - /** Name of the synchronization variable. */ - private static final String SYNC_NAME = "done"; - - /** Name of the condition object. */ - private static final String NOT_FULL = "notFull"; - - /** Name of the condition object. */ - private static final String NOT_EMPTY = "notEmpty"; - - /** - * Executes example. - * - * @param args Command line arguments, none required. - */ - public static void main(String[] args) { - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - System.out.println(); - System.out.println(">>> Cache atomic reentrant lock example started."); - - // Make name of reentrant lock. - final String reentrantLockName = UUID.randomUUID().toString(); - - // Initialize lock. - IgniteReentrantLock lock = ignite.reentrantLock(reentrantLockName, true, true); - - // Init distributed cache. - IgniteCache<String, Integer> cache = ignite.getOrCreateCache(CACHE_NAME); - - // Init shared variable. - cache.put(QUEUE_ID, 0); - - // Shared variable indicating number of jobs left to be completed. - cache.put(SYNC_NAME, NUM_PRODUCERS + NUM_CONSUMERS); - - // Start consumers on all cluster nodes. - for (int i = 0; i < NUM_CONSUMERS; i++) - ignite.compute().withAsync().run(new Consumer(reentrantLockName)); - - // Start producers on all cluster nodes. - for (int i = 0; i < NUM_PRODUCERS; i++) - ignite.compute().withAsync().run(new Producer(reentrantLockName)); - - System.out.println("Master node is waiting for all other nodes to finish..."); - - // Wait for everyone to finish. - try { - lock.lock(); - - IgniteCondition notDone = lock.newCondition(SYNC_NAME); - - int count = cache.get(SYNC_NAME); - - while(count > 0) { - notDone.await(); - - count = cache.get(SYNC_NAME); - } - } - finally { - lock.unlock(); - - } - } - - System.out.flush(); - System.out.println(); - System.out.println("Finished reentrant lock example..."); - System.out.println("Check all nodes for output (this node is also part of the cluster)."); - } - - /** - * Closure which simply acquires reentrant lock. - */ - private abstract static class ReentrantLockExampleClosure implements IgniteRunnable { - /** Semaphore name. */ - protected final String reentrantLockName; - - /** - * @param reentrantLockName Reentrant lock name. - */ - ReentrantLockExampleClosure(String reentrantLockName) { - this.reentrantLockName = reentrantLockName; - } - - } - - /** - * Closure which simulates producer. - */ - private static class Producer extends ReentrantLockExampleClosure { - /** - * @param reentrantLockName Reentrant lock name. - */ - public Producer(String reentrantLockName) { - super(reentrantLockName); - } - - /** {@inheritDoc} */ - @Override public void run() { - System.out.println("Producer started. "); - - IgniteReentrantLock lock = Ignition.ignite().reentrantLock(reentrantLockName, true, true); - - // Condition to wait on when queue is full. - IgniteCondition notFull = lock.newCondition(NOT_FULL); - - // Signaled to wait on when queue is empty. - IgniteCondition notEmpty = lock.newCondition(NOT_EMPTY); - - // Signaled when job is done. - IgniteCondition done = lock.newCondition(SYNC_NAME); - - IgniteCache<String, Integer> cache = Ignition.ignite().cache(CACHE_NAME); - - for (int i = 0; i < OPS_COUNT; i++) { - try { - lock.lock(); - - int val = cache.get(QUEUE_ID); - - while(val >= 100){ - System.out.println("Queue is full. Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + - " paused."); - - notFull.await(); - - val = cache.get(QUEUE_ID); - } - - val++; - - System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + - ", available=" + val + ']'); - - cache.put(QUEUE_ID, val); - - notEmpty.signalAll(); - } - finally { - lock.unlock(); - - } - } - - System.out.println("Producer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']'); - - try { - lock.lock(); - - int count = cache.get(SYNC_NAME); - - count--; - - cache.put(SYNC_NAME, count); - - // Signals the master thread. - done.signal(); - }finally { - lock.unlock(); - } - } - } - - /** - * Closure which simulates consumer. - */ - private static class Consumer extends ReentrantLockExampleClosure { - /** - * @param reentrantLockName ReentrantLock name. - */ - public Consumer(String reentrantLockName) { - super(reentrantLockName); - } - - /** {@inheritDoc} */ - @Override public void run() { - System.out.println("Consumer started. "); - - Ignite g = Ignition.ignite(); - - IgniteReentrantLock lock = g.reentrantLock(reentrantLockName, true, true); - - // Condition to wait on when queue is full. - IgniteCondition notFull = lock.newCondition(NOT_FULL); - - // Signaled to wait on when queue is empty. - IgniteCondition notEmpty = lock.newCondition(NOT_EMPTY); - - // Signaled when job is done. - IgniteCondition done = lock.newCondition(SYNC_NAME); - - IgniteCache<String, Integer> cache = g.cache(CACHE_NAME); - - for (int i = 0; i < OPS_COUNT; i++) { - try { - lock.lock(); - - int val = cache.get(QUEUE_ID); - - while (val <= 0) { - System.out.println("Queue is empty. Consumer [nodeId=" + - Ignition.ignite().cluster().localNode().id() + " paused."); - - notEmpty.await(); - - val = cache.get(QUEUE_ID); - } - - val--; - - System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + - ", available=" + val + ']'); - - cache.put(QUEUE_ID, val); - - notFull.signalAll(); - } - finally { - lock.unlock(); - } - } - - System.out.println("Consumer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']'); - - try { - lock.lock(); - - int count = cache.get(SYNC_NAME); - - count--; - - cache.put(SYNC_NAME, count); - - // Signals the master thread. - done.signal(); - }finally { - lock.unlock(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java index fea3627..f9af936 100644 --- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java +++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java @@ -32,7 +32,7 @@ import org.apache.ignite.examples.datastructures.IgniteAtomicSequenceExample; import org.apache.ignite.examples.datastructures.IgniteAtomicStampedExample; import org.apache.ignite.examples.datastructures.IgniteCountDownLatchExample; import org.apache.ignite.examples.datastructures.IgniteQueueExample; -import org.apache.ignite.examples.datastructures.IgniteReentrantLockExample; +import org.apache.ignite.examples.datastructures.IgniteLockExample; import org.apache.ignite.examples.datastructures.IgniteSemaphoreExample; import org.apache.ignite.examples.datastructures.IgniteSetExample; import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest; @@ -100,8 +100,8 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest { /** * @throws Exception If failed. */ - public void testCacheReentrantLockExample() throws Exception { - IgniteReentrantLockExample.main(EMPTY_ARGS); + public void testCacheLockExample() throws Exception { + IgniteLockExample.main(EMPTY_ARGS); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index 1838fd1..d8b1e1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -471,14 +471,14 @@ public interface Ignite extends AutoCloseable { * * @param name Name of the lock. * @param failoverSafe {@code True} to create failover safe lock which means that - * if any node leaves topology lock already acquired by that node is silently released - * and become available for alive nodes to acquire. If flag is {@code false} then - * all threads waiting to acquire lock get interrupted. + * if any node leaves topology, all locks already acquired by that node are silently released + * and become available for other nodes to acquire. If flag is {@code false} then + * all threads on other nodes waiting to acquire lock are interrupted. * @param create Boolean flag indicating whether data structure should be created if does not exist. * @return ReentrantLock for the given name. * @throws IgniteException If reentrant lock could not be fetched or created. */ - public IgniteReentrantLock reentrantLock(String name, boolean failoverSafe, boolean create) + public IgniteLock reentrantLock(String name, boolean failoverSafe, boolean create) throws IgniteException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java index e79270d..1e5dde9 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java @@ -19,9 +19,9 @@ package org.apache.ignite; import java.util.Date; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; - -public interface IgniteCondition { +public interface IgniteCondition extends Condition { /** * Name of ignite condition. @@ -36,19 +36,23 @@ public interface IgniteCondition { * * <p>The lock associated with this {@code IgniteCondition} is atomically * released and the current thread becomes disabled for thread scheduling - * purposes and lies dormant until <em>one</em> of four things happens: + * purposes and lies dormant until <em>one</em> of six things happens: * <ul> - * <li>Some other thread invokes the {@link #signal} method for this + * <li>Some other thread (on any node) invokes the {@link #signal} method for this * {@code Condition} and the current thread happens to be chosen as the * thread to be awakened; or - * <li>Some other thread invokes the {@link #signalAll} method for this + * <li>Some other thread (on any node) invokes the {@link #signalAll} method for this * {@code Condition}; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} the * current thread, and interruption of thread suspension is supported; or + * <li> Some other node in grid fails, and lock is created in non-failoverSafe mode; or + * <li> Local node is stopped; or * <li>A "<em>spurious wakeup</em>" occurs. * </ul> * - * <p>In all cases, before this method can return the current thread must + * <p>If lock is not broken (because of failure of lock owner node) + * in non-failoverSafe mode and local node is alive, + * before this method can return the current thread must * re-acquire the lock associated with this condition. When the * thread returns it is <em>guaranteed</em> to hold this lock. * @@ -66,57 +70,16 @@ public interface IgniteCondition { * <p><b>Implementation Considerations</b> * * <p>The current thread is assumed to hold the lock associated with this - * {@code Condition} when this method is called. - * It is up to the implementation to determine if this is - * the case and if not, how to respond. Typically, an exception will be - * thrown (such as {@link IllegalMonitorStateException}) and the - * implementation must document that fact. - * - * <p>An implementation can favor responding to an interrupt over normal - * method return in response to a signal. In that case the implementation - * must ensure that the signal is redirected to another waiting thread, if - * there is one. + * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException} + * will be thrown. * * @throws IgniteInterruptedException if the current thread is interrupted - * (and interruption of thread suspension is supported) + * for any reason */ - void await() throws IgniteInterruptedException; + @Override void await() throws IgniteInterruptedException; - /** - * Causes the current thread to wait until it is signalled. - * - * <p>The lock associated with this condition is atomically - * released and the current thread becomes disabled for thread scheduling - * purposes and lies dormant until <em>one</em> of three things happens: - * <ul> - * <li>Some other thread invokes the {@link #signal} method for this - * {@code Condition} and the current thread happens to be chosen as the - * thread to be awakened; or - * <li>Some other thread invokes the {@link #signalAll} method for this - * {@code Condition}; or - * <li>A "<em>spurious wakeup</em>" occurs. - * </ul> - * - * <p>In all cases, before this method can return the current thread must - * re-acquire the lock associated with this condition. When the - * thread returns it is <em>guaranteed</em> to hold this lock. - * - * <p>If the current thread's interrupted status is set when it enters - * this method, or it is {@linkplain Thread#interrupt interrupted} - * while waiting, it will continue to wait until signalled. When it finally - * returns from this method its interrupted status will still - * be set. - * - * <p><b>Implementation Considerations</b> - * - * <p>The current thread is assumed to hold the lock associated with this - * {@code Condition} when this method is called. - * It is up to the implementation to determine if this is - * the case and if not, how to respond. Typically, an exception will be - * thrown (such as {@link IllegalMonitorStateException}) and the - * implementation must document that fact. - */ - void awaitUninterruptibly(); + /** {@inheritDoc} */ + @Override void awaitUninterruptibly(); /** * Causes the current thread to wait until it is signalled or interrupted, @@ -124,7 +87,7 @@ public interface IgniteCondition { * * <p>The lock associated with this condition is atomically * released and the current thread becomes disabled for thread scheduling - * purposes and lies dormant until <em>one</em> of five things happens: + * purposes and lies dormant until <em>one</em> of seven things happens: * <ul> * <li>Some other thread invokes the {@link #signal} method for this * {@code Condition} and the current thread happens to be chosen as the @@ -134,10 +97,14 @@ public interface IgniteCondition { * <li>Some other thread {@linkplain Thread#interrupt interrupts} the * current thread, and interruption of thread suspension is supported; or * <li>The specified waiting time elapses; or + * <li>Some other node in grid fails, and lock is created in non-failoverSafe mode; or + * <li>Local node is stopped; or * <li>A "<em>spurious wakeup</em>" occurs. * </ul> * - * <p>In all cases, before this method can return the current thread must + * <p>If lock is not broken (because of failure of lock owner node) + * in non-failoverSafe mode and local node is alive, + * before this method can return the current thread must * re-acquire the lock associated with this condition. When the * thread returns it is <em>guaranteed</em> to hold this lock. * @@ -185,17 +152,8 @@ public interface IgniteCondition { * <p><b>Implementation Considerations</b> * * <p>The current thread is assumed to hold the lock associated with this - * {@code Condition} when this method is called. - * It is up to the implementation to determine if this is - * the case and if not, how to respond. Typically, an exception will be - * thrown (such as {@link IllegalMonitorStateException}) and the - * implementation must document that fact. - * - * <p>An implementation can favor responding to an interrupt over normal - * method return in response to a signal, or over indicating the elapse - * of the specified waiting time. In either case the implementation - * must ensure that the signal is redirected to another waiting thread, if - * there is one. + * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException} + * will be thrown. * * @param nanosTimeout the maximum time to wait, in nanoseconds * @return an estimate of the {@code nanosTimeout} value minus @@ -205,9 +163,9 @@ public interface IgniteCondition { * the desired time. A value less than or equal to zero * indicates that no time remains. * @throws IgniteInterruptedException if the current thread is interrupted - * (and interruption of thread suspension is supported) + * for any reason */ - long awaitNanos(long nanosTimeout) throws IgniteInterruptedException; + @Override long awaitNanos(long nanosTimeout) throws IgniteInterruptedException; /** * Causes the current thread to wait until it is signalled or interrupted, @@ -220,9 +178,9 @@ public interface IgniteCondition { * @return {@code false} if the waiting time detectably elapsed * before return from the method, else {@code true} * @throws IgniteInterruptedException if the current thread is interrupted - * (and interruption of thread suspension is supported) + * for any reason */ - boolean await(long time, TimeUnit unit) throws IgniteInterruptedException; + @Override boolean await(long time, TimeUnit unit) throws IgniteInterruptedException; /** * Causes the current thread to wait until it is signalled or interrupted, @@ -230,7 +188,7 @@ public interface IgniteCondition { * * <p>The lock associated with this condition is atomically * released and the current thread becomes disabled for thread scheduling - * purposes and lies dormant until <em>one</em> of five things happens: + * purposes and lies dormant until <em>one</em> of seven things happens: * <ul> * <li>Some other thread invokes the {@link #signal} method for this * {@code Condition} and the current thread happens to be chosen as the @@ -239,15 +197,18 @@ public interface IgniteCondition { * {@code Condition}; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} the * current thread, and interruption of thread suspension is supported; or + * <li>Some other node in grid fails, and lock is created in non-failoverSafe mode; or + * <li>Local node is stopped; or * <li>The specified deadline elapses; or * <li>A "<em>spurious wakeup</em>" occurs. * </ul> * - * <p>In all cases, before this method can return the current thread must + * <p>If lock is not broken (because of failure of lock owner node) + * in non-failoverSafe mode and local node is alive, + * before this method can return the current thread must * re-acquire the lock associated with this condition. When the * thread returns it is <em>guaranteed</em> to hold this lock. * - * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or @@ -259,7 +220,6 @@ public interface IgniteCondition { * case, whether or not the test for interruption occurs before the lock * is released. * - * * <p>The return value indicates whether the deadline has elapsed, * which can be used as follows: * <pre> {@code @@ -281,17 +241,8 @@ public interface IgniteCondition { * <p><b>Implementation Considerations</b> * * <p>The current thread is assumed to hold the lock associated with this - * {@code Condition} when this method is called. - * It is up to the implementation to determine if this is - * the case and if not, how to respond. Typically, an exception will be - * thrown (such as {@link IllegalMonitorStateException}) and the - * implementation must document that fact. - * - * <p>An implementation can favor responding to an interrupt over normal - * method return in response to a signal, or over indicating the passing - * of the specified deadline. In either case the implementation - * must ensure that the signal is redirected to another waiting thread, if - * there is one. + * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException} + * will be thrown. * * @param deadline the absolute time to wait until * @return {@code false} if the deadline has elapsed upon return, else @@ -299,41 +250,11 @@ public interface IgniteCondition { * @throws IgniteInterruptedException if the current thread is interrupted * (and interruption of thread suspension is supported) */ - boolean awaitUntil(Date deadline) throws IgniteInterruptedException; + @Override boolean awaitUntil(Date deadline) throws IgniteInterruptedException; - /** - * Wakes up one waiting thread. - * - * <p>If any threads are waiting on this condition then one - * is selected for waking up. That thread must then re-acquire the - * lock before returning from {@code await}. - * - * <p><b>Implementation Considerations</b> - * - * <p>An implementation may (and typically does) require that the - * current thread hold the lock associated with this {@code - * Condition} when this method is called. Implementations must - * document this precondition and any actions taken if the lock is - * not held. Typically, an exception such as {@link - * IllegalMonitorStateException} will be thrown. - */ - void signal(); + /** {@inheritDoc} */ + @Override void signal(); - /** - * Wakes up all waiting threads. - * - * <p>If any threads are waiting on this condition then they are - * all woken up. Each thread must re-acquire the lock before it can - * return from {@code await}. - * - * <p><b>Implementation Considerations</b> - * - * <p>An implementation may (and typically does) require that the - * current thread hold the lock associated with this {@code - * Condition} when this method is called. Implementations must - * document this precondition and any actions taken if the lock is - * not held. Typically, an exception such as {@link - * IllegalMonitorStateException} will be thrown. - */ - void signalAll(); + /** {@inheritDoc} */ + @Override void signalAll(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/main/java/org/apache/ignite/IgniteLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteLock.java b/modules/core/src/main/java/org/apache/ignite/IgniteLock.java new file mode 100644 index 0000000..10053a8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/IgniteLock.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite; + +import java.io.Closeable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +/** + * This interface provides a rich API for working with distributed reentrant locks. + * <p> + * <h1 class="header">Functionality</h1> + * Distributed reentrant lock provides functionality similar to {@code java.util.concurrent.ReentrantLock}. + * <h1 class="header">Creating Distributed ReentrantLock</h1> + * Instance of cache reentrant lock can be created by calling the following method: + * {@link Ignite#reentrantLock(String, boolean, boolean)}. + */ +public interface IgniteLock extends Lock, Closeable { + + /** + * Name of atomic reentrant lock. + * + * @return Name of atomic reentrant lock. + */ + public String name(); + + /** {@inheritDoc} */ + @Override public void lock(); + + /** + * Acquires the lock unless the current thread is + * {@linkplain Thread#interrupt interrupted}. + * + * <p>Acquires the lock if it is not held by another thread and returns + * immediately, setting the lock hold count to one. + * + * <p>If the current thread already holds this lock then the hold count + * is incremented by one and the method returns immediately. + * + * <p>If the lock is held by another thread then the + * current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of four things happens: + * + * <ul> + * + * <li>The lock is acquired by the current thread; or + * + * <li>Some other thread {@linkplain Thread#interrupt interrupts} the + * current thread. + * + * <li>Lock is broken (any node failed while owning this lock), and lock is created in + * non-failoverSafe mode. + * + * <li>Local node is stopped. + * + * </ul> + * + * <p>If the lock is acquired by the current thread then the lock hold + * count is set to one. + * + * <p>If the current thread: + * + * <ul> + * + * <li>has its interrupted status set on entry to this method; or + * + * <li>is {@linkplain Thread#interrupt interrupted} while acquiring + * the lock; or + * + * <li>lock is broken before or during the attempt to acquire this lock; or + * + * <li>local node is stopped, + * + * </ul> + * + * then {@link IgniteInterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + * <p>In this implementation, as this method is an explicit + * interruption point, preference is given to responding to the + * interrupt over normal or reentrant acquisition of the lock. + * + * @throws IgniteInterruptedException if the current thread is interrupted + */ + @Override public void lockInterruptibly() throws IgniteInterruptedException; + + /** {@inheritDoc} */ + @Override public boolean tryLock(); + + /** + * Acquires the lock if it is not held by another thread within the given + * waiting time and the current thread has not been + * {@linkplain Thread#interrupt interrupted}. + * + * <p>Acquires the lock if it is not held by another thread and returns + * immediately with the value {@code true}, setting the lock hold count + * to one. + * + * <p>If the current thread + * already holds this lock then the hold count is incremented by one and + * the method returns {@code true}. + * + * <p>If the lock is held by another thread then the + * current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of five things happens: + * + * <ul> + * + * <li>The lock is acquired by the current thread; or + * + * <li>Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread; or + * + * <li>Lock is broken (any node failed while owning this lock), and lock is created in + * non-failoverSafe mode. + * + * <li>Local node is stopped. + * + * <li>The specified waiting time elapses + * + * </ul> + * + * <p>If the lock is acquired then the value {@code true} is returned and + * the lock hold count is set to one. + * + * <p>If the current thread: + * + * <ul> + * + * <li>has its interrupted status set on entry to this method; or + * + * <li>is {@linkplain Thread#interrupt interrupted} while + * acquiring the lock; or + * + * <li>lock is broken before or during the attempt to acquire this lock; or + * + * <li>local node is stopped, + * + * </ul> + * then {@link IgniteInterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + * <p>If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * <p>In this implementation, as this method is an explicit + * interruption point, preference is given to responding to the + * interrupt over normal or reentrant acquisition of the lock, and + * over reporting the elapse of the waiting time. + * + * @param timeout the time to wait for the lock + * @param unit the time unit of the timeout argument + * @return {@code true} if the lock was free and was acquired by the + * current thread, or the lock was already held by the current + * thread; and {@code false} if the waiting time elapsed before + * the lock could be acquired + * @throws IgniteInterruptedException if the current thread is interrupted + * @throws NullPointerException if the time unit is null + */ + @Override public boolean tryLock(long timeout, TimeUnit unit) throws IgniteInterruptedException; + + /** {@inheritDoc} */ + @Override public void unlock(); + + /** + * Returns a {@link Condition} instance for use with this + * {@link IgniteLock} instance. + * + * <ul> + * + * <li>If this lock is not held when any of the {@link Condition} + * {@linkplain Condition#await() waiting} or {@linkplain + * Condition#signal signalling} methods are called, then an {@link + * IllegalMonitorStateException} is thrown. + * + * <li>When the condition {@linkplain Condition#await() waiting} + * methods are called the lock is released and, before they + * return, the lock is reacquired and the lock hold count restored + * to what it was when the method was called. + * + * <li>If a thread is {@linkplain Thread#interrupt interrupted} + * while waiting then the wait will terminate, an {@link + * IgniteInterruptedException} will be thrown, and the thread's + * interrupted status will be cleared. + * + * <li> Waiting threads are signalled in FIFO order. + * + * </ul> + * + * @param name Name of the distributed condition object + * + * @return the Condition object + */ + public IgniteCondition getOrCreateCondition(String name); + + /** + * This method is not supported in IgniteLock, + * Any invocation of this method will result in {@linkplain UnsupportedOperationException}. + * Correct way to obtain Condition object is through method {@linkplain IgniteLock#getOrCreateCondition(String)} + * + * @return + */ + @Override public Condition newCondition(); + + /** + * Queries the number of holds on this lock by the current thread. + * + * @return the number of holds on this lock by the current thread, + * or zero if this lock is not held by the current thread + */ + public int getHoldCount(); + + /** + * Queries if this lock is held by the current thread. + * + * @return {@code true} if current thread holds this lock and + * {@code false} otherwise + */ + public boolean isHeldByCurrentThread(); + + /** + * Queries if this lock is held by any thread on any node. This method is + * designed for use in monitoring of the system state, + * not for synchronization control. + * + * @return {@code true} if any thread on this or any other node holds this lock and + * {@code false} otherwise + */ + public boolean isLocked(); + + /** + * Queries whether any threads on this node are waiting to acquire this lock. Note that + * because cancellations may occur at any time, a {@code true} + * return does not guarantee that any other thread will ever + * acquire this lock. This method is designed primarily for use in + * monitoring of the system state. + * + * @return {@code true} if there may be other threads on this node waiting to + * acquire the lock + */ + public boolean hasQueuedThreads(); + + /** + * Queries whether the given thread is waiting to acquire this + * lock. Note that because cancellations may occur at any time, a + * {@code true} return does not guarantee that this thread + * will ever acquire this lock. This method is designed primarily for use + * in monitoring of the system state. + * + * @param thread the thread + * @return {@code true} if the given thread is queued waiting for this lock + * @throws NullPointerException if the thread is null + */ + public boolean hasQueuedThread(Thread thread); + + /** + * Queries whether any threads on this node are waiting on the given condition + * associated with this lock. Note that because timeouts and + * interrupts may occur at any time, a {@code true} return does + * not guarantee that a future {@code signal} will awaken any + * threads. This method is designed primarily for use in + * monitoring of the system state. + * + * @param condition the condition + * @return {@code true} if there are any waiting threads on this node + * @throws IllegalMonitorStateException if this lock is not held + * @throws IllegalArgumentException if the given condition is + * not associated with this lock + * @throws NullPointerException if the condition is null + */ + public boolean hasWaiters(IgniteCondition condition); + + /** + * Returns an estimate of the number of threads on this node that are waiting on the + * given condition associated with this lock. Note that because + * timeouts and interrupts may occur at any time, the estimate + * serves only as an upper bound on the actual number of waiters. + * This method is designed for use in monitoring of the system + * state, not for synchronization control. + * + * @param condition the condition + * @return the estimated number of waiting threads on this node + * @throws IgniteIllegalStateException if this lock is not held + * @throws IllegalArgumentException if the given condition is + * not associated with this lock + * @throws NullPointerException if the condition is null + */ + public int getWaitQueueLength(IgniteCondition condition); + + /** + * Returns {@code true} if this lock is safe to use after node failure. + * If not, IgniteInterruptedException is thrown on every other node after node failure. + * + * @return {@code true} if this reentrant lock has failoverSafe set true + */ + public boolean isFailoverSafe(); + + /** + * Returns true if any node that owned the locked failed before releasing the lock. + * + * @return true if any node failed while owning the lock. + */ + public boolean isBroken(); + + /** {@inheritDoc} */ + @Override public String toString(); + + /** + * Gets status of reentrant lock. + * + * @return {@code true} if reentrant lock was removed from cache, {@code false} in other case. + */ + public boolean removed(); + + /** + * Removes reentrant lock. + * + * @throws IgniteException If operation failed. + */ + @Override public void close(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/main/java/org/apache/ignite/IgniteReentrantLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteReentrantLock.java b/modules/core/src/main/java/org/apache/ignite/IgniteReentrantLock.java deleted file mode 100644 index d209b51..0000000 --- a/modules/core/src/main/java/org/apache/ignite/IgniteReentrantLock.java +++ /dev/null @@ -1,434 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite; - -import java.io.Closeable; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - -/** - * This interface provides a rich API for working with distributed reentrant lock. - * <p> - * <h1 class="header">Functionality</h1> - * Distributed reentrant lock provides functionality similar to {@code java.util.concurrent.ReentrantLock}. - * <h1 class="header">Creating Distributed ReentrantLock</h1> - * Instance of cache reentrant lock can be created by calling the following method: - * {@link Ignite#reentrantLock(String, boolean, boolean)}. - */ -public interface IgniteReentrantLock extends Closeable { - - /** - * Name of atomic reentrant lock. - * - * @return Name of atomic reentrant lock. - */ - public String name(); - - /** - * Acquires the lock. - * - * <p>Acquires the lock if it is not held by another thread and returns - * immediately, setting the lock hold count to one. - * - * <p>If the current thread already holds the lock then the hold - * count is incremented by one and the method returns immediately. - * - * <p>If the lock is held by another thread then the - * current thread becomes disabled for thread scheduling - * purposes and lies dormant until the lock has been acquired, - * at which time the lock hold count is set to one. - */ - public void lock(); - - /** - * Acquires the lock unless the current thread is - * {@linkplain Thread#interrupt interrupted}. - * - * <p>Acquires the lock if it is not held by another thread and returns - * immediately, setting the lock hold count to one. - * - * <p>If the current thread already holds this lock then the hold count - * is incremented by one and the method returns immediately. - * - * <p>If the lock is held by another thread then the - * current thread becomes disabled for thread scheduling - * purposes and lies dormant until one of two things happens: - * - * <ul> - * - * <li>The lock is acquired by the current thread; or - * - * <li>Some other thread {@linkplain Thread#interrupt interrupts} the - * current thread. - * - * </ul> - * - * <p>If the lock is acquired by the current thread then the lock hold - * count is set to one. - * - * <p>If the current thread: - * - * <ul> - * - * <li>has its interrupted status set on entry to this method; or - * - * <li>is {@linkplain Thread#interrupt interrupted} while acquiring - * the lock, - * - * </ul> - * - * then {@link IgniteInterruptedException} is thrown and the current thread's - * interrupted status is cleared. - * - * <p>In this implementation, as this method is an explicit - * interruption point, preference is given to responding to the - * interrupt over normal or reentrant acquisition of the lock. - * - * @throws IgniteInterruptedException if the current thread is interrupted - */ - public void lockInterruptibly() throws IgniteInterruptedException; - - /** - * Acquires the lock only if it is not held by another thread at the time - * of invocation. - * - * <p>Acquires the lock if it is not held by another thread and - * returns immediately with the value {@code true}, setting the - * lock hold count to one. - * - * <p>If the current thread already holds this lock then the hold - * count is incremented by one and the method returns {@code true}. - * - * <p>If the lock is held by another thread then this method will return - * immediately with the value {@code false}. - * - * @return {@code true} if the lock was free and was acquired by the - * current thread, or the lock was already held by the current - * thread; and {@code false} otherwise - */ - public boolean tryLock(); - - /** - * Acquires the lock if it is not held by another thread within the given - * waiting time and the current thread has not been - * {@linkplain Thread#interrupt interrupted}. - * - * <p>Acquires the lock if it is not held by another thread and returns - * immediately with the value {@code true}, setting the lock hold count - * to one. - * - * <p>If the current thread - * already holds this lock then the hold count is incremented by one and - * the method returns {@code true}. - * - * <p>If the lock is held by another thread then the - * current thread becomes disabled for thread scheduling - * purposes and lies dormant until one of three things happens: - * - * <ul> - * - * <li>The lock is acquired by the current thread; or - * - * <li>Some other thread {@linkplain Thread#interrupt interrupts} - * the current thread; or - * - * <li>The specified waiting time elapses - * - * </ul> - * - * <p>If the lock is acquired then the value {@code true} is returned and - * the lock hold count is set to one. - * - * <p>If the current thread: - * - * <ul> - * - * <li>has its interrupted status set on entry to this method; or - * - * <li>is {@linkplain Thread#interrupt interrupted} while - * acquiring the lock, - * - * </ul> - * then {@link IgniteInterruptedException} is thrown and the current thread's - * interrupted status is cleared. - * - * <p>If the specified waiting time elapses then the value {@code false} - * is returned. If the time is less than or equal to zero, the method - * will not wait at all. - * - * <p>In this implementation, as this method is an explicit - * interruption point, preference is given to responding to the - * interrupt over normal or reentrant acquisition of the lock, and - * over reporting the elapse of the waiting time. - * - * @param timeout the time to wait for the lock - * @param unit the time unit of the timeout argument - * @return {@code true} if the lock was free and was acquired by the - * current thread, or the lock was already held by the current - * thread; and {@code false} if the waiting time elapsed before - * the lock could be acquired - * @throws IgniteInterruptedException if the current thread is interrupted - * @throws NullPointerException if the time unit is null - */ - public boolean tryLock(long timeout, TimeUnit unit) throws IgniteInterruptedException; - - /** - * Attempts to release this lock. - * - * <p>If the current thread is the holder of this lock then the hold - * count is decremented. If the hold count is now zero then the lock - * is released. If the current thread is not the holder of this - * lock then {@link IllegalMonitorStateException} is thrown. - * - * @throws IllegalMonitorStateException if the current thread does not - * hold this lock - */ - public void unlock(); - - /** - * Returns a {@link Condition} instance for use with this - * {@link Lock} instance. - * - * <p>The returned {@link Condition} instance supports the same - * usages as do the {@link Object} monitor methods ({@link - * Object#wait() wait}, {@link Object#notify notify}, and {@link - * Object#notifyAll notifyAll}) when used with the built-in - * monitor lock. - * - * <ul> - * - * <li>If this lock is not held when any of the {@link Condition} - * {@linkplain Condition#await() waiting} or {@linkplain - * Condition#signal signalling} methods are called, then an {@link - * IllegalMonitorStateException} is thrown. - * - * <li>When the condition {@linkplain Condition#await() waiting} - * methods are called the lock is released and, before they - * return, the lock is reacquired and the lock hold count restored - * to what it was when the method was called. - * - * <li>If a thread is {@linkplain Thread#interrupt interrupted} - * while waiting then the wait will terminate, an {@link - * IgniteInterruptedException} will be thrown, and the thread's - * interrupted status will be cleared. - * - * <li> Waiting threads are signalled in FIFO order. - * - * </ul> - * - * @return the Condition object - */ - public IgniteCondition newCondition(String name); - - /** - * Queries the number of holds on this lock by the current thread. - * - * <p>A thread has a hold on a lock for each lock action that is not - * matched by an unlock action. - * - * <p>The hold count information is typically only used for testing and - * debugging purposes. For example, if a certain section of code should - * not be entered with the lock already held then we can assert that - * fact: - * - * <pre> {@code - * class X { - * ReentrantLock lock = new ReentrantLock(); - * // ... - * public void m() { - * assert lock.getHoldCount() == 0; - * lock.lock(); - * try { - * // ... method body - * } finally { - * lock.unlock(); - * } - * } - * }}</pre> - * - * @return the number of holds on this lock by the current thread, - * or zero if this lock is not held by the current thread - */ - public int getHoldCount(); - - /** - * Queries if this lock is held by the current thread. - * - * <p>Analogous to the {@link Thread#holdsLock(Object)} method for - * built-in monitor locks, this method is typically used for - * debugging and testing. For example, a method that should only be - * called while a lock is held can assert that this is the case: - * - * <pre> {@code - * class X { - * ReentrantLock lock = new ReentrantLock(); - * // ... - * - * public void m() { - * assert lock.isHeldByCurrentThread(); - * // ... method body - * } - * }}</pre> - * - * <p>It can also be used to ensure that a reentrant lock is used - * in a non-reentrant manner, for example: - * - * <pre> {@code - * class X { - * ReentrantLock lock = new ReentrantLock(); - * // ... - * - * public void m() { - * assert !lock.isHeldByCurrentThread(); - * lock.lock(); - * try { - * // ... method body - * } finally { - * lock.unlock(); - * } - * } - * }}</pre> - * - * @return {@code true} if current thread holds this lock and - * {@code false} otherwise - */ - public boolean isHeldByCurrentThread(); - - /** - * Queries if this lock is held by any thread. This method is - * designed for use in monitoring of the system state, - * not for synchronization control. - * - * @return {@code true} if any thread holds this lock and - * {@code false} otherwise - */ - public boolean isLocked(); - - /** - * Queries whether any threads are waiting to acquire this lock. Note that - * because cancellations may occur at any time, a {@code true} - * return does not guarantee that any other thread will ever - * acquire this lock. This method is designed primarily for use in - * monitoring of the system state. - * - * @return {@code true} if there may be other threads waiting to - * acquire the lock - */ - public boolean hasQueuedThreads(); - - /** - * Queries whether the given thread is waiting to acquire this - * lock. Note that because cancellations may occur at any time, a - * {@code true} return does not guarantee that this thread - * will ever acquire this lock. This method is designed primarily for use - * in monitoring of the system state. - * - * @param thread the thread - * @return {@code true} if the given thread is queued waiting for this lock - * @throws NullPointerException if the thread is null - */ - public boolean hasQueuedThread(Thread thread); - - /** - * Returns an estimate of the number of nodes waiting to - * acquire this lock. The value is only an estimate because the number of - * nodes may change dynamically while this method traverses - * internal data structures. This method is designed for use in - * monitoring of the system state, not for synchronization - * control. - * - * @return the estimated number of nodes waiting for this lock - */ - public int getQueueLength(); - - /** - * Queries whether any threads on this node are waiting on the given condition - * associated with this lock. Note that because timeouts and - * interrupts may occur at any time, a {@code true} return does - * not guarantee that a future {@code signal} will awaken any - * threads. This method is designed primarily for use in - * monitoring of the system state. - * - * @param condition the condition - * @return {@code true} if there are any waiting threads on this node - * @throws IllegalMonitorStateException if this lock is not held - * @throws IllegalArgumentException if the given condition is - * not associated with this lock - * @throws NullPointerException if the condition is null - */ - public boolean hasWaiters(IgniteCondition condition); - - /** - * Returns an estimate of the number of threads on this node are waiting on the - * given condition associated with this lock. Note that because - * timeouts and interrupts may occur at any time, the estimate - * serves only as an upper bound on the actual number of waiters. - * This method is designed for use in monitoring of the system - * state, not for synchronization control. - * - * @param condition the condition - * @return the estimated number of waiting threads on this node - * @throws IgniteIllegalStateException if this lock is not held - * @throws IllegalArgumentException if the given condition is - * not associated with this lock - * @throws NullPointerException if the condition is null - */ - public int getWaitQueueLength(IgniteCondition condition); - - /** - * Returns {@code true} if this lock is safe to use after node failure. - * If not, IgniteInterruptedException is thrown on every other node after node failure. - * - * @return {@code true} if this reentrant lock has failoverSafe set true - */ - public boolean isFailoverSafe(); - - /** - * Returns true if any node that owned the locked failed before releasing the lock.. - * - * @return true if any node failed while owning the lock. - */ - public boolean isBroken(); - - /** - * Returns a string identifying this lock, as well as its lock state. - * The state, in brackets, includes either the String {@code "Unlocked"} - * or the String {@code "Locked by"} followed by the - * {@linkplain UUID} of the owning node and {@linkplain Thread#getName name} - * of the owning thread. - * - * @return a string identifying this lock, as well as its lock state - */ - public String toString(); - - /** - * Gets status of reentrant lock. - * - * @return {@code true} if reentrant lock was removed from cache, {@code false} in other case. - */ - public boolean removed(); - - /** - * Removes reentrant lock. - * - * @throws IgniteException If operation failed. - */ - @Override public void close(); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 21dde23..a2823d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -64,7 +64,7 @@ import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.IgniteQueue; -import org.apache.ignite.IgniteReentrantLock; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteScheduler; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteServices; @@ -2994,7 +2994,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** {@inheritDoc} */ - @Nullable @Override public IgniteReentrantLock reentrantLock( + @Nullable @Override public IgniteLock reentrantLock( String name, boolean failoverSafe, boolean create http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 73aaddf..6d02b1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -43,9 +43,9 @@ import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteQueue; -import org.apache.ignite.IgniteReentrantLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSet; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; @@ -137,8 +137,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** Cache contains only {@code GridCacheSemaphoreState}. */ private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView; - /** Cache contains only {@code GridCacheReentrantLockState}. */ - private IgniteInternalCache<GridCacheInternalKey, GridCacheReentrantLockState> reentrantLockView; + /** Cache contains only {@code GridCacheLockState}. */ + private IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> reentrantLockView; /** Cache contains only {@code GridCacheAtomicReferenceValue}. */ private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicReferenceValue> atomicRefView; @@ -194,8 +194,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { for (GridCacheRemovable ds : dsMap.values()) { if (ds instanceof GridCacheSemaphoreEx) ((GridCacheSemaphoreEx)ds).onNodeRemoved(leftNodeId); - else if(ds instanceof GridCacheReentrantLockEx) - ((GridCacheReentrantLockEx)ds).onNodeRemoved(leftNodeId); + else if(ds instanceof GridCacheLockEx) + ((GridCacheLockEx)ds).onNodeRemoved(leftNodeId); } return null; @@ -267,6 +267,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { @Override public void onKernalStop(boolean cancel) { super.onKernalStop(cancel); + for (GridCacheRemovable ds : dsMap.values()) { + if (ds instanceof GridCacheLockEx) + ((GridCacheLockEx)ds).stop(); + } + if (initLatch.getCount() > 0) { initFailed = true; @@ -1344,7 +1349,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * {@code create} is false. * @throws IgniteCheckedException If operation failed. */ - public IgniteReentrantLock reentrantLock(final String name, final boolean failoverSafe, final boolean create) + public IgniteLock reentrantLock(final String name, final boolean failoverSafe, final boolean create) throws IgniteCheckedException { A.notNull(name, "name"); @@ -1354,17 +1359,17 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { startQuery(); - return getAtomic(new IgniteOutClosureX<IgniteReentrantLock>() { - @Override public IgniteReentrantLock applyx() throws IgniteCheckedException { + return getAtomic(new IgniteOutClosureX<IgniteLock>() { + @Override public IgniteLock applyx() throws IgniteCheckedException { GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); dsCacheCtx.gate().enter(); try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheReentrantLockState val = cast(dsView.get(key), GridCacheReentrantLockState.class); + GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class); // Check that reentrant lock hasn't been created in other thread yet. - GridCacheReentrantLockEx reentrantLock = cast(dsMap.get(key), GridCacheReentrantLockEx.class); + GridCacheLockEx reentrantLock = cast(dsMap.get(key), GridCacheLockEx.class); if (reentrantLock != null) { assert val != null; @@ -1376,12 +1381,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { return null; if (val == null) { - val = new GridCacheReentrantLockState(0, dsCacheCtx.nodeId(), 0, failoverSafe); + val = new GridCacheLockState(0, dsCacheCtx.nodeId(), 0, failoverSafe); dsView.put(key, val); } - GridCacheReentrantLockEx reentrantLock0 = new GridCacheReentrantLockImpl( + GridCacheLockEx reentrantLock0 = new GridCacheLockImpl( name, key, reentrantLockView, @@ -1404,7 +1409,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.gate().leave(); } } - }, new DataStructureInfo(name, REENTRANT_LOCK, null), create, GridCacheReentrantLockEx.class); + }, new DataStructureInfo(name, REENTRANT_LOCK, null), create, GridCacheLockEx.class); } /** @@ -1414,7 +1419,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * @param broken Flag indicating the reentrant lock is broken and should be removed unconditionally. * @throws IgniteCheckedException If operation failed. */ - public void removeReentrantLock(final String name, boolean broken) throws IgniteCheckedException { + public void removeReentrantLock(final String name, final boolean broken) throws IgniteCheckedException { assert name != null; assert dsCacheCtx != null; @@ -1428,7 +1433,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { // Check correctness type of removable object. - GridCacheReentrantLockState val = cast(dsView.get(key), GridCacheReentrantLockState.class); + GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class); if (val != null) { if (val.get() > 0 && !broken) @@ -1500,7 +1505,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) return evt.getValue() instanceof GridCacheCountDownLatchValue || evt.getValue() instanceof GridCacheSemaphoreState || - evt.getValue() instanceof GridCacheReentrantLockState; + evt.getValue() instanceof GridCacheLockState; else { assert evt.getEventType() == EventType.REMOVED : evt; @@ -1597,22 +1602,22 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { ", actual=" + sem.getClass() + ", value=" + sem + ']'); } } - else if (val0 instanceof GridCacheReentrantLockState) { + else if (val0 instanceof GridCacheLockState) { GridCacheInternalKey key = evt.getKey(); // Notify reentrant lock on changes. final GridCacheRemovable reentrantLock = dsMap.get(key); - GridCacheReentrantLockState val = (GridCacheReentrantLockState)val0; + GridCacheLockState val = (GridCacheLockState)val0; - if (reentrantLock instanceof GridCacheReentrantLockEx) { - final GridCacheReentrantLockEx lock0 = (GridCacheReentrantLockEx)reentrantLock; + if (reentrantLock instanceof GridCacheLockEx) { + final GridCacheLockEx lock0 = (GridCacheLockEx)reentrantLock; lock0.onUpdate(val); } else if (reentrantLock != null) { U.error(log, "Failed to cast object " + - "[expected=" + IgniteReentrantLock.class.getSimpleName() + + "[expected=" + IgniteLock.class.getSimpleName() + ", actual=" + reentrantLock.getClass() + ", value=" + reentrantLock + ']'); } } @@ -1837,7 +1842,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { SEMAPHORE(IgniteSemaphore.class.getSimpleName()), /** */ - REENTRANT_LOCK(IgniteReentrantLock.class.getSimpleName()); + REENTRANT_LOCK(IgniteLock.class.getSimpleName()); /** */ private static final DataStructureType[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java new file mode 100644 index 0000000..8d6e085 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import java.util.UUID; +import org.apache.ignite.IgniteLock; + +/** + * Grid cache reentrant lock ({@code 'Ex'} stands for external). + */ +public interface GridCacheLockEx extends IgniteLock, GridCacheRemovable { + /** + * Get current reentrant lock latch key. + * + * @return Lock key. + */ + public GridCacheInternalKey key(); + + /** + * Callback to notify reentrant lock on changes. + * + * @param state New reentrant lock state. + */ + public void onUpdate(GridCacheLockState state); + + /** + * Callback to notify semaphore on topology changes. + * + * @param nodeId Id of the node that left the grid. + */ + public void onNodeRemoved(UUID nodeId); + + /** + * Callback to notify local reentrant lock instance on node stop. + */ + public void stop(); + +}
