Repository: activemq Updated Branches: refs/heads/master 016ae05d0 -> d756d3571
AMQ-6494 Return ExceptionResponse during broker service shutdown Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dce2b61f Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dce2b61f Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dce2b61f Branch: refs/heads/master Commit: dce2b61f870245a0c12b634d12cd0fa3a8a60daa Parents: 016ae05 Author: Hadrian Zbarcea <[email protected]> Authored: Thu Nov 3 02:04:16 2016 -0400 Committer: Hadrian Zbarcea <[email protected]> Committed: Fri Nov 4 09:30:09 2016 -0400 ---------------------------------------------------------------------- .../activemq/broker/TransportConnection.java | 29 ++++++-------------- .../test/resources/handleReplyToActivemq.xml | 9 +++--- 2 files changed, 13 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/dce2b61f/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index a32d4f6..ac72534 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -22,13 +22,11 @@ import java.net.SocketException; import java.net.URI; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -40,13 +38,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.transaction.xa.XAResource; -import org.apache.activemq.advisory.AdvisoryBroker; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.region.ConnectionStatistics; -import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerSubscriptionInfo; @@ -107,7 +101,6 @@ import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.util.NetworkBridgeUtils; -import org.apache.activemq.util.SubscriptionKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -196,13 +189,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); } Command command = (Command) o; - if (!brokerService.isStopping()) { - Response response = service(command); - if (response != null && !brokerService.isStopping()) { - dispatchSync(response); - } - } else { - throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); + Response response = service(command); + if (response != null) { + dispatchSync(response); } } finally { serviceLock.readLock().unlock(); @@ -332,10 +321,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor { boolean responseRequired = command.isResponseRequired(); int commandId = command.getCommandId(); try { - if (!pendingStop) { + if (brokerService.isStopping()) { + response = responseRequired ? new ExceptionResponse( + new BrokerStoppedException("Broker " + brokerService + " is being stopped")) : null; + } else if (!pendingStop) { response = command.visit(this); } else { - response = new ExceptionResponse(transportException.get()); + response = responseRequired ? new ExceptionResponse(transportException.get()) : null; } } catch (Throwable e) { if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { @@ -465,10 +457,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public int getActiveTransactionCount() { int rc = 0; for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { - Collection<TransactionState> transactions = cs.getTransactionStates(); - for (TransactionState transaction : transactions) { - rc++; - } + rc += cs.getTransactionStates().size(); } return rc; } http://git-wip-us.apache.org/repos/asf/activemq/blob/dce2b61f/activemq-unit-tests/src/test/resources/handleReplyToActivemq.xml ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/handleReplyToActivemq.xml b/activemq-unit-tests/src/test/resources/handleReplyToActivemq.xml index b40cc59..9a5df34 100644 --- a/activemq-unit-tests/src/test/resources/handleReplyToActivemq.xml +++ b/activemq-unit-tests/src/test/resources/handleReplyToActivemq.xml @@ -1,3 +1,4 @@ +<?xml version="1.0" encoding="UTF-8"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with @@ -21,18 +22,16 @@ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> - <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/activemq-data/handle-replyto"> + <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/activemq-data/handle-replyto"> <jmsBridgeConnectors> <jmsQueueConnector> <inboundQueueBridges> - <inboundQueueBridge inboundQueueName="QueueA" localQueueName = "localTestQ" -doHandleReplyTo="false"/> + <inboundQueueBridge inboundQueueName="QueueA" localQueueName="localTestQ" doHandleReplyTo="false" /> </inboundQueueBridges> </jmsQueueConnector> </jmsBridgeConnectors> - </broker> + </broker> </beans> -<!-- END SNIPPET: example -->
