Repository: activemq-artemis Updated Branches: refs/heads/master 22b62b5b0 -> e549a153a
ARTEMIS-2030 only use interrupt during shutdown on RA Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/99d091a0 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/99d091a0 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/99d091a0 Branch: refs/heads/master Commit: 99d091a0eaa07f042ce5d689bff9a3bd5880f1b3 Parents: 22b62b5 Author: Clebert Suconic <[email protected]> Authored: Mon Aug 13 20:30:54 2018 -0400 Committer: Clebert Suconic <[email protected]> Committed: Mon Aug 13 20:40:34 2018 -0400 ---------------------------------------------------------------------- .../core/client/impl/ClientConsumerImpl.java | 9 ++ .../client/impl/ClientConsumerInternal.java | 2 + .../artemis/ra/inflow/ActiveMQActivation.java | 113 +++++++++++++------ .../ra/inflow/ActiveMQMessageHandler.java | 16 ++- .../client/impl/LargeMessageBufferTest.java | 5 + 5 files changed, 105 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/99d091a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java index 19987ff..5e3d95e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java @@ -409,6 +409,15 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { return handler; } + @Override + public Thread getCurrentThread() { + if (onMessageThread != null) { + return onMessageThread; + } + return receiverThread; + } + + // Must be synchronized since messages may be arriving while handler is being set and might otherwise end // up not queueing enough executors - so messages get stranded @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/99d091a0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java index 177732e..986f397 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java @@ -41,6 +41,8 @@ public interface ClientConsumerInternal extends ClientConsumer { void clear(boolean waitForOnMessage) throws ActiveMQException; + Thread getCurrentThread(); + /** * To be called by things like MDBs during shutdown of the server * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/99d091a0/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java index 204d5d0..9d4b096 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java @@ -26,9 +26,12 @@ import javax.naming.InitialContext; import javax.resource.ResourceException; import javax.resource.spi.endpoint.MessageEndpointFactory; import javax.resource.spi.work.Work; +import javax.resource.spi.work.WorkException; import javax.resource.spi.work.WorkManager; import javax.transaction.xa.XAResource; import java.lang.reflect.Method; +import java.security.AccessController; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -56,6 +59,7 @@ import org.apache.activemq.artemis.ra.ActiveMQRALogger; import org.apache.activemq.artemis.ra.ActiveMQRaUtils; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig; +import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.PasswordMaskingUtil; import org.jboss.logging.Logger; @@ -242,7 +246,7 @@ public class ActiveMQActivation { logger.trace("start()"); } deliveryActive.set(true); - ra.getWorkManager().scheduleWork(new SetupActivation()); + scheduleWork(new SetupActivation()); } /** @@ -282,7 +286,7 @@ public class ActiveMQActivation { } deliveryActive.set(false); - teardown(); + teardown(true); } /** @@ -348,7 +352,7 @@ public class ActiveMQActivation { /** * Teardown the activation */ - protected synchronized void teardown() { + protected synchronized void teardown(boolean useInterrupt) { logger.debug("Tearing down " + spec); long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout(); @@ -369,28 +373,27 @@ public class ActiveMQActivation { handlers.clear(); FutureLatch future = new FutureLatch(handlersCopy.length); - List<Thread> interruptThreads = new ArrayList<>(); for (ActiveMQMessageHandler handler : handlersCopy) { - Thread thread = handler.interruptConsumer(future); - if (thread != null) { - interruptThreads.add(thread); - } + handler.interruptConsumer(future); } //wait for all the consumers to complete any onmessage calls boolean stuckThreads = !future.await(timeout); //if any are stuck then we need to interrupt them - if (stuckThreads) { - for (Thread interruptThread : interruptThreads) { - try { - interruptThread.interrupt(); - } catch (Exception e) { - //ok + if (stuckThreads && useInterrupt) { + for (ActiveMQMessageHandler handler : handlersCopy) { + Thread interruptThread = handler.getCurrentThread(); + if (interruptThread != null) { + try { + interruptThread.interrupt(); + } catch (Throwable e) { + //ok + } } } } - Thread threadTearDown = new Thread("TearDown/ActiveMQActivation") { + Runnable runTearDown = new Runnable() { @Override public void run() { for (ActiveMQMessageHandler handler : handlersCopy) { @@ -399,10 +402,7 @@ public class ActiveMQActivation { } }; - // We will first start a new thread that will call tearDown on all the instances, trying to graciously shutdown everything. - // We will then use the call-timeout to determine a timeout. - // if that failed we will then close the connection factory, and interrupt the thread - threadTearDown.start(); + Thread threadTearDown = startThread("TearDown/HornetQActivation", runTearDown); try { threadTearDown.join(timeout); @@ -550,9 +550,7 @@ public class ActiveMQActivation { calculatedDestinationName = spec.getQueuePrefix() + calculatedDestinationName; } - logger.debug("Unable to retrieve " + destinationName + - " from JNDI. Creating a new " + destinationType.getName() + - " named " + calculatedDestinationName + " to be used by the MDB."); + logger.debug("Unable to retrieve " + destinationName + " from JNDI. Creating a new " + destinationType.getName() + " named " + calculatedDestinationName + " to be used by the MDB."); // If there is no binding on naming, we will just create a new instance if (isTopic) { @@ -602,18 +600,41 @@ public class ActiveMQActivation { return buffer.toString(); } - public void startReconnectThread(final String threadName) { + public void startReconnectThread(final String cause) { if (logger.isTraceEnabled()) { - logger.trace("Starting reconnect Thread " + threadName + " on MDB activation " + this); + logger.trace("Starting reconnect Thread " + cause + " on MDB activation " + this); } - Runnable runnable = new Runnable() { - @Override - public void run() { - reconnect(null); - } - }; - Thread t = new Thread(runnable, threadName); + try { + // We have to use the worker otherwise we may get the wrong classLoader + scheduleWork(new ReconnectWork(cause)); + } catch (Exception e) { + logger.warn("Could not reconnect because worker is down", e); + } + } + + private static Thread startThread(String name, Runnable run) { + ClassLoader tccl; + + try { + tccl = AccessController.doPrivileged(new PrivilegedExceptionAction<ClassLoader>() { + @Override + public ClassLoader run() { + return ActiveMQActivation.class.getClassLoader(); + } + }); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + tccl = null; + } + + ActiveMQThreadFactory factory = new ActiveMQThreadFactory(name, true, tccl); + Thread t = factory.newThread(run); t.start(); + return t; + } + + private void scheduleWork(Work run) throws WorkException { + ra.getWorkManager().scheduleWork(run); } /** @@ -621,7 +642,7 @@ public class ActiveMQActivation { * * @param failure if reconnecting in the event of a failure */ - public void reconnect(Throwable failure) { + public void reconnect(Throwable failure, boolean useInterrupt) { if (logger.isTraceEnabled()) { logger.trace("reconnecting activation " + this); } @@ -644,7 +665,7 @@ public class ActiveMQActivation { try { Throwable lastException = failure; while (deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts)) { - teardown(); + teardown(useInterrupt); try { Thread.sleep(setupInterval); @@ -697,7 +718,7 @@ public class ActiveMQActivation { try { setup(); } catch (Throwable t) { - reconnect(t); + reconnect(t, false); } } @@ -706,6 +727,30 @@ public class ActiveMQActivation { } } + /** + * Handles reconnecting + */ + private class ReconnectWork implements Work { + + final String cause; + + ReconnectWork(String cause) { + this.cause = cause; + } + + @Override + public void release() { + + } + + @Override + public void run() { + logger.tracef("Starting reconnect for %s", cause); + reconnect(null, false); + } + + } + private class RebalancingListener implements ClusterTopologyListener { @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/99d091a0/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java index f343ec9..0183896 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java @@ -124,17 +124,13 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList if (!spec.isShareSubscriptions()) { throw ActiveMQRALogger.LOGGER.canNotCreatedNonSharedSubscriber(); } else if (ActiveMQRALogger.LOGGER.isDebugEnabled()) { - logger.debug("the mdb on destination " + queueName + " already had " + - subResponse.getConsumerCount() + - " consumers but the MDB is configured to share subscriptions, so no exceptions are thrown"); + logger.debug("the mdb on destination " + queueName + " already had " + subResponse.getConsumerCount() + " consumers but the MDB is configured to share subscriptions, so no exceptions are thrown"); } } SimpleString oldFilterString = subResponse.getFilterString(); - boolean selectorChanged = selector == null && oldFilterString != null || - oldFilterString == null && selector != null || - (oldFilterString != null && selector != null && !oldFilterString.toString().equals(selector)); + boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || (oldFilterString != null && selector != null && !oldFilterString.toString().equals(selector)); SimpleString oldTopicName = subResponse.getAddress(); @@ -198,6 +194,14 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList return useXA ? session : null; } + public Thread getCurrentThread() { + if (consumer == null) { + return null; + } + + return consumer.getCurrentThread(); + } + public Thread interruptConsumer(FutureLatch future) { try { if (consumer != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/99d091a0/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java index d78f070..c1790d0 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java @@ -676,6 +676,11 @@ public class LargeMessageBufferTest extends ActiveMQTestBase { } @Override + public Thread getCurrentThread() { + return null; + } + + @Override public ClientMessage receive(final long timeout) throws ActiveMQException { return null; }
