This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-5772 in repository https://gitbox.apache.org/repos/asf/geode.git
commit d3bb8d848732428cbfa420cfd82a0b345e09599b Author: zhouxh <[email protected]> AuthorDate: Fri Sep 21 15:33:21 2018 -0700 GEODE-5772: fix the potential cache leaks caused by reconnect Co-authored-by Barry Oglesby --- .../apache/geode/distributed/ServerLauncher.java | 4 +++ .../internal/ClusterDistributionManager.java | 5 +++ .../distributed/internal/DistributionManager.java | 2 ++ .../internal/InternalDistributedSystem.java | 10 ++++++ .../internal/LonerDistributionManager.java | 2 ++ .../membership/gms/mgr/GMSMembershipManager.java | 2 +- .../apache/geode/internal/cache/DiskStoreImpl.java | 3 +- .../geode/internal/cache/GemFireCacheImpl.java | 1 + .../apache/geode/internal/cache/TXManagerImpl.java | 4 ++- .../geode/internal/cache/TombstoneService.java | 23 ++++++++----- .../internal/cache/wan/AbstractGatewaySender.java | 5 +++ .../wan/parallel/ParallelGatewaySenderQueue.java | 38 ++++++++++------------ .../cache/wan/serial/SerialGatewaySenderQueue.java | 14 ++++---- .../cache/xmlcache/CacheServerCreation.java | 4 +-- 14 files changed, 75 insertions(+), 42 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java index 9b7be48..6c2fe5d 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java @@ -347,6 +347,10 @@ public class ServerLauncher extends AbstractLauncher<String> { return this.cache; } + public void setCache(Cache cache) { + this.cache = cache; + } + /** * Gets the CacheConfig object used to configure additional GemFire Cache components and features * (e.g. PDX). diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java index 6b503c7..966cdc9 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java @@ -2111,6 +2111,10 @@ public class ClusterDistributionManager implements DistributionManager { this.membershipListeners.remove(l); } + public void clearMembershipListeners() { + this.membershipListeners.clear(); + } + /** * Adds a <code>MembershipListener</code> to this distribution manager. */ @@ -2280,6 +2284,7 @@ public class ClusterDistributionManager implements DistributionManager { this.localAddress)); MembershipLogger.logShutdown(this.localAddress); closed = true; + this.cache = null; } } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java index 9742822..f9e7e6f 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java @@ -198,6 +198,8 @@ public interface DistributionManager extends ReplySender { */ void removeMembershipListener(MembershipListener l); + void clearMembershipListeners(); + /** * Removes a <code>MembershipListener</code> listening for all members from this distribution * manager. diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java index 9d992a1..5d5fe8d 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java @@ -67,6 +67,7 @@ import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.DistributedSystemDisconnectedException; import org.apache.geode.distributed.DurableClientAttributes; +import org.apache.geode.distributed.ServerLauncher; import org.apache.geode.distributed.internal.locks.GrantorRequestProcessor; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.distributed.internal.membership.MembershipManager; @@ -2693,6 +2694,11 @@ public class InternalDistributedSystem extends DistributedSystem logger.warn("Exception disconnecting for reconnect", ee); } + if (ServerLauncher.getInstance() != null) { + ServerLauncher.getInstance().setCache(null); + } + getDM().clearMembershipListeners(); + try { reconnectLock.wait(timeOut); } catch (InterruptedException e) { @@ -2798,6 +2804,10 @@ public class InternalDistributedSystem extends DistributedSystem } cache = GemFireCacheImpl.create(this.reconnectDS, config); + if (ServerLauncher.getInstance() != null) { + ServerLauncher.getInstance().setCache(cache); + } + createAndStartCacheServers(cacheServerCreation, cache); if (cache.getCachePerfStats().getReliableRegionsMissing() == 0) { diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java index 6115f7c..f265def 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java @@ -251,6 +251,8 @@ public class LonerDistributionManager implements DistributionManager { public void removeMembershipListener(MembershipListener l) {} + public void clearMembershipListeners() {} + public void removeAllMembershipListener(MembershipListener l) {} public void addAdminConsole(InternalDistributedMember p_id) {} diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java index cb19969..c015cb4 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java @@ -1596,7 +1596,7 @@ public class GMSMembershipManager implements MembershipManager, Manager { for (final Object o : cache.getCacheServers()) { CacheServerImpl cs = (CacheServerImpl) o; if (cs.isDefaultServer()) { - CacheServerCreation bsc = new CacheServerCreation(cache, cs); + CacheServerCreation bsc = new CacheServerCreation(cache, cs, false); list.add(bsc); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java index d174157..dfb65a2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java @@ -263,7 +263,7 @@ public class DiskStoreImpl implements DiskStore { public static volatile HashSet<String> TEST_CHK_FALLOC_DIRS; public static volatile HashSet<String> TEST_NO_FALLOC_DIRS; - private final InternalCache cache; + private InternalCache cache; /** The stats for this store */ private final DiskStoreStats stats; @@ -2368,6 +2368,7 @@ public class DiskStoreImpl implements DiskStore { } } finally { this.closed = true; + this.cache = null; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index 467feee..7beecdf 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -2202,6 +2202,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has } } catch (CancelException ignore) { } + ((AbstractGatewaySender) sender).resetCache(); } destroyGatewaySenderLockService(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java index b0ec44d..e2aa0f2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java @@ -92,7 +92,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene private final AtomicInteger uniqId; private final DistributionManager dm; - private final InternalCache cache; + private InternalCache cache; // The DistributionMemberID used to construct TXId's private final InternalDistributedMember distributionMgrId; @@ -649,6 +649,8 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene if (isClosed()) { return; } + this.cache = null; + this.currentInstance = null; TXStateProxy[] proxies = null; synchronized (this.hostedTXStates) { // After this, newly added TXStateProxy would not operate on the TXState. diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java index 14c1d53..fa90fcf 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java @@ -570,6 +570,10 @@ public class TombstoneService { try { // this thread should not reference other sweeper state, which is not synchronized for (Map.Entry<DistributedRegion, Set<Object>> mapEntry : reapedKeys.entrySet()) { + if (isStopped) { + logger.info("expireBatch is stopped due to close"); + break; + } DistributedRegion r = mapEntry.getKey(); Set<Object> rKeysReaped = mapEntry.getValue(); r.distributeTombstoneGC(rKeysReaped); @@ -775,7 +779,7 @@ public class TombstoneService { protected final CachePerfStats stats; private final CancelCriterion cancelCriterion; - private volatile boolean isStopped; + protected volatile boolean isStopped; TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion, long expiryTime, String threadName) { @@ -923,13 +927,16 @@ public class TombstoneService { if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) { logger.trace(LogMarker.TOMBSTONE_VERBOSE, "sleeping for {}", sleepTime); } - synchronized (this) { - if (isStopped) { - return; - } - try { - this.wait(sleepTime); - } catch (InterruptedException ignore) { + long then = getNow(); + while ((getNow() - then) <= sleepTime) { + synchronized (this) { + if (isStopped) { + return; + } + try { + this.wait(500); + } catch (InterruptedException ignore) { + } } } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index b099eb1..2eb7abe 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -278,6 +278,11 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di return senderAdvisor; } + public void resetCache() { + logger.info("Cache reference is reset for GatewaySender " + this.getId()); + this.cache = null; + } + @Override public GatewaySenderStats getStatistics() { return statistics; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 057697a..d3cbd43 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -288,7 +288,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { // still, it is safer approach to synchronize it synchronized (ParallelGatewaySenderQueue.class) { if (removalThread == null) { - removalThread = new BatchRemovalThread(this.sender.getCache(), this); + removalThread = new BatchRemovalThread(this); removalThread.start(); } } @@ -310,9 +310,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue { if (this.userRegionNameToshadowPRMap.containsKey(regionName)) return; - InternalCache cache = sender.getCache(); final String prQName = getQueueName(sender.getId(), userRegion.getFullPath()); - prQ = (PartitionedRegion) cache.getRegion(prQName); + prQ = (PartitionedRegion) sender.getCache().getRegion(prQName); if (prQ == null) { // TODO:REF:Avoid deprecated apis AttributesFactory fact = new AttributesFactory(); @@ -356,10 +355,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } ParallelGatewaySenderQueueMetaRegion meta = - new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, cache, sender); + new ParallelGatewaySenderQueueMetaRegion(prQName, ra, null, sender.getCache(), sender); try { - prQ = (PartitionedRegion) cache.createVMRegion(prQName, ra, + prQ = (PartitionedRegion) sender.getCache().createVMRegion(prQName, ra, new InternalRegionArguments().setInternalMetaRegion(meta).setDestroyLockFlag(true) .setSnapshotInputStream(null).setImageTarget(null)); @@ -399,7 +398,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { // in case shadowPR exists already (can be possible when sender is // started from stop operation) if (this.index == 0) // HItesh: for first processor only - handleShadowPRExistsScenario(cache, prQ); + handleShadowPRExistsScenario(sender.getCache(), prQ); } /* * Here, enqueueTempEvents need to be invoked when a sender is already running and userPR is @@ -455,11 +454,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue { .toLocalizedString(new Object[] {this.sender.getId(), userPR.getFullPath()})); } - InternalCache cache = sender.getCache(); boolean isAccessor = (userPR.getLocalMaxMemory() == 0); final String prQName = sender.getId() + QSTRING + convertPathToName(userPR.getFullPath()); - prQ = (PartitionedRegion) cache.getRegion(prQName); + prQ = (PartitionedRegion) sender.getCache().getRegion(prQName); if (prQ == null) { // TODO:REF:Avoid deprecated apis @@ -506,10 +504,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } ParallelGatewaySenderQueueMetaRegion meta = - metaRegionFactory.newMetataRegion(cache, prQName, ra, sender); + metaRegionFactory.newMetataRegion(sender.getCache(), prQName, ra, sender); try { - prQ = (PartitionedRegion) cache.createVMRegion(prQName, ra, + prQ = (PartitionedRegion) sender.getCache().createVMRegion(prQName, ra, new InternalRegionArguments().setInternalMetaRegion(meta).setDestroyLockFlag(true) .setInternalRegion(true).setSnapshotInputStream(null).setImageTarget(null)); // at this point we should be able to assert prQ == meta; @@ -520,7 +518,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return; // return from here if accessor node // Add the overflow statistics to the mbean - addOverflowStatisticsToMBean(cache, prQ); + addOverflowStatisticsToMBean(sender.getCache(), prQ); // Wait for buckets to be recovered. prQ.shadowPRWaitForBucketRecovery(); @@ -539,7 +537,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { // in case shadowPR exists already (can be possible when sender is // started from stop operation) if (this.index == 0) // HItesh:for first parallelGatewaySenderQueue only - handleShadowPRExistsScenario(cache, prQ); + handleShadowPRExistsScenario(sender.getCache(), prQ); } } finally { @@ -1014,8 +1012,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } } else { String regionPath = event.getRegionPath(); - InternalCache cache = this.sender.getCache(); - Region region = (PartitionedRegion) cache.getRegion(regionPath); + Region region = (PartitionedRegion) sender.getCache().getRegion(regionPath); if (region != null && !region.isDestroyed()) { // TODO: We have to get colocated parent region for this region if (region instanceof DistributedRegion) { @@ -1600,17 +1597,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue { */ private volatile boolean shutdown = false; - private final InternalCache cache; - private final ParallelGatewaySenderQueue parallelQueue; /** * Constructor : Creates and initializes the thread */ - public BatchRemovalThread(InternalCache c, ParallelGatewaySenderQueue queue) { + public BatchRemovalThread(ParallelGatewaySenderQueue queue) { super("BatchRemovalThread for GatewaySender_" + queue.sender.getId() + "_" + queue.index); this.setDaemon(true); - this.cache = c; this.parallelQueue = queue; } @@ -1618,7 +1612,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { if (shutdown) { return true; } - if (cache.getCancelCriterion().isCancelInProgress()) { + if (parallelQueue.sender.getCache().getCancelCriterion().isCancelInProgress()) { return true; } return false; @@ -1627,7 +1621,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue { @Override public void run() { try { - InternalDistributedSystem ids = cache.getInternalDistributedSystem(); + InternalDistributedSystem ids = + parallelQueue.sender.getCache().getInternalDistributedSystem(); DistributionManager dm = ids.getDistributionManager(); for (;;) { try { // be somewhat tolerant of failures @@ -1684,7 +1679,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue { buckToDispatchLock.unlock(); } // Get all the data-stores wherever userPRs are present - Set<InternalDistributedMember> recipients = getAllRecipients(cache, temp); + Set<InternalDistributedMember> recipients = + getAllRecipients(parallelQueue.sender.getCache(), temp); if (!recipients.isEmpty()) { ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(temp); pqrm.setRecipients(recipients); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java index 6cfe7f4..357d992 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java @@ -213,7 +213,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { initializeRegion(abstractSender, listener); // Increment queue size. Fix for bug 51988. this.stats.incQueueSize(this.region.size()); - this.removalThread = new BatchRemovalThread(abstractSender.getCache()); + this.removalThread = new BatchRemovalThread(); this.removalThread.start(); this.sender = abstractSender; if (logger.isDebugEnabled()) { @@ -1042,22 +1042,19 @@ public class SerialGatewaySenderQueue implements RegionQueue { */ private volatile boolean shutdown = false; - private final InternalCache cache; - /** * Constructor : Creates and initializes the thread * */ - public BatchRemovalThread(InternalCache c) { + public BatchRemovalThread() { this.setDaemon(true); - this.cache = c; } private boolean checkCancelled() { if (shutdown) { return true; } - if (cache.getCancelCriterion().isCancelInProgress()) { + if (sender.getCache().getCancelCriterion().isCancelInProgress()) { return true; } return false; @@ -1065,7 +1062,7 @@ public class SerialGatewaySenderQueue implements RegionQueue { @Override public void run() { - InternalDistributedSystem ids = cache.getInternalDistributedSystem(); + InternalDistributedSystem ids = sender.getCache().getInternalDistributedSystem(); try { // ensure exit message is printed // Long waitTime = Long.getLong(QUEUE_REMOVAL_WAIT_TIME, 1000); @@ -1113,7 +1110,8 @@ public class SerialGatewaySenderQueue implements RegionQueue { } // release not needed since disallowOffHeapValues called EntryEventImpl event = EntryEventImpl.create((LocalRegion) region, Operation.DESTROY, - (lastDestroyedKey + 1), null/* newValue */, null, false, cache.getMyId()); + (lastDestroyedKey + 1), null/* newValue */, null, false, + sender.getCache().getMyId()); event.disallowOffHeapValues(); event.setTailKey(temp); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java index 07c2685..a9f2e46 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java @@ -54,8 +54,8 @@ public class CacheServerCreation extends AbstractCacheServer { * Constructor for retaining bridge server information during auto-reconnect * */ - public CacheServerCreation(InternalCache cache, CacheServer other) { - super(cache); + public CacheServerCreation(InternalCache cache, CacheServer other, boolean attachListener) { + super(cache, attachListener); setPort(other.getPort()); setBindAddress(other.getBindAddress()); setHostnameForClients(other.getHostnameForClients());
