Repository: activemq-artemis Updated Branches: refs/heads/2.6.x 6b50d76e4 -> 4dbe0ecbb
[ARTEMIS-2105] Discovery group connectors can delay broker shutdown Issue: https://issues.apache.org/jira/browse/ARTEMIS-2105 (cherry picked from commit 2450f6a3769e83b204ec72354ed5fabb5053d3a1) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4dbe0ecb Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4dbe0ecb Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4dbe0ecb Branch: refs/heads/2.6.x Commit: 4dbe0ecbb99bbf3aad9bdbef5032faad1aa418d2 Parents: 6b50d76 Author: Ingo Weiss <[email protected]> Authored: Tue Oct 2 15:43:52 2018 +0100 Committer: Clebert Suconic <[email protected]> Committed: Thu Oct 11 15:10:39 2018 -0400 ---------------------------------------------------------------------- .../artemis/ra/inflow/ActiveMQActivation.java | 139 ++++++++++--------- 1 file changed, 72 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4dbe0ecb/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java index b0f0aff..57bf5c4 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java @@ -123,6 +123,8 @@ public class ActiveMQActivation { private boolean lastReceived = false; + private final Object teardownLock = new Object(); + // Whether we are in the failure recovery loop private final AtomicBoolean inReconnect = new AtomicBoolean(false); private XARecoveryConfig resourceRecovery; @@ -352,98 +354,102 @@ public class ActiveMQActivation { /** * Teardown the activation */ - protected synchronized void teardown(boolean useInterrupt) { - logger.debug("Tearing down " + spec); + protected void teardown(boolean useInterrupt) { - long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout(); + synchronized (teardownLock) { - if (resourceRecovery != null) { - ra.getRecoveryManager().unRegister(resourceRecovery); - } + logger.debug("Tearing down " + spec); - final ActiveMQMessageHandler[] handlersCopy = new ActiveMQMessageHandler[handlers.size()]; + long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout(); - // We need to do from last to first as any temporary queue will have been created on the first handler - // So we invert the handlers here - for (int i = 0; i < handlers.size(); i++) { - // The index here is the complimentary so it's inverting the array - handlersCopy[i] = handlers.get(handlers.size() - i - 1); - } + if (resourceRecovery != null) { + ra.getRecoveryManager().unRegister(resourceRecovery); + } - handlers.clear(); + final ActiveMQMessageHandler[] handlersCopy = new ActiveMQMessageHandler[handlers.size()]; - FutureLatch future = new FutureLatch(handlersCopy.length); - for (ActiveMQMessageHandler handler : handlersCopy) { - handler.interruptConsumer(future); - } + // We need to do from last to first as any temporary queue will have been created on the first handler + // So we invert the handlers here + for (int i = 0; i < handlers.size(); i++) { + // The index here is the complimentary so it's inverting the array + handlersCopy[i] = handlers.get(handlers.size() - i - 1); + } + + handlers.clear(); - //wait for all the consumers to complete any onmessage calls - boolean stuckThreads = !future.await(timeout); - //if any are stuck then we need to interrupt them - if (stuckThreads && useInterrupt) { + FutureLatch future = new FutureLatch(handlersCopy.length); for (ActiveMQMessageHandler handler : handlersCopy) { - Thread interruptThread = handler.getCurrentThread(); - if (interruptThread != null) { - try { - logger.tracef("Interrupting thread %s", interruptThread.getName()); - } catch (Throwable justLog) { - logger.warn(justLog); - } - try { - interruptThread.interrupt(); - } catch (Throwable e) { - //ok - } - } + handler.interruptConsumer(future); } - } - Runnable runTearDown = new Runnable() { - @Override - public void run() { + //wait for all the consumers to complete any onmessage calls + boolean stuckThreads = !future.await(timeout); + //if any are stuck then we need to interrupt them + if (stuckThreads && useInterrupt) { for (ActiveMQMessageHandler handler : handlersCopy) { - handler.teardown(); + Thread interruptThread = handler.getCurrentThread(); + if (interruptThread != null) { + try { + logger.tracef("Interrupting thread %s", interruptThread.getName()); + } catch (Throwable justLog) { + logger.warn(justLog); + } + try { + interruptThread.interrupt(); + } catch (Throwable e) { + //ok + } + } } } - }; - Thread threadTearDown = startThread("TearDown/HornetQActivation", runTearDown); + Runnable runTearDown = new Runnable() { + @Override + public void run() { + for (ActiveMQMessageHandler handler : handlersCopy) { + handler.teardown(); + } + } + }; - try { - threadTearDown.join(timeout); - } catch (InterruptedException e) { - // nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up - } + Thread threadTearDown = startThread("TearDown/HornetQActivation", runTearDown); - if (factory != null) { try { - // closing the factory will help making sure pending threads are closed - factory.close(); - } catch (Throwable e) { - ActiveMQRALogger.LOGGER.unableToCloseFactory(e); + threadTearDown.join(timeout); + } catch (InterruptedException e) { + // nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up } - factory = null; - } - - if (threadTearDown.isAlive()) { - threadTearDown.interrupt(); + if (factory != null) { + try { + // closing the factory will help making sure pending threads are closed + factory.close(); + } catch (Throwable e) { + ActiveMQRALogger.LOGGER.unableToCloseFactory(e); + } - try { - threadTearDown.join(5000); - } catch (InterruptedException e) { - // nothing to be done here.. we are going down anyways + factory = null; } if (threadTearDown.isAlive()) { - ActiveMQRALogger.LOGGER.threadCouldNotFinish(threadTearDown.toString()); + threadTearDown.interrupt(); + + try { + threadTearDown.join(5000); + } catch (InterruptedException e) { + // nothing to be done here.. we are going down anyways + } + + if (threadTearDown.isAlive()) { + ActiveMQRALogger.LOGGER.threadCouldNotFinish(threadTearDown.toString()); + } } - } - nodes.clear(); - lastReceived = false; + nodes.clear(); + lastReceived = false; - logger.debug("Tearing down complete " + this); + logger.debug("Tearing down complete " + this); + } } protected void setupCF() throws Exception { @@ -467,7 +473,6 @@ public class ActiveMQActivation { } else { factory = ra.newConnectionFactory(spec); } - } /**
