https://issues.apache.org/jira/browse/AMQ-5262
close connections when the connector is stopped.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/73cb029c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/73cb029c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/73cb029c

Branch: refs/heads/activemq-5.10.x
Commit: 73cb029c1b2f8c1f113b26e76ff5af9e98007726
Parents: 8f43810
Author: Timothy Bish <[email protected]>
Authored: Tue Jul 8 16:20:21 2014 -0400
Committer: Hadrian Zbarcea <[email protected]>
Committed: Mon Dec 15 19:08:06 2014 -0500

----------------------------------------------------------------------
 .../activemq/network/jms/JmsConnector.java | 31 +++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/73cb029c/activemq-broker/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/jms/JmsConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
index afeb88a..6ddc6c5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
@@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
-import javax.jms.QueueConnection;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.Service;
@@ -73,20 +72,21 @@ public abstract class JmsConnector implements Service {
     private ReconnectionPolicy policy = new ReconnectionPolicy();
     protected ThreadPoolExecutor connectionSerivce;
-    private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
-    private 
List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>(); + private final List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>(); + private final List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>(); private String name; private static LRUCache<Destination, DestinationBridge> createLRUCache() { return new LRUCache<Destination, DestinationBridge>() { private static final long serialVersionUID = -7446792754185879286L; + @Override protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) { if (size() > maxCacheSize) { Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator(); Map.Entry<Destination, DestinationBridge> lru = iter.next(); remove(lru.getKey()); - DestinationBridge bridge = (DestinationBridge)lru.getValue(); + DestinationBridge bridge = lru.getValue(); try { bridge.stop(); LOG.info("Expired bridge: {}", bridge); @@ -151,6 +151,7 @@ public abstract class JmsConnector implements Service { return true; } + @Override public void start() throws Exception { if (started.compareAndSet(false, true)) { init(); @@ -164,12 +165,27 @@ public abstract class JmsConnector implements Service { } } + @Override public void stop() throws Exception { if (started.compareAndSet(true, false)) { ThreadPoolUtils.shutdown(connectionSerivce); connectionSerivce = null; + if (foreignConnection.get() != null) { + try { + foreignConnection.get().close(); + } catch (Exception e) { + } + } + + if (localConnection.get() != null) { + try { + localConnection.get().close(); + } catch (Exception e) { + } + } + for (DestinationBridge bridge : inboundBridges) { bridge.stop(); } @@ -480,7 +496,7 @@ public abstract class JmsConnector implements Service { // TODO - How do we handle the re-wiring of replyToBridges in this case. 
replyToBridges.clear(); - if (this.foreignConnection.compareAndSet((QueueConnection)connection, null)) { + if (this.foreignConnection.compareAndSet(connection, null)) { // Stop the inbound bridges when the foreign connection is dropped since // the bridge has no consumer and needs to be restarted once a new connection @@ -505,7 +521,7 @@ public abstract class JmsConnector implements Service { } }); - } else if (this.localConnection.compareAndSet((QueueConnection)connection, null)) { + } else if (this.localConnection.compareAndSet(connection, null)) { // Stop the outbound bridges when the local connection is dropped since // the bridge has no consumer and needs to be restarted once a new connection @@ -614,7 +630,8 @@ public abstract class JmsConnector implements Service { this.failed.set(true); } - private ThreadFactory factory = new ThreadFactory() { + private final ThreadFactory factory = new ThreadFactory() { + @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: "); thread.setDaemon(true);
