[GEODE-2324] Fixes to AcceptorImpl.close(). If the thread is interrupted during closing, just continue to shut down what we can.
* Catch InterruptedException so cleanup continues. * Remove top-level exception handler to avoid masking exceptions that could short-circuit shutdown. * Fix a synchronization bug that could cause AcceptorImpl to try to shut down twice. * Fix what looks like a bug where if closing the socket throws an IOException, we fail to shut anything else down, though we still have ourselves marked as shut down. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/75b02565 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/75b02565 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/75b02565 Branch: refs/heads/develop Commit: 75b02565b272a4b1dc5763f768bfed26fbed3d18 Parents: 30341ec Author: Galen O'Sullivan <gosulli...@pivotal.io> Authored: Wed Jan 18 15:00:47 2017 -0800 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Thu Feb 9 14:28:48 2017 -0800 ---------------------------------------------------------------------- .../cache/tier/sockets/AcceptorImpl.java | 133 +++++++++---------- 1 file changed, 65 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/75b02565/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java index 060683d..ea6b369 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java @@ -1527,89 +1527,86 @@ public class AcceptorImpl extends Acceptor implements Runnable { } @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"REC_CATCH_EXCEPTION", - justification = "Allow this thread to die") public void close() { - if (!isRunning()) { - return; - } - - try { - synchronized (syncLock) { - this.shutdown = true; - logger.info(LocalizedMessage.create( - LocalizedStrings.AcceptorImpl_CACHE_SERVER_ON_PORT_0_IS_SHUTTING_DOWN, this.localPort)); - if (this.thread != null) { - this.thread.interrupt(); - } + synchronized (syncLock) { + if (!isRunning()) { + return; + } + this.shutdown = true; + logger.info(LocalizedMessage.create( + LocalizedStrings.AcceptorImpl_CACHE_SERVER_ON_PORT_0_IS_SHUTTING_DOWN, this.localPort)); + if (this.thread != null) { + this.thread.interrupt(); + } + try { this.serverSock.close(); - crHelper.setShutdown(true); // set this before shutting down the pool - if (isSelector()) { - this.hsTimer.cancel(); - if (this.tmpSel != null) { - try { - this.tmpSel.close(); - } catch (IOException ignore) { - } - } + } catch (IOException ignore) { + // Well, we tried. Continue shutting down. + } + crHelper.setShutdown(true); // set this before shutting down the pool + if (isSelector()) { + this.hsTimer.cancel(); + if (this.tmpSel != null) { try { - wakeupSelector(); - this.selector.close(); + this.tmpSel.close(); } catch (IOException ignore) { } - if (this.selectorThread != null) { - this.selectorThread.interrupt(); - } - this.commBufferQueue.clear(); } - ClientHealthMonitor.shutdownInstance(); - shutdownSCs(); - this.clientNotifier.shutdown(this.acceptorId); - this.pool.shutdown(); + try { + wakeupSelector(); + this.selector.close(); + } catch (IOException ignore) { + } + if (this.selectorThread != null) { + this.selectorThread.interrupt(); + } + this.commBufferQueue.clear(); + } + ClientHealthMonitor.shutdownInstance(); + shutdownSCs(); + this.clientNotifier.shutdown(this.acceptorId); + this.pool.shutdown(); + + try { if (!this.pool.awaitTermination(PoolImpl.SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) { logger.warn(LocalizedMessage 
.create(LocalizedStrings.PoolImpl_TIMEOUT_WAITING_FOR_BACKGROUND_TASKS_TO_COMPLETE)); this.pool.shutdownNow(); } - this.hsPool.shutdownNow(); - this.stats.close(); - GemFireCacheImpl myCache = (GemFireCacheImpl) cache; - if (!myCache.forcedDisconnect()) { - Set<PartitionedRegion> prs = myCache.getPartitionedRegions(); - for (PartitionedRegion pr : prs) { - Map<Integer, BucketAdvisor.BucketProfile> profiles = - new HashMap<Integer, BucketAdvisor.BucketProfile>(); - // get all local real bucket advisors - Map<Integer, BucketAdvisor> advisors = pr.getRegionAdvisor().getAllBucketAdvisors(); - for (Map.Entry<Integer, BucketAdvisor> entry : advisors.entrySet()) { - BucketAdvisor advisor = entry.getValue(); - BucketProfile bp = (BucketProfile) advisor.createProfile(); - advisor.updateServerBucketProfile(bp); - profiles.put(entry.getKey(), bp); - } - Set receipients = new HashSet(); - receipients = pr.getRegionAdvisor().adviseAllPRNodes(); - // send it to all in one messgae - ReplyProcessor21 reply = AllBucketProfilesUpdateMessage.send(receipients, - pr.getDistributionManager(), pr.getPRId(), profiles, true); - if (reply != null) { - reply.waitForRepliesUninterruptibly(); - } + } catch (InterruptedException ignore) { + this.pool.shutdownNow(); + } + this.hsPool.shutdownNow(); + this.stats.close(); + GemFireCacheImpl myCache = (GemFireCacheImpl) cache; + if (!myCache.forcedDisconnect()) { + Set<PartitionedRegion> prs = myCache.getPartitionedRegions(); + for (PartitionedRegion pr : prs) { + Map<Integer, BucketAdvisor.BucketProfile> profiles = + new HashMap<Integer, BucketAdvisor.BucketProfile>(); + // get all local real bucket advisors + Map<Integer, BucketAdvisor> advisors = pr.getRegionAdvisor().getAllBucketAdvisors(); + for (Map.Entry<Integer, BucketAdvisor> entry : advisors.entrySet()) { + BucketAdvisor advisor = entry.getValue(); + BucketProfile bp = (BucketProfile) advisor.createProfile(); + advisor.updateServerBucketProfile(bp); + profiles.put(entry.getKey(), bp); + } 
+ Set receipients = new HashSet(); + receipients = pr.getRegionAdvisor().adviseAllPRNodes(); + // send it to all in one messgae + ReplyProcessor21 reply = AllBucketProfilesUpdateMessage.send(receipients, + pr.getDistributionManager(), pr.getPRId(), profiles, true); + if (reply != null) { + reply.waitForRepliesUninterruptibly(); + } - if (logger.isDebugEnabled()) { - logger.debug("sending messages to all peers for removing this server.."); - } + if (logger.isDebugEnabled()) { + logger.debug("sending messages to all peers for removing this server.."); } } - - } // synchronized - } catch (InterruptedException e) { - if (!this.shutdown) { // GEODE-1103: expect an interrupt during shutdown - logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED), e); } - } catch (Exception e) {/* ignore */ - logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED), e); - } + } // synchronized } private void shutdownSCs() {