IGNITE-3515: Fixed NullPointerException thrown when stopping an IgniteSemaphore that was never initialized, i.e. when no method had previously triggered initializeSemaphore(). Reviewed by Denis Magda.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab4963a6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab4963a6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab4963a6 Branch: refs/heads/ignite-3553 Commit: ab4963a686ea7ca560604079c52b8939b605cfcf Parents: a989f04 Author: Krome Plasma <[email protected]> Authored: Fri Jul 22 14:53:39 2016 +0300 Committer: Denis Magda <[email protected]> Committed: Fri Jul 22 14:53:39 2016 +0300 ---------------------------------------------------------------------- .../datastructures/GridCacheSemaphoreImpl.java | 108 ++++++++++++------- .../IgniteSemaphoreAbstractSelfTest.java | 25 ++++- 2 files changed, 94 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ab4963a6/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java index 8f196be..a11c79d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java @@ -36,6 +36,7 @@ import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.processors.cache.GridCacheContext; import 
org.apache.ignite.internal.processors.cache.IgniteInternalCache; @@ -51,11 +52,11 @@ import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** - * Cache semaphore implementation based on AbstractQueuedSynchronizer. - * Current implementation supports only unfair semaphores. - * If any node fails after acquiring permissions on cache semaphore, there are two different behaviors controlled with the - * parameter failoverSafe. If this parameter is true, other nodes can reacquire permits that were acquired by the failing node. - * In case this parameter is false, IgniteInterruptedException is called on every node waiting on this semaphore. + * Cache semaphore implementation based on AbstractQueuedSynchronizer. Current implementation supports only unfair + * semaphores. If any node fails after acquiring permissions on cache semaphore, there are two different behaviors + * controlled with the parameter failoverSafe. If this parameter is true, other nodes can reacquire permits that were + * acquired by the failing node. In case this parameter is false, IgniteInterruptedException is called on every node + * waiting on this semaphore. */ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Externalizable { /** */ @@ -104,8 +105,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter } /** - * Synchronization implementation for semaphore. - * Uses AQS state to represent permits. + * Synchronization implementation for semaphore. Uses AQS state to represent permits. */ final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; @@ -155,10 +155,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter * Get number of permits for node. * * @param nodeID Node ID. - * @return Number of permits node has acquired at this semaphore. 
Can be less than 0 if - * more permits were released than acquired on node. + * @return Number of permits node has acquired at this semaphore. Can be less than 0 if more permits were + * released than acquired on node. */ - public int getPermitsForNode(UUID nodeID){ + public int getPermitsForNode(UUID nodeID) { return nodeMap.containsKey(nodeID) ? nodeMap.get(nodeID) : 0; } @@ -182,12 +182,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter } /** - * Set a flag indicating that it is not safe to continue using this semaphore. - * This is the case only if one of two things happened: - * 1. A node that previously acquired on this semaphore failed and - * semaphore is created in non-failoversafe mode; - * 2. Local node failed (is closed), so any any threads on this node - * waiting to acquire are notified, and semaphore is not safe to be used anymore. + * Set a flag indicating that it is not safe to continue using this semaphore. This is the case only if one of + * two things happened: 1. A node that previously acquired on this semaphore failed and semaphore is created in + * non-failoversafe mode; 2. Local node failed (is closed), so any any threads on this node waiting to acquire + * are notified, and semaphore is not safe to be used anymore. * * @return True is semaphore is not safe to be used anymore. */ @@ -195,11 +193,12 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter return broken; } - /** Flag indicating that a node failed and it is not safe to continue using this semaphore. - * Any attempt to acquire on broken semaphore will result in {@linkplain IgniteInterruptedException}. + /** + * Flag indicating that a node failed and it is not safe to continue using this semaphore. Any attempt to + * acquire on broken semaphore will result in {@linkplain IgniteInterruptedException}. * * @param broken True if semaphore should not be used anymore. 
- * */ + */ protected void setBroken(boolean broken) { this.broken = broken; } @@ -211,9 +210,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter * @return Negative number if thread should block, positive if thread successfully acquires permits. */ final int nonfairTryAcquireShared(int acquires) { - for (;;) { + for (; ; ) { // If broken, return immediately, exception will be thrown anyway. - if(broken) + if (broken) return 1; int available = getState(); @@ -238,9 +237,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter if (releases == 0) return true; - for (;;) { + for (; ; ) { // If broken, return immediately, exception will be thrown anyway. - if(broken) + if (broken) return true; int cur = getState(); @@ -261,9 +260,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter * @return Number of permits to drain. */ final int drainPermits() { - for (;;) { + for (; ; ) { // If broken, return immediately, exception will be thrown anyway. 
- if(broken) + if (broken) return 1; int current = getState(); @@ -288,7 +287,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter @Override public Boolean call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, semView, - PESSIMISTIC, REPEATABLE_READ) + PESSIMISTIC, REPEATABLE_READ) ) { GridCacheSemaphoreState val = semView.get(key); @@ -304,11 +303,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter if (!draining) { UUID nodeID = ctx.localNodeId(); - Map<UUID,Integer> map = val.getWaiters(); + Map<UUID, Integer> map = val.getWaiters(); int waitingCnt = expVal - newVal; - if(map.containsKey(nodeID)) + if (map.containsKey(nodeID)) waitingCnt += map.get(nodeID); map.put(nodeID, waitingCnt); @@ -370,9 +369,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter throw new IgniteCheckedException("Failed to find semaphore with given name: " + name); - Map<UUID,Integer> map = val.getWaiters(); + Map<UUID, Integer> map = val.getWaiters(); - if(!map.containsKey(nodeId)){ + if (!map.containsKey(nodeId)) { tx.rollback(); return false; @@ -380,7 +379,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter int numPermits = map.get(nodeId); - if(numPermits > 0) + if (numPermits > 0) val.setCount(val.getCount() + numPermits); map.remove(nodeId); @@ -552,7 +551,42 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter } } + /** {@inheritDoc} */ @Override public void stop() { + if (initGuard.get()) { + try { + // Wait while initialization is in progress. + U.await(initLatch); + } + catch (IgniteInterruptedCheckedException e) { + if (log.isDebugEnabled()) + log.error("Failed waiting while initialization is completed.", e); + } + } + else { + // Preventing concurrent initialization. 
+ if (initGuard.compareAndSet(false, true)) { + initLatch.countDown(); + + if (log.isDebugEnabled()) + log.debug("Semaphore wasn't initialized. Prevented further initialization."); + + return; + } + else { + try { + // Wait while initialization is in progress. + U.await(initLatch); + } + catch (IgniteInterruptedCheckedException e) { + if (log.isDebugEnabled()) + log.error("Failed waiting while initialization is completed.", e); + } + } + } + + assert sync != null; + sync.setBroken(true); // Try to notify any waiting threads. @@ -580,7 +614,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter sync.acquireSharedInterruptibly(permits); - if(isBroken()) + if (isBroken()) throw new InterruptedException(); } catch (IgniteCheckedException e) { @@ -697,7 +731,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter boolean result = sync.nonfairTryAcquireShared(1) >= 0; - if(isBroken()) + if (isBroken()) throw new InterruptedException(); return result; @@ -722,7 +756,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter boolean result = sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); - if(isBroken()) + if (isBroken()) throw new InterruptedException(); return result; @@ -789,9 +823,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter try { initializeSemaphore(); - boolean result = sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); + boolean result = sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); - if(isBroken()) + if (isBroken()) throw new InterruptedException(); return result; @@ -859,7 +893,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter } /** {@inheritDoc} */ - @Override public boolean isBroken(){ + @Override public boolean isBroken() { ctx.kernalContext().gateway().readLock(); try { 
http://git-wip-us.apache.org/repos/asf/ignite/blob/ab4963a6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java index 200e276..5241dd1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSemaphore; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; @@ -39,6 +40,7 @@ import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.testframework.GridStringLogger; import org.apache.ignite.testframework.GridTestUtils; import org.jetbrains.annotations.Nullable; import org.junit.Rule; @@ -234,6 +236,25 @@ public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstr /** * @throws Exception If failed. 
*/ + public void testSemaphoreClosing() throws Exception { + IgniteConfiguration cfg; + GridStringLogger stringLogger; + + stringLogger = new GridStringLogger(); + + cfg = optimize(getConfiguration("npeGrid")); + cfg.setGridLogger(stringLogger); + + try (Ignite ignite = startGrid(cfg.getGridName(), cfg)) { + ignite.semaphore("semaphore", 1, true, true); + } + + assertFalse(stringLogger.toString().contains(NullPointerException.class.getName())); + } + + /** + * @throws Exception If failed. + */ private void checkSemaphoreSerialization() throws Exception { final IgniteSemaphore sem = grid(0).semaphore("semaphore", -gridCount() + 1, true, true); @@ -276,8 +297,8 @@ public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstr } /** - * This method only checks if parameter of new semaphore is initialized properly. - * For tests considering failure recovery see @GridCachePartitionedNodeFailureSelfTest. + * This method only checks if parameter of new semaphore is initialized properly. For tests considering failure + * recovery see * * @throws Exception Exception. */
