shutdownAll now syncs on class before this changed to an AtomicBoolean, remove cache sync on addPartitionedRegion and requiresNotificationFromPR
fixed formatting. Removed class sync on rmqFactory back to volatile boolean so that second SDA will wait for the first Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/fe4cf4e4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/fe4cf4e4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/fe4cf4e4 Branch: refs/heads/feature/GEM-983 Commit: fe4cf4e4e972d6a29ae7508e2596f1de63b3e5e2 Parents: bb30357 Author: Darrel Schneider <[email protected]> Authored: Tue Oct 11 15:04:43 2016 -0700 Committer: Darrel Schneider <[email protected]> Committed: Mon Oct 31 10:04:03 2016 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/GemFireCacheImpl.java | 157 +++++++++---------- 1 file changed, 70 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fe4cf4e4/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index ba4f1f4..3c4718c 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -664,7 +664,7 @@ public class GemFireCacheImpl sb.append("GemFireCache["); sb.append("id = " + System.identityHashCode(this)); sb.append("; isClosing = " + this.isClosing); - sb.append("; isShutDownAll = " + this.isShutDownAll); + sb.append("; isShutDownAll = " + isCacheAtShutdownAll()); sb.append("; created = " + this.creationDate); sb.append("; server = " + this.isServer); sb.append("; copyOnRead = " + this.copyOnRead); @@ -874,7 +874,7 @@ public class GemFireCacheImpl this.cqService = CqServiceProvider.create(this); - initReliableMessageQueueFactory(); + this.rmqFactory = new ReliableMessageQueueFactoryImpl(); // Create the CacheStatistics this.cachePerfStats = new CachePerfStats(system); @@ -1751,58 +1751,59 @@ public class GemFireCacheImpl } } - public synchronized void shutDownAll() { - boolean testIGE = Boolean.getBoolean("TestInternalGemFireError"); + public void shutDownAll() { + synchronized (GemFireCacheImpl.class) { + boolean testIGE = Boolean.getBoolean("TestInternalGemFireError"); - if (testIGE) { - InternalGemFireError assErr = new InternalGemFireError( - LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString()); - throw assErr; - } - if (isCacheAtShutdownAll()) { - // it's already doing shutdown by another thread - return; - } - if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { - try { - CacheObserverHolder.getInstance().beforeShutdownAll(); - } finally { - LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; - } - } - this.isShutDownAll = true; - - // bug 44031 requires multithread shutdownall should be grouped - // by root region. However, shutDownAllDuringRecovery.conf test revealed that - // we have to close colocated child regions first. - // Now check all the PR, if anyone has colocate-with attribute, sort all the - // PRs by colocation relationship and close them sequentially, otherwise still - // group them by root region. - TreeMap<String, Map<String, PartitionedRegion>> prTrees = getPRTrees(); - if (prTrees.size() > 1 && shutdownAllPoolSize != 1) { - ExecutorService es = getShutdownAllExecutorService(prTrees.size()); - for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { - es.execute(new Runnable() { - public void run() { - ConnectionTable.threadWantsSharedResources(); - shutdownSubTreeGracefully(prSubMap); - } - }); - } // for each root - es.shutdown(); - try { - es.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); - } catch (InterruptedException e) { - logger.debug("Shutdown all interrupted while waiting for PRs to be shutdown gracefully."); + if (testIGE) { + InternalGemFireError assErr = new InternalGemFireError(LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString()); + throw assErr; + } + if (isCacheAtShutdownAll()) { + // it's already doing shutdown by another thread + return; } + if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { + try { + CacheObserverHolder.getInstance().beforeShutdownAll(); + } finally { + LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; + } + } + this.isShutDownAll = true; + + // bug 44031 requires multithread shutdownall should be grouped + // by root region. However, shutDownAllDuringRecovery.conf test revealed that + // we have to close colocated child regions first. + // Now check all the PR, if anyone has colocate-with attribute, sort all the + // PRs by colocation relationship and close them sequentially, otherwise still + // group them by root region. + TreeMap<String, Map<String, PartitionedRegion>> prTrees = getPRTrees(); + if (prTrees.size() > 1 && shutdownAllPoolSize != 1) { + ExecutorService es = getShutdownAllExecutorService(prTrees.size()); + for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { + es.execute(new Runnable() { + public void run() { + ConnectionTable.threadWantsSharedResources(); + shutdownSubTreeGracefully(prSubMap); + } + }); + } // for each root + es.shutdown(); + try { + es.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.debug("Shutdown all interrupted while waiting for PRs to be shutdown gracefully."); + } - } else { - for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { - shutdownSubTreeGracefully(prSubMap); + } else { + for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { + shutdownSubTreeGracefully(prSubMap); + } } - } - close("Shut down all members", null, false, true); + close("Shut down all members", null, false, true); + } } private ExecutorService getShutdownAllExecutorService(int size) { @@ -4180,17 +4181,15 @@ public class GemFireCacheImpl * regions when this cache requires, or does not require notification of all region/entry events. */ public void addPartitionedRegion(PartitionedRegion r) { - synchronized (GemFireCacheImpl.class) { - synchronized (this.partitionedRegions) { - if (r.isDestroyed()) { - if (logger.isDebugEnabled()) { - logger.debug("GemFireCache#addPartitionedRegion did not add destroyed {}", r); - } - return; - } - if (this.partitionedRegions.add(r)) { - getCachePerfStats().incPartitionedRegions(1); + synchronized (this.partitionedRegions) { + if (r.isDestroyed()) { + if (logger.isDebugEnabled()) { + logger.debug("GemFireCache#addPartitionedRegion did not add destroyed {}", r); } + return; + } + if (this.partitionedRegions.add(r)) { + getCachePerfStats().incPartitionedRegions(1); } } } @@ -4288,22 +4287,20 @@ public class GemFireCacheImpl * @return true if the region should deliver all of its events to this cache */ protected boolean requiresNotificationFromPR(PartitionedRegion r) { - synchronized (GemFireCacheImpl.class) { - boolean hasSerialSenders = hasSerialSenders(r); - boolean result = hasSerialSenders; - if (!result) { - Iterator allCacheServersIterator = allCacheServers.iterator(); - while (allCacheServersIterator.hasNext()) { - CacheServerImpl server = (CacheServerImpl) allCacheServersIterator.next(); - if (!server.getNotifyBySubscription()) { - result = true; - break; - } + boolean hasSerialSenders = hasSerialSenders(r); + boolean result = hasSerialSenders; + if (!result) { + Iterator allCacheServersIterator = allCacheServers.iterator(); + while (allCacheServersIterator.hasNext()) { + CacheServerImpl server = (CacheServerImpl) allCacheServersIterator.next(); + if (!server.getNotifyBySubscription()) { + result = true; + break; } - } - return result; + } + return result; } private boolean hasSerialSenders(PartitionedRegion r) { @@ -4483,25 +4480,11 @@ public class GemFireCacheImpl /** * This cache's reliable message queue factory. Should always have an instance of it. */ - private ReliableMessageQueueFactory rmqFactory; + private final ReliableMessageQueueFactory rmqFactory; private List<File> backupFiles = Collections.emptyList(); /** - * Initializes the reliable message queue. Needs to be called at cache creation - * - * @throws IllegalStateException if the factory is in use - */ - private void initReliableMessageQueueFactory() { - synchronized (GemFireCacheImpl.class) { - if (this.rmqFactory != null) { - this.rmqFactory.close(false); - } - this.rmqFactory = new ReliableMessageQueueFactoryImpl(); - } - } - - /** * Returns this cache's ReliableMessageQueueFactory. * * @since GemFire 5.0
