ignite-638: Implement IgniteSemaphore data structure
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e7e3309 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e7e3309 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e7e3309 Branch: refs/heads/ignite-sql-opt Commit: 8e7e330904b80f9a13659fdd7cf7f12dd6a36037 Parents: 900788b Author: Vladisav Jelisavcic <[email protected]> Authored: Fri Nov 20 17:39:40 2015 +0300 Committer: Denis Magda <[email protected]> Committed: Fri Nov 20 17:39:40 2015 +0300 ---------------------------------------------------------------------- .../datastructures/IgniteSemaphoreExample.java | 168 ++++ .../ignite/examples/CacheExamplesSelfTest.java | 10 +- .../src/main/java/org/apache/ignite/Ignite.java | 17 + .../java/org/apache/ignite/IgniteSemaphore.java | 312 ++++++++ .../apache/ignite/events/DiscoveryEvent.java | 6 +- .../apache/ignite/internal/IgniteKernal.java | 21 + .../datastructures/DataStructuresProcessor.java | 199 ++++- .../datastructures/GridCacheSemaphoreEx.java | 47 ++ .../datastructures/GridCacheSemaphoreImpl.java | 763 +++++++++++++++++++ .../datastructures/GridCacheSemaphoreState.java | 144 ++++ .../IgniteClientReconnectAtomicsTest.java | 44 +- ...eAbstractDataStructuresFailoverSelfTest.java | 275 ++++++- .../IgniteClientDataStructuresAbstractTest.java | 59 +- .../IgniteDataStructureUniqueNameTest.java | 14 +- .../IgniteSemaphoreAbstractSelfTest.java | 411 ++++++++++ .../local/IgniteLocalSemaphoreSelfTest.java | 98 +++ .../IgnitePartitionedSemaphoreSelfTest.java | 33 + ...eplicatedDataStructuresFailoverSelfTest.java | 2 +- .../IgniteReplicatedSemaphoreSelfTest.java | 33 + .../cache/GridCacheDataStructuresLoadTest.java | 283 ++++--- .../ignite/testframework/junits/IgniteMock.java | 10 + .../junits/multijvm/IgniteProcessProxy.java | 7 + .../org/apache/ignite/IgniteSpringBean.java | 12 + 23 files changed, 2837 insertions(+), 131 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java new file mode 100644 index 0000000..1c078b0 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.datastructures; + +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteSemaphore; +import org.apache.ignite.Ignition; +import org.apache.ignite.examples.ExampleNodeStartup; +import org.apache.ignite.lang.IgniteRunnable; + +/** + * This example demonstrates cache based semaphore. 
+ * <p> + * Remote nodes should always be started with special configuration + * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code + * examples/config/example-ignite.xml} configuration. + */ +public class IgniteSemaphoreExample { + /** Number of items for each producer/consumer to produce/consume. */ + private static final int OPS_COUNT = 100; + + /** Number of producers. */ + private static final int NUM_PRODUCERS = 10; + + /** Number of consumers. */ + private static final int NUM_CONSUMERS = 10; + + /** Synchronization semaphore name. */ + private static final String SEM_NAME = IgniteSemaphoreExample.class.getSimpleName(); + + /** + * Executes example. + * + * @param args Command line arguments, none required. + */ + public static void main(String[] args) { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Cache atomic semaphore example started."); + + // Initialize semaphore. + IgniteSemaphore syncSemaphore = ignite.semaphore(SEM_NAME, 0, false, true); + + // Make name of semaphore. + final String semaphoreName = UUID.randomUUID().toString(); + + // Initialize semaphore. + IgniteSemaphore semaphore = ignite.semaphore(semaphoreName, 0, false, true); + + // Start consumers on all cluster nodes. + for (int i = 0; i < NUM_CONSUMERS; i++) + ignite.compute().withAsync().run(new Consumer(semaphoreName)); + + // Start producers on all cluster nodes. + for (int i = 0; i < NUM_PRODUCERS; i++) + ignite.compute().withAsync().run(new Producer(semaphoreName)); + + System.out.println("Master node is waiting for all other nodes to finish..."); + + // Wait for everyone to finish. 
+ syncSemaphore.acquire(NUM_CONSUMERS + NUM_PRODUCERS); + } + + System.out.flush(); + System.out.println(); + System.out.println("Finished semaphore example..."); + System.out.println("Check all nodes for output (this node is also part of the cluster)."); + } + + /** + * Base closure which holds the name of the semaphore to operate on. + */ + private abstract static class SemaphoreExampleClosure implements IgniteRunnable { + /** Semaphore name. */ + protected final String semaphoreName; + + /** + * @param semaphoreName Semaphore name. + */ + SemaphoreExampleClosure(String semaphoreName) { + this.semaphoreName = semaphoreName; + } + } + + /** + * Closure which simply signals the semaphore. + */ + private static class Producer extends SemaphoreExampleClosure { + /** + * @param semaphoreName Semaphore name. + */ + public Producer(String semaphoreName) { + super(semaphoreName); + } + + /** {@inheritDoc} */ + @Override public void run() { + IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true); + + for (int i = 0; i < OPS_COUNT; i++) { + System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + + ", available=" + semaphore.availablePermits() + ']'); + + // Signals others that shared resource is available. + semaphore.release(); + } + + System.out.println("Producer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']'); + + // Gets the syncing semaphore + IgniteSemaphore sem = Ignition.ignite().semaphore(SEM_NAME, 0, true, true); + + // Signals the master thread + sem.release(); + } + } + + /** + * Closure which simply waits on semaphore. + */ + private static class Consumer extends SemaphoreExampleClosure { + /** + * @param semaphoreName Semaphore name. 
+ */ + public Consumer(String semaphoreName) { + super(semaphoreName); + } + + /** {@inheritDoc} */ + @Override public void run() { + IgniteSemaphore sem = Ignition.ignite().semaphore(semaphoreName, 0, true, true); + + for (int i = 0; i < OPS_COUNT; i++) { + // Block if no permits are available. + sem.acquire(); + + System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + + ", available=" + sem.availablePermits() + ']'); + } + + System.out.println("Consumer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']'); + + // Gets the syncing semaphore + IgniteSemaphore sync = Ignition.ignite().semaphore(SEM_NAME, 3, true, true); + + // Signals the master thread. + sync.release(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java index 79f404a..c11fa1a 100644 --- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java +++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.examples.datastructures.IgniteAtomicReferenceExample; import org.apache.ignite.examples.datastructures.IgniteAtomicSequenceExample; import org.apache.ignite.examples.datastructures.IgniteAtomicStampedExample; import org.apache.ignite.examples.datastructures.IgniteCountDownLatchExample; +import org.apache.ignite.examples.datastructures.IgniteSemaphoreExample; import org.apache.ignite.examples.datastructures.IgniteQueueExample; import org.apache.ignite.examples.datastructures.IgniteSetExample; import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest; @@ -84,6 +85,13 @@ public class CacheExamplesSelfTest extends 
GridAbstractExamplesTest { /** * @throws Exception If failed. */ + public void testCacheSemaphoreExample() throws Exception { + IgniteSemaphoreExample.main(EMPTY_ARGS); + } + + /** + * @throws Exception If failed. + */ public void testCacheQueueExample() throws Exception { IgniteQueueExample.main(EMPTY_ARGS); } @@ -150,4 +158,4 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest { public void testCacheContinuousQueryExample() throws Exception { CacheContinuousQueryExample.main(EMPTY_ARGS); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index fc9cf06..17221ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -430,6 +430,23 @@ public interface Ignite extends AutoCloseable { throws IgniteException; /** + * Gets or creates semaphore. If semaphore is not found in cache and {@code create} flag + * is {@code true}, it is created using provided name and count parameter. + * + * @param name Name of the semaphore. + * @param cnt Count for new semaphore creation. Ignored if {@code create} flag is {@code false}. + * @param failoverSafe {@code True} to create failover safe semaphore which means that + * if any node leaves topology permits already acquired by that node are silently released + * and become available for alive nodes to acquire. If flag is {@code false} then + * all threads waiting for available permits get interrupted. + * @param create Boolean flag indicating whether data structure should be created if does not exist. + * @return Semaphore for the given name. + * @throws IgniteException If semaphore could not be fetched or created. 
+ */ + public IgniteSemaphore semaphore(String name, int cnt, boolean failoverSafe, boolean create) + throws IgniteException; + + /** * Will get a named queue from cache and create one if it has not been created yet and {@code cfg} is not * {@code null}. * If queue is present already, queue properties will not be changed. Use http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java new file mode 100644 index 0000000..db748b4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite; + +import java.io.Closeable; +import java.util.concurrent.TimeUnit; + +/** + * This interface provides a rich API for working with distributed semaphore. + * <p> + * <h1 class="header">Functionality</h1> + * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}. 
+ * <h1 class="header">Creating Distributed Semaphore</h1> + * Instance of cache semaphore can be created by calling the following method: + * {@link Ignite#semaphore(String, int, boolean, boolean)}. + */ +public interface IgniteSemaphore extends Closeable { + /** + * Gets name of the semaphore. + * + * @return Name of the semaphore. + */ + public String name(); + + /** + * Acquires a permit from this semaphore, blocking until one is available, or the thread is {@linkplain + * Thread#interrupt interrupted}. + * + * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by + * one. + * + * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies + * dormant until one of two things happens: <ul> <li>Some other thread invokes the {@link #release} method for this + * semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain + * Thread#interrupt interrupts} the current thread. </ul> + * + * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain + * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown + * and the current thread's interrupted status is cleared. + * + * @throws IgniteInterruptedException if the current thread is interrupted + */ + public void acquire() throws IgniteInterruptedException; + + /** + * Acquires a permit from this semaphore, blocking until one is available. + * + * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by + * one. + * + * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies + * dormant until some other thread invokes the {@link #release} method for this semaphore and the current thread is + * next to be assigned a permit. 
+ * + * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for a permit then it will + * continue to wait, but the time at which the thread is assigned a permit may change compared to the time it would + * have received the permit had no interruption occurred. When the thread does return from this method its + * interrupt status will be set. + */ + public void acquireUninterruptibly(); + + /** + * Acquires a permit from this semaphore, only if one is available at the time of invocation. + * + * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the + * number of available permits by one. + * + * <p>If no permit is available then this method will return immediately with the value {@code false}. + * + * @return {@code true} if a permit was acquired and {@code false} otherwise + */ + public boolean tryAcquire(); + + /** + * Acquires a permit from this semaphore, if one becomes available within the given waiting time and the current + * thread has not been {@linkplain Thread#interrupt interrupted}. + * + * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the + * number of available permits by one. + * + * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies + * dormant until one of three things happens: <ul> <li>Some other thread invokes the {@link #release} method for + * this semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain + * Thread#interrupt interrupts} the current thread; or <li>The specified waiting time elapses. </ul> + * + * <p>If a permit is acquired then the value {@code true} is returned. 
+ * + * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain + * Thread#interrupt interrupted} while waiting to acquire a permit, </ul> then {@link IgniteInterruptedException} is + * thrown and the current thread's interrupted status is cleared. + * + * <p>If the specified waiting time elapses then the value {@code false} is returned. If the time is less than or + * equal to zero, the method will not wait at all. + * + * @param timeout the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} if the waiting time elapsed before a permit was + * acquired + * @throws IgniteInterruptedException if the current thread is interrupted + */ + public boolean tryAcquire(long timeout, TimeUnit unit) + throws IgniteInterruptedException; + + /** + * Acquires the given number of permits from this semaphore, blocking until all are available. + * + * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of + * available permits by the given amount. + * + * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes + * and lies dormant until some other thread invokes one of the {@link #release() release} methods for this + * semaphore, the current thread is next to be assigned permits and the number of available permits satisfies this + * request. + * + * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for permits then it will + * continue to wait and its position in the queue is not affected. When the thread does return from this method its + * interrupt status will be set. 
+ * + * @param permits the number of permits to acquire + * @throws IllegalArgumentException if {@code permits} is negative + */ + public void acquireUninterruptibly(int permits); + + /** + * Returns the current number of permits available in this semaphore. + * + * <p>This method is typically used for debugging and testing purposes. + * + * @return the number of permits available in this semaphore + */ + public int availablePermits(); + + /** + * Acquires and returns all permits that are immediately available. + * + * @return the number of permits acquired + */ + public int drainPermits(); + + /** + * Releases a permit, returning it to the semaphore. + * + * <p>Releases a permit, increasing the number of available permits by one. If any threads are trying to acquire a + * permit, then one is selected and given the permit that was just released. That thread is (re)enabled for thread + * scheduling purposes. + * + * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link + * #acquire}. Correct usage of a semaphore is established by programming convention in the application. + */ + public void release(); + + /** + * Acquires the given number of permits from this semaphore, if all become available within the given waiting time + * and the current thread has not been {@linkplain Thread#interrupt interrupted}. + * + * <p>Acquires the given number of permits, if they are available and returns immediately, with the value {@code + * true}, reducing the number of available permits by the given amount. 
+ * + * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes + * and lies dormant until one of three things happens: <ul> <li>Some other thread invokes one of the {@link + * #release() release} methods for this semaphore, the current thread is next to be assigned permits and the number + * of available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread; or <li>The specified waiting time elapses. </ul> + * + * <p>If the permits are acquired then the value {@code true} is returned. + * + * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain + * Thread#interrupt interrupted} while waiting to acquire the permits, </ul> then {@link IgniteInterruptedException} + * is thrown and the current thread's interrupted status is cleared. Any permits that were to be assigned to this + * thread, are instead assigned to other threads trying to acquire permits, as if the permits had been made + * available by a call to {@link #release()}. + * + * <p>If the specified waiting time elapses then the value {@code false} is returned. If the time is less than or + * equal to zero, the method will not wait at all. Any permits that were to be assigned to this thread, are instead + * assigned to other threads trying to acquire permits, as if the permits had been made available by a call to + * {@link #release()}. 
+ * + * @param permits the number of permits to acquire + * @param timeout the maximum time to wait for the permits + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if all permits were acquired and {@code false} if the waiting time elapsed before all + * permits were acquired + * @throws IgniteInterruptedException if the current thread is interrupted + * @throws IllegalArgumentException if {@code permits} is negative + */ + public boolean tryAcquire(int permits, long timeout, TimeUnit unit) + throws IgniteInterruptedException; + + /** + * Acquires the given number of permits from this semaphore, only if all are available at the time of invocation. + * + * <p>Acquires the given number of permits, if they are available, and returns immediately, with the value {@code + * true}, reducing the number of available permits by the given amount. + * + * <p>If insufficient permits are available then this method will return immediately with the value {@code false} + * and the number of available permits is unchanged. + * + * <p>If you want to honor the failoverSafe setting, then use {@link #tryAcquire(int, long, TimeUnit) + * tryAcquire(permits, 0, TimeUnit.SECONDS) } which is almost equivalent (it also detects interruption). + * + * @param permits the number of permits to acquire + * @return {@code true} if the permits were acquired and {@code false} otherwise + * @throws IllegalArgumentException if {@code permits} is negative + */ + public boolean tryAcquire(int permits); + + /** + * Acquires the given number of permits from this semaphore, blocking until all are available, or the thread is + * {@linkplain Thread#interrupt interrupted}. + * + * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of + * available permits by the given amount. 
+ * + * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes + * and lies dormant until one of two things happens: <ul> <li>Some other thread invokes one of the {@link #release() + * release} methods for this semaphore, the current thread is next to be assigned permits and the number of + * available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts} the + * current thread. </ul> + * + * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain + * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown + * and the current thread's interrupted status is cleared. Any permits that were to be assigned to this thread are + * instead assigned to other threads trying to acquire permits, as if permits had been made available by a call to + * {@link #release()}. + * + * @param permits the number of permits to acquire + * @throws IgniteInterruptedException if the current thread is interrupted + * @throws IllegalArgumentException if {@code permits} is negative + */ + public void acquire(int permits) throws IgniteInterruptedException; + + /** + * Releases the given number of permits, returning them to the semaphore. + * + * <p>Releases the given number of permits, increasing the number of available permits by that amount. If any + * threads are trying to acquire permits, then one is selected and given the permits that were just released. If the + * number of available permits satisfies that thread's request then that thread is (re)enabled for thread scheduling + * purposes; otherwise the thread will wait until sufficient permits are available. If there are still permits + * available after this thread's request has been satisfied, then those permits are assigned in turn to other + * threads trying to acquire permits. 
+ * + * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link + * IgniteSemaphore#acquire acquire}. Correct usage of a semaphore is established by programming convention in the + * application. + * + * @param permits the number of permits to release + * @throws IllegalArgumentException if {@code permits} is negative + */ + public void release(int permits); + + /** + * Returns {@code true} if this semaphore is safe to use after node failure. + * If not, IgniteInterruptedException is thrown on every other node after node failure. + * + * @return {@code true} if this semaphore has failoverSafe set true + */ + public boolean isFailoverSafe(); + + /** + * Queries whether any threads are waiting to acquire. Note that because cancellations may occur at any time, a + * {@code true} return does not guarantee that any other thread will ever acquire. This method is designed + * primarily for use in monitoring of the system state. + * + * @return {@code true} if there may be other threads waiting to acquire a permit + */ + public boolean hasQueuedThreads(); + + /** + * Returns an estimate of the number of nodes waiting to acquire. The value is only an estimate because the number + * of nodes that are waiting may change dynamically while this method traverses internal data structures. This method is designed + * for use in monitoring of the system state, not for synchronization control. + * + * @return the estimated number of nodes waiting to acquire permits from this semaphore + */ + public int getQueueLength(); + + /** + * Gets {@code broken} status of the semaphore. + * + * @return {@code True} if a node failed on this semaphore and failoverSafe flag was set to false, {@code false} otherwise. + */ + public boolean isBroken(); + + /** + * Gets {@code removed} status of the semaphore. + * + * @return {@code True} if semaphore was removed from cache, {@code false} otherwise. + */ + public boolean removed(); + + /** + * Removes this semaphore. 
+ * + * @throws IgniteException If operation failed. + */ + @Override public void close(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java b/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java index 49c4f6e..09f23bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java @@ -113,9 +113,9 @@ public class DiscoveryEvent extends EventAdapter { /** * Gets node that caused this event to be generated. It is potentially different from the node * on which this event was recorded. For example, node {@code A} locally recorded the event that a remote node - * {@code B} joined the topology. In this case this method will return ID of {@code B}. + * {@code B} joined the topology. In this case this method will return node {@code B}. * - * @return Event node ID. + * @return Event node. 
*/ public ClusterNode eventNode() { return evtNode; @@ -162,4 +162,4 @@ public class DiscoveryEvent extends EventAdapter { "type", name(), "tstamp", timestamp()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index f1d67af..02096dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -58,6 +58,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteEvents; import org.apache.ignite.IgniteException; @@ -2936,6 +2937,26 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** {@inheritDoc} */ + @Nullable @Override public IgniteSemaphore semaphore( + String name, + int cnt, + boolean failoverSafe, + boolean create + ) { + guard(); + + try { + return ctx.dataStructures().semaphore(name, cnt, failoverSafe, create); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Nullable @Override public <T> IgniteQueue<T> queue(String name, int cap, CollectionConfiguration cfg) http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 810bd8c..b532d7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -45,16 +45,20 @@ import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSet; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.configuration.AtomicConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.CollectionConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.CacheType; @@ -82,12 +86,15 @@ import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static 
org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_LONG; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_REF; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_SEQ; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_STAMPED; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE; +import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -131,13 +138,16 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** Cache contains only {@code GridCacheCountDownLatchValue}. */ private IgniteInternalCache<GridCacheInternalKey, GridCacheCountDownLatchValue> cntDownLatchView; + /** Cache contains only {@code GridCacheSemaphoreState}. */ + private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView; + /** Cache contains only {@code GridCacheAtomicReferenceValue}. */ private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicReferenceValue> atomicRefView; /** Cache contains only {@code GridCacheAtomicStampedValue}. 
*/ private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicStampedValue> atomicStampedView; - /** Cache contains only entry {@code GridCacheSequenceValue}. */ + /** Cache contains only entry {@code GridCacheSequenceValue}. */ private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView; /** Cache context for atomic data structures. */ @@ -167,6 +177,37 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + super.start(); + + ctx.event().addLocalEventListener( + new GridLocalEventListener() { + @Override public void onEvent(final Event evt) { + // This may require cache operation to execute, + // therefore cannot use event notification thread. + ctx.closure().callLocalSafe( + new Callable<Object>() { + @Override public Object call() throws Exception { + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + + UUID leftNodeId = discoEvt.eventNode().id(); + + for (GridCacheRemovable ds : dsMap.values()) { + if (ds instanceof GridCacheSemaphoreEx) + ((GridCacheSemaphoreEx)ds).onNodeRemoved(leftNodeId); + } + + return null; + } + }, + false); + } + }, + EVT_NODE_LEFT, + EVT_NODE_FAILED); + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStart() throws IgniteCheckedException { if (ctx.config().isDaemon()) @@ -187,6 +228,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { cntDownLatchView = atomicsCache; + semView = atomicsCache; + atomicLongView = atomicsCache; atomicRefView = atomicsCache; @@ -262,7 +305,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * * @param name Sequence name. * @param initVal Initial value for sequence. If sequence already cached, {@code initVal} will be ignored. - * @param create If {@code true} sequence will be created in case it is not in cache.
+ * @param create If {@code true} sequence will be created in case it is not in cache. * @return Sequence. * @throws IgniteCheckedException If loading failed. */ @@ -1194,6 +1237,124 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** + * Gets or creates semaphore. If semaphore is not found in cache, + * it is created using provided name and count parameter. + * + * @param name Name of the semaphore. + * @param cnt Initial count. + * @param failoverSafe {@code True} if permits acquired by a failed node can be reacquired by other nodes. + * @param create If {@code true} semaphore will be created in case it is not in cache, + * if it is {@code false} all parameters except {@code name} are ignored. + * @return Semaphore for the given name or {@code null} if it is not found and + * {@code create} is false. + * @throws IgniteCheckedException If operation failed. + */ + public IgniteSemaphore semaphore(final String name, final int cnt, final boolean failoverSafe, final boolean create) + throws IgniteCheckedException { + A.notNull(name, "name"); + + awaitInitialization(); + + checkAtomicsConfiguration(); + + startQuery(); + + return getAtomic(new IgniteOutClosureX<IgniteSemaphore>() { + @Override public IgniteSemaphore applyx() throws IgniteCheckedException { + GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + + dsCacheCtx.gate().enter(); + + try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class); + + // Check that semaphore hasn't been created in other thread yet.
+ GridCacheSemaphoreEx sem = cast(dsMap.get(key), GridCacheSemaphoreEx.class); + + if (sem != null) { + assert val != null; + + return sem; + } + + if (val == null && !create) + return null; + + if (val == null) { + val = new GridCacheSemaphoreState(cnt, new HashMap<UUID, Integer>(), failoverSafe); + + dsView.put(key, val); + } + + GridCacheSemaphoreEx sem0 = new GridCacheSemaphoreImpl( + name, + key, + semView, + dsCacheCtx); + + dsMap.put(key, sem0); + + tx.commit(); + + return sem0; + } + catch (Error | Exception e) { + dsMap.remove(key); + + U.error(log, "Failed to create semaphore: " + name, e); + + throw e; + } + finally { + dsCacheCtx.gate().leave(); + } + } + }, new DataStructureInfo(name, SEMAPHORE, null), create, GridCacheSemaphoreEx.class); + } + + /** + * Removes semaphore from cache. + * + * @param name Name of the semaphore. + * @throws IgniteCheckedException If operation failed. + */ + public void removeSemaphore(final String name) throws IgniteCheckedException { + assert name != null; + assert dsCacheCtx != null; + + awaitInitialization(); + + removeDataStructure(new IgniteOutClosureX<Void>() { + @Override public Void applyx() throws IgniteCheckedException { + GridCacheInternal key = new GridCacheInternalKeyImpl(name); + + dsCacheCtx.gate().enter(); + + try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + // Check correctness type of removable object. + GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class); + + if (val != null) { + if (val.getCount() < 0) + throw new IgniteCheckedException("Failed to remove semaphore with blocked threads. "); + + dsView.remove(key); + + tx.commit(); + } + else + tx.setRollbackOnly(); + + return null; + } + finally { + dsCacheCtx.gate().leave(); + } + } + }, name, SEMAPHORE, null); + } + + /** * Remove internal entry by key from cache. * * @param key Internal entry key. 
@@ -1240,7 +1401,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException { if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) - return evt.getValue() instanceof GridCacheCountDownLatchValue; + return evt.getValue() instanceof GridCacheCountDownLatchValue || + evt.getValue() instanceof GridCacheSemaphoreState; else { assert evt.getEventType() == EventType.REMOVED : evt; @@ -1318,6 +1480,25 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { ", actual=" + latch.getClass() + ", value=" + latch + ']'); } } + else if (val0 instanceof GridCacheSemaphoreState) { + GridCacheInternalKey key = evt.getKey(); + + // Notify semaphore on changes. + final GridCacheRemovable sem = dsMap.get(key); + + GridCacheSemaphoreState val = (GridCacheSemaphoreState)val0; + + if (sem instanceof GridCacheSemaphoreEx) { + final GridCacheSemaphoreEx semaphore0 = (GridCacheSemaphoreEx)sem; + + semaphore0.onUpdate(val); + } + else if (sem != null) { + U.error(log, "Failed to cast object " + + "[expected=" + IgniteSemaphore.class.getSimpleName() + + ", actual=" + sem.getClass() + ", value=" + sem + ']'); + } + } } else { @@ -1407,7 +1588,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * @return Removed value. 
*/ @SuppressWarnings("unchecked") - @Nullable private <T> T retryRemove(final IgniteInternalCache cache, final Object key) throws IgniteCheckedException { + @Nullable private <T> T retryRemove(final IgniteInternalCache cache, final Object key) + throws IgniteCheckedException { return retry(log, new Callable<T>() { @Nullable @Override public T call() throws Exception { return (T)cache.getAndRemove(key); @@ -1432,7 +1614,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { catch (ClusterGroupEmptyCheckedException e) { throw new IgniteCheckedException(e); } - catch (IgniteTxRollbackCheckedException | CachePartialUpdateCheckedException | ClusterTopologyCheckedException e) { + catch (IgniteTxRollbackCheckedException | + CachePartialUpdateCheckedException | + ClusterTopologyCheckedException e) { if (cnt++ == MAX_UPDATE_RETRIES) throw e; else { @@ -1535,7 +1719,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { QUEUE(IgniteQueue.class.getSimpleName()), /** */ - SET(IgniteSet.class.getSimpleName()); + SET(IgniteSet.class.getSimpleName()), + + /** */ + SEMAPHORE(IgniteSemaphore.class.getSimpleName()); /** */ private static final DataStructureType[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java new file mode 100644 index 0000000..4d39635 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import java.util.UUID; +import org.apache.ignite.IgniteSemaphore; + +/** + * Grid cache semaphore ({@code 'Ex'} stands for external). + */ +public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovable { + /** + * Get current semaphore key. + * + * @return Semaphore key. + */ + public GridCacheInternalKey key(); + + /** + * Callback to notify semaphore on changes. + * + * @param val State containing the number of available permissions. + */ + public void onUpdate(GridCacheSemaphoreState val); + + /** + * Callback to notify semaphore on topology changes. + * + * @param nodeId Id of the node that left the grid. 
+ */ + public void onNodeRemoved(UUID nodeId); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java new file mode 100644 index 0000000..37df9d5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java @@ -0,0 +1,763 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.datastructures; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Cache semaphore implementation based on AbstractQueuedSynchronizer. + * Current implementation supports only unfair semaphores. + * If any node fails after acquiring permissions on cache semaphore, there are two different behaviors controlled with the + * parameter failoverSafe. If this parameter is true, other nodes can reacquire permits that were acquired by the failing node. 
+ * In case this parameter is false, IgniteInterruptedException is thrown on every node waiting on this semaphore. + */ +public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Deserialization stash. */ + private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash = + new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() { + @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() { + return F.t2(); + } + }; + + /** Logger. */ + private IgniteLogger log; + + /** Semaphore name. */ + private String name; + + /** Removed flag. */ + private volatile boolean rmvd; + + /** Semaphore key. */ + private GridCacheInternalKey key; + + /** Semaphore projection. */ + private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView; + + /** Cache context. */ + private GridCacheContext ctx; + + /** Initialization guard. */ + private final AtomicBoolean initGuard = new AtomicBoolean(); + + /** Initialization latch. */ + private final CountDownLatch initLatch = new CountDownLatch(1); + + /** Internal synchronization object. */ + private Sync sync; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridCacheSemaphoreImpl() { + // No-op. + } + + /** + * Synchronization implementation for semaphore. + * Uses AQS state to represent permits. + */ + final class Sync extends AbstractQueuedSynchronizer { + private static final long serialVersionUID = 1192457210091910933L; + + /** Map containing number of acquired permits for each node waiting on this semaphore. */ + private Map<UUID, Integer> nodeMap; + + /** Flag indicating that it is safe to continue after node that acquired semaphore fails. */ + final boolean failoverSafe; + + /** Flag indicating that a node failed and it is not safe to continue using this semaphore.
*/ + protected boolean broken = false; + + protected Sync(int permits, Map<UUID, Integer> waiters, boolean failoverSafe) { + setState(permits); + nodeMap = waiters; + this.failoverSafe = failoverSafe; + } + + /** + * Sets a map containing number of permits acquired by each node using this semaphore. This method should only + * be called in {@linkplain GridCacheSemaphoreImpl#onUpdate(GridCacheSemaphoreState)}. + * + * @param nodeMap NodeMap. + */ + protected synchronized void setWaiters(Map<UUID, Integer> nodeMap) { + this.nodeMap = nodeMap; + } + + /** + * Gets the number of nodes waiting at this semaphore. + * + * @return Number of nodes waiting at this semaphore. + */ + public int getWaiters() { + int totalWaiters = 0; + + for (Integer i : nodeMap.values()) { + if (i > 0) + totalWaiters++; + } + + return totalWaiters; + } + + /** + * Get number of permits for node. + * + * @param nodeID Node ID. + * @return Number of permits node has acquired at this semaphore. Can be less than 0 if + * more permits were released than acquired on node. + */ + public int getPermitsForNode(UUID nodeID){ + return nodeMap.containsKey(nodeID) ? nodeMap.get(nodeID) : 0; + } + + /** + * Sets the number of permits currently available on this semaphore. This method should only be used in + * {@linkplain GridCacheSemaphoreImpl#onUpdate(GridCacheSemaphoreState)}. + * + * @param permits Number of permits available at this semaphore. + */ + final synchronized void setPermits(int permits) { + setState(permits); + } + + /** + * Gets the number of permissions currently available. + * + * @return Number of permits available at this semaphore. + */ + final int getPermits() { + return getState(); + } + + /** + * This method is used by the AQS to test if the current thread should block or not. + * + * @param acquires Number of permits to acquire. + * @return Negative number if thread should block, positive if thread successfully acquires permits. 
+ */ + final int nonfairTryAcquireShared(int acquires) { + for (;;) { + int available = getState(); + + int remaining = available - acquires; + + if (remaining < 0 || compareAndSetGlobalState(available, remaining, false)) { + return remaining; + } + } + } + + /** {@inheritDoc} */ + @Override protected int tryAcquireShared(int acquires) { + return nonfairTryAcquireShared(acquires); + } + + /** {@inheritDoc} */ + @Override protected final boolean tryReleaseShared(int releases) { + // Check if some other node updated the state. + // This method is called with release==0 only when trying to wake through update. + if (releases == 0) + return true; + + for (;;) { + int cur = getState(); + + int next = cur + releases; + + if (next < cur) // overflow + throw new Error("Maximum permit count exceeded"); + + if (compareAndSetGlobalState(cur, next, false)) + return true; + } + } + + /** + * This method is used internally to implement {@linkplain GridCacheSemaphoreImpl#drainPermits()}. + * + * @return Number of permits to drain. + */ + final int drainPermits() { + for (;;) { + + int current = getState(); + + if (current == 0 || compareAndSetGlobalState(current, 0, true)) + return current; + } + } + + /** + * This method is used for synchronizing the semaphore state across all nodes. + * + * @param expVal Expected number of permits. + * @param newVal New number of permits. + * @param draining True if used for draining the permits. + * @return True if this is the call that succeeded to change the global state. 
+ */ + protected boolean compareAndSetGlobalState(final int expVal, final int newVal, final boolean draining) { + try { + return CU.outTx( + retryTopologySafe(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + try (IgniteInternalTx tx = CU.txStartInternal(ctx, + semView, + PESSIMISTIC, REPEATABLE_READ) + ) { + GridCacheSemaphoreState val = semView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find semaphore with given name: " + + name); + + boolean retVal = val.getCount() == expVal; + + if (retVal) { + // If this is not a call to drain permits, + // Modify global permission count for the calling node. + if (!draining) { + UUID nodeID = ctx.localNodeId(); + + Map<UUID,Integer> map = val.getWaiters(); + + int waitingCnt = expVal - newVal; + + if(map.containsKey(nodeID)) + waitingCnt += map.get(nodeID); + + map.put(nodeID, waitingCnt); + + val.setWaiters(map); + } + + val.setCount(newVal); + + semView.put(key, val); + + tx.commit(); + } + + return retVal; + } + catch (Error | Exception e) { + if (!ctx.kernalContext().isStopping()) + U.error(log, "Failed to compare and set: " + this, e); + + throw e; + } + } + }), + ctx + ); + } + catch (IgniteCheckedException e) { + if (ctx.kernalContext().isStopping()) { + if (log.isDebugEnabled()) + log.debug("Ignoring failure in semaphore on node left handler (node is stopping): " + e); + + return true; + } + else + throw U.convertException(e); + } + } + + /** + * This method is used for releasing the permits acquired by failing node. + * + * @param nodeId ID of the failing node. + * @return True if this is the call that succeeded to change the global state. 
+ */ + protected boolean releaseFailedNode(final UUID nodeId) { + try { + return CU.outTx( + retryTopologySafe(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + try ( + IgniteInternalTx tx = CU.txStartInternal(ctx, + semView, + PESSIMISTIC, REPEATABLE_READ) + ) { + GridCacheSemaphoreState val = semView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find semaphore with given name: " + + name); + + Map<UUID,Integer> map = val.getWaiters(); + + if(!map.containsKey(nodeId)){ + tx.rollback(); + + return false; + } + + int numPermits = map.get(nodeId); + + if(numPermits > 0) + val.setCount(val.getCount() + numPermits); + + map.remove(nodeId); + + val.setWaiters(map); + + semView.put(key, val); + + sync.nodeMap = map; + + tx.commit(); + + return true; + } + catch (Error | Exception e) { + if (!ctx.kernalContext().isStopping()) + U.error(log, "Failed to compare and set: " + this, e); + + throw e; + } + } + }), + ctx + ); + } + catch (IgniteCheckedException e) { + if (ctx.kernalContext().isStopping()) { + if (log.isDebugEnabled()) + log.debug("Ignoring failure in semaphore on node left handler (node is stopping): " + e); + + return true; + } + else + throw U.convertException(e); + } + } + } + + /** + * Constructor. + * + * @param name Semaphore name. + * @param key Semaphore key. + * @param semView Semaphore projection. + * @param ctx Cache context. + */ + public GridCacheSemaphoreImpl( + String name, + GridCacheInternalKey key, + IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView, + GridCacheContext ctx + ) { + assert name != null; + assert key != null; + assert semView != null; + assert ctx != null; + + this.name = name; + this.key = key; + this.semView = semView; + this.ctx = ctx; + + log = ctx.logger(getClass()); + } + + /** + * @throws IgniteCheckedException If operation failed. 
+ */ + private void initializeSemaphore() throws IgniteCheckedException { + if (!initGuard.get() && initGuard.compareAndSet(false, true)) { + try { + sync = CU.outTx( + retryTopologySafe(new Callable<Sync>() { + @Override public Sync call() throws Exception { + try (IgniteInternalTx tx = CU.txStartInternal(ctx, + semView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheSemaphoreState val = semView.get(key); + + if (val == null) { + if (log.isDebugEnabled()) + log.debug("Failed to find semaphore with given name: " + name); + + return null; + } + + final int count = val.getCount(); + + Map<UUID, Integer> waiters = val.getWaiters(); + + final boolean failoverSafe = val.isFailoverSafe(); + + tx.commit(); + + return new Sync(count, waiters, failoverSafe); + } + } + }), + ctx + ); + + if (log.isDebugEnabled()) + log.debug("Initialized internal sync structure: " + sync); + } + finally { + initLatch.countDown(); + } + } + else { + U.await(initLatch); + + if (sync == null) + throw new IgniteCheckedException("Internal semaphore has not been properly initialized."); + } + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public GridCacheInternalKey key() { + return key; + } + + /** {@inheritDoc} */ + @Override public boolean removed() { + return rmvd; + } + + /** {@inheritDoc} */ + @Override public boolean onRemoved() { + return rmvd = true; + } + + /** {@inheritDoc} */ + @Override public void onUpdate(GridCacheSemaphoreState val) { + if (sync == null) + return; + + // Update permission count. + sync.setPermits(val.getCount()); + + // Update waiters' counts. + sync.setWaiters(val.getWaiters()); + + // Try to notify any waiting threads. + sync.releaseShared(0); + } + + /** {@inheritDoc} */ + @Override public void onNodeRemoved(UUID nodeId) { + int numPermits = sync.getPermitsForNode(nodeId); + + if (numPermits > 0) { + if (sync.failoverSafe) + // Release permits acquired by threads on failing node. 
+ sync.releaseFailedNode(nodeId); + else { + // Interrupt every waiting thread if this semaphore is not failover safe. + sync.broken = true; + + for (Thread t : sync.getSharedQueuedThreads()) + t.interrupt(); + + // Try to notify any waiting threads. + sync.releaseShared(0); + } + } + } + + /** {@inheritDoc} */ + @Override public void needCheckNotRemoved() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void acquire() throws IgniteInterruptedException { + acquire(1); + } + + /** {@inheritDoc} */ + @Override public void acquire(int permits) throws IgniteInterruptedException { + A.ensure(permits >= 0, "Number of permits must be non-negative."); + + try { + initializeSemaphore(); + + if(isBroken()) + Thread.currentThread().interrupt(); + + sync.acquireSharedInterruptibly(permits); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void acquireUninterruptibly() { + try { + initializeSemaphore(); + + sync.acquireShared(1); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public void acquireUninterruptibly(int permits) { + A.ensure(permits >= 0, "Number of permits must be non-negative."); + + try { + initializeSemaphore(); + + sync.acquireShared(permits); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public int availablePermits() { + int ret; + try { + initializeSemaphore(); + + ret = CU.outTx( + retryTopologySafe(new Callable<Integer>() { + @Override public Integer call() throws Exception { + try ( + IgniteInternalTx tx = CU.txStartInternal(ctx, + semView, PESSIMISTIC, REPEATABLE_READ) + ) { + GridCacheSemaphoreState val = semView.get(key); + + if (val == null) + throw new IgniteException("Failed to find semaphore with given name: " + name); + + int count = 
val.getCount(); + + tx.rollback(); + + return count; + } + } + }), + ctx + ); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + + return ret; + } + + /** {@inheritDoc} */ + @Override public int drainPermits() { + try { + initializeSemaphore(); + + return sync.drainPermits(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean tryAcquire() { + try { + initializeSemaphore(); + + return sync.nonfairTryAcquireShared(1) >= 0; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException { + try { + initializeSemaphore(); + + return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void release() { + release(1); + } + + /** {@inheritDoc} */ + @Override public void release(int permits) { + A.ensure(permits >= 0, "Number of permits must be non-negative."); + + try { + initializeSemaphore(); + + sync.releaseShared(permits); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean tryAcquire(int permits) { + A.ensure(permits >= 0, "Number of permits must be non-negative."); + + try { + initializeSemaphore(); + + return sync.nonfairTryAcquireShared(permits) >= 0; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException { + A.ensure(permits >= 0, "Number of permits must be non-negative."); + try { + initializeSemaphore(); + + return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); + } + 
catch (IgniteCheckedException e) { + throw U.convertException(e); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean isFailoverSafe() { + return sync.failoverSafe; + } + + /** {@inheritDoc} */ + @Override public boolean hasQueuedThreads() { + try { + initializeSemaphore(); + + return sync.getWaiters() != 0; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public int getQueueLength() { + try { + initializeSemaphore(); + + return sync.getWaiters(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean isBroken(){ + return sync.broken; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx.kernalContext()); + out.writeUTF(name); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + t.set1((GridKernalContext)in.readObject()); + t.set2(in.readUTF()); + } + + /** {@inheritDoc} */ + @Override public void close() { + if (!rmvd) { + try { + ctx.kernalContext().dataStructures().removeSemaphore(name); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheSemaphoreImpl.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java new file mode 100644 index 0000000..50cdf10 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.GridCacheInternal; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Grid cache semaphore state. + */ +public class GridCacheSemaphoreState implements GridCacheInternal, Externalizable, Cloneable { + /** */ + private static final long serialVersionUID = 0L; + + /** Permission count. */ + private int cnt; + + /** Map containing number of acquired permits for each node waiting on this semaphore. 
*/ + @GridToStringInclude + private Map<UUID, Integer> waiters; + + /** FailoverSafe flag. */ + private boolean failoverSafe; + + /** + * Constructor. + * + * @param cnt Number of permissions. + * @param waiters Waiters map. + * @param failoverSafe Failover safe flag. + */ + public GridCacheSemaphoreState(int cnt, @Nullable Map<UUID,Integer> waiters, boolean failoverSafe) { + this.cnt = cnt; + this.waiters = waiters; + this.failoverSafe = failoverSafe; + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridCacheSemaphoreState() { + // No-op. + } + + /** + * @param cnt New count. + */ + public void setCount(int cnt) { + this.cnt = cnt; + } + + /** + * @return Current count. + */ + public int getCount() { + return cnt; + } + + /** + * @return Waiters. + */ + public Map<UUID,Integer> getWaiters() { + return waiters; + } + + /** + * @param waiters Map containing the number of permissions acquired by each node. + */ + public void setWaiters(Map<UUID, Integer> waiters) { + this.waiters = waiters; + } + + /** + * @return failoverSafe flag. 
+ */ + public boolean isFailoverSafe() { + return failoverSafe; + } + + /** {@inheritDoc} */ + @Override public Object clone() throws CloneNotSupportedException { + return super.clone(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(cnt); + out.writeBoolean(failoverSafe); + out.writeBoolean(waiters != null); + + if (waiters != null) { + out.writeInt(waiters.size()); + + for (Map.Entry<UUID, Integer> e : waiters.entrySet()) { + U.writeUuid(out, e.getKey()); + out.writeInt(e.getValue()); + } + } + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException { + cnt = in.readInt(); + failoverSafe = in.readBoolean(); + + if (in.readBoolean()) { + int size = in.readInt(); + + waiters = U.newHashMap(size); + + for (int i = 0; i < size; i++) + waiters.put(U.readUuid(in), in.readInt()); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheSemaphoreState.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java index 55dbb57..c46b5c8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -26,6 +26,7 @@ import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteSemaphore; import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.testframework.GridTestUtils; @@ -675,4 +676,45 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(srvLatch.await(1000)); assertTrue(clientLatch.await(1000)); } -} \ No newline at end of file + + /** + * @throws Exception If failed. + */ + public void testSemaphoreReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + IgniteSemaphore clientSemaphore = client.semaphore("semaphore1", 3, false, true); + + assertEquals(3, clientSemaphore.availablePermits()); + + final IgniteSemaphore srvSemaphore = srv.semaphore("semaphore1", 3, false, false); + + assertEquals(3, srvSemaphore.availablePermits()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvSemaphore.acquire(); + } + }); + + assertEquals(2, srvSemaphore.availablePermits()); + assertEquals(2, clientSemaphore.availablePermits()); + + srvSemaphore.acquire(); + + assertEquals(1, srvSemaphore.availablePermits()); + assertEquals(1, clientSemaphore.availablePermits()); + + clientSemaphore.acquire(); + + assertEquals(0, srvSemaphore.availablePermits()); + assertEquals(0, clientSemaphore.availablePermits()); + + assertFalse(srvSemaphore.tryAcquire()); + assertFalse(srvSemaphore.tryAcquire()); + } +}
