shutdownAll now syncs on class before this changed to an AtomicBoolean, remove cache sync on addPartitionedRegion and requiresNotificationFromPR
fixed formatting. Removed class sync on rmqFactory back to volatile boolean so that second SDA will wait for the first Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/eb6ab1aa Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/eb6ab1aa Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/eb6ab1aa Branch: refs/heads/feature/GEM-983 Commit: eb6ab1aaf18ffe9222c23cce6a3d6d61acc5bfca Parents: 2ef50b2 Author: Darrel Schneider <[email protected]> Authored: Tue Oct 11 15:04:43 2016 -0700 Committer: Darrel Schneider <[email protected]> Committed: Fri Oct 28 14:37:14 2016 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/GemFireCacheImpl.java | 157 +++++++++---------- 1 file changed, 70 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/eb6ab1aa/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index ba4f1f4..3c4718c 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -664,7 +664,7 @@ public class GemFireCacheImpl sb.append("GemFireCache["); sb.append("id = " + System.identityHashCode(this)); sb.append("; isClosing = " + this.isClosing); - sb.append("; isShutDownAll = " + this.isShutDownAll); + sb.append("; isShutDownAll = " + isCacheAtShutdownAll()); sb.append("; created = " + this.creationDate); sb.append("; server = " + this.isServer); sb.append("; copyOnRead = " + this.copyOnRead); @@ -874,7 +874,7 @@ public class GemFireCacheImpl this.cqService = CqServiceProvider.create(this); - initReliableMessageQueueFactory(); + this.rmqFactory = new ReliableMessageQueueFactoryImpl(); // Create the CacheStatistics this.cachePerfStats = new CachePerfStats(system); @@ -1751,58 +1751,59 @@ public class GemFireCacheImpl } } - public synchronized void shutDownAll() { - boolean testIGE = Boolean.getBoolean("TestInternalGemFireError"); + public void shutDownAll() { + synchronized (GemFireCacheImpl.class) { + boolean testIGE = Boolean.getBoolean("TestInternalGemFireError"); - if (testIGE) { - InternalGemFireError assErr = new InternalGemFireError( - LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString()); - throw assErr; - } - if (isCacheAtShutdownAll()) { - // it's already doing shutdown by another thread - return; - } - if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { - try { - CacheObserverHolder.getInstance().beforeShutdownAll(); - } finally { - LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; - } - } - this.isShutDownAll = true; - - // bug 44031 requires multithread shutdownall should be grouped - // by root region. However, shutDownAllDuringRecovery.conf test revealed that - // we have to close colocated child regions first. - // Now check all the PR, if anyone has colocate-with attribute, sort all the - // PRs by colocation relationship and close them sequentially, otherwise still - // group them by root region. - TreeMap<String, Map<String, PartitionedRegion>> prTrees = getPRTrees(); - if (prTrees.size() > 1 && shutdownAllPoolSize != 1) { - ExecutorService es = getShutdownAllExecutorService(prTrees.size()); - for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { - es.execute(new Runnable() { - public void run() { - ConnectionTable.threadWantsSharedResources(); - shutdownSubTreeGracefully(prSubMap); - } - }); - } // for each root - es.shutdown(); - try { - es.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); - } catch (InterruptedException e) { - logger.debug("Shutdown all interrupted while waiting for PRs to be shutdown gracefully."); + if (testIGE) { + InternalGemFireError assErr = new InternalGemFireError(LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString()); + throw assErr; + } + if (isCacheAtShutdownAll()) { + // it's already doing shutdown by another thread + return; } + if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { + try { + CacheObserverHolder.getInstance().beforeShutdownAll(); + } finally { + LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; + } + } + this.isShutDownAll = true; + + // bug 44031 requires multithread shutdownall should be grouped + // by root region. However, shutDownAllDuringRecovery.conf test revealed that + // we have to close colocated child regions first. + // Now check all the PR, if anyone has colocate-with attribute, sort all the + // PRs by colocation relationship and close them sequentially, otherwise still + // group them by root region. + TreeMap<String, Map<String, PartitionedRegion>> prTrees = getPRTrees(); + if (prTrees.size() > 1 && shutdownAllPoolSize != 1) { + ExecutorService es = getShutdownAllExecutorService(prTrees.size()); + for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { + es.execute(new Runnable() { + public void run() { + ConnectionTable.threadWantsSharedResources(); + shutdownSubTreeGracefully(prSubMap); + } + }); + } // for each root + es.shutdown(); + try { + es.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.debug("Shutdown all interrupted while waiting for PRs to be shutdown gracefully."); + } - } else { - for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { - shutdownSubTreeGracefully(prSubMap); + } else { + for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { + shutdownSubTreeGracefully(prSubMap); + } } - } - close("Shut down all members", null, false, true); + close("Shut down all members", null, false, true); + } } private ExecutorService getShutdownAllExecutorService(int size) { @@ -4180,17 +4181,15 @@ public class GemFireCacheImpl * regions when this cache requires, or does not require notification of all region/entry events. */ public void addPartitionedRegion(PartitionedRegion r) { - synchronized (GemFireCacheImpl.class) { - synchronized (this.partitionedRegions) { - if (r.isDestroyed()) { - if (logger.isDebugEnabled()) { - logger.debug("GemFireCache#addPartitionedRegion did not add destroyed {}", r); - } - return; - } - if (this.partitionedRegions.add(r)) { - getCachePerfStats().incPartitionedRegions(1); + synchronized (this.partitionedRegions) { + if (r.isDestroyed()) { + if (logger.isDebugEnabled()) { + logger.debug("GemFireCache#addPartitionedRegion did not add destroyed {}", r); } + return; + } + if (this.partitionedRegions.add(r)) { + getCachePerfStats().incPartitionedRegions(1); } } } @@ -4288,22 +4287,20 @@ public class GemFireCacheImpl * @return true if the region should deliver all of its events to this cache */ protected boolean requiresNotificationFromPR(PartitionedRegion r) { - synchronized (GemFireCacheImpl.class) { - boolean hasSerialSenders = hasSerialSenders(r); - boolean result = hasSerialSenders; - if (!result) { - Iterator allCacheServersIterator = allCacheServers.iterator(); - while (allCacheServersIterator.hasNext()) { - CacheServerImpl server = (CacheServerImpl) allCacheServersIterator.next(); - if (!server.getNotifyBySubscription()) { - result = true; - break; - } + boolean hasSerialSenders = hasSerialSenders(r); + boolean result = hasSerialSenders; + if (!result) { + Iterator allCacheServersIterator = allCacheServers.iterator(); + while (allCacheServersIterator.hasNext()) { + CacheServerImpl server = (CacheServerImpl) allCacheServersIterator.next(); + if (!server.getNotifyBySubscription()) { + result = true; + break; } - } - return result; + } + return result; } private boolean hasSerialSenders(PartitionedRegion r) { @@ -4483,25 +4480,11 @@ public class GemFireCacheImpl /** * This cache's reliable message queue factory. Should always have an instance of it. */ - private ReliableMessageQueueFactory rmqFactory; + private final ReliableMessageQueueFactory rmqFactory; private List<File> backupFiles = Collections.emptyList(); /** - * Initializes the reliable message queue. Needs to be called at cache creation - * - * @throws IllegalStateException if the factory is in use - */ - private void initReliableMessageQueueFactory() { - synchronized (GemFireCacheImpl.class) { - if (this.rmqFactory != null) { - this.rmqFactory.close(false); - } - this.rmqFactory = new ReliableMessageQueueFactoryImpl(); - } - } - - /** * Returns this cache's ReliableMessageQueueFactory. * * @since GemFire 5.0
