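Fix for https://issues.apache.org/jira/browse/AMQ-4918 - adds optional periodic purging of inactive network-bridge destination MBean views (new NetworkBridgeConfiguration properties gcDestinationViews and gcSweepTime) and makes NetworkBridgeView.resetStats() also reset the bridge's enqueue/dequeue counters via a new NetworkBridge.resetStats() method.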
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/687aa926 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/687aa926 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/687aa926 Branch: refs/heads/activemq-5.9 Commit: 687aa9265ed7ca04b7e364fe1e74733cee3a2d5e Parents: 3474f37 Author: rajdavies <[email protected]> Authored: Thu Dec 5 08:55:56 2013 +0000 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 13:14:28 2014 -0400 ---------------------------------------------------------------------- .../activemq/broker/TransportConnection.java | 2 +- .../activemq/broker/jmx/NetworkBridgeView.java | 1 + .../broker/jmx/NetworkDestinationView.java | 6 ++ .../network/DemandForwardingBridgeSupport.java | 5 ++ .../network/DiscoveryNetworkConnector.java | 2 +- .../network/MBeanBridgeDestination.java | 68 ++++++++++++++++++-- .../activemq/network/MBeanNetworkListener.java | 16 +++-- .../apache/activemq/network/NetworkBridge.java | 2 + .../network/NetworkBridgeConfiguration.java | 25 ++++++- .../org/apache/activemq/bugs/AMQ4160Test.java | 4 ++ 10 files changed, 116 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 1ce75a5..997b323 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -1297,7 +1297,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { if 
(duplexName.contains("#")) { duplexName = duplexName.substring(duplexName.lastIndexOf("#")); } - MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName)); + MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), config, broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName)); listener.setCreatedByDuplex(true); duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener); duplexBridge.setBrokerService(broker.getBrokerService()); http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java index 795de3a..47f167a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java @@ -72,6 +72,7 @@ public class NetworkBridgeView implements NetworkBridgeViewMBean { } public void resetStats(){ + bridge.resetStats(); for (NetworkDestinationView networkDestinationView:networkDestinationViewList){ networkDestinationView.resetStats(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java index 26177d5..1dceee4 100644 --- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java @@ -73,7 +73,13 @@ public class NetworkDestinationView implements NetworkDestinationViewMBean { lastTime=currentTime; } + public long getLastAccessTime(){ + return timeStatistic.getLastSampleTime(); + } + public void close(){ networkBridgeView.removeNetworkDestinationView(this); } + + } http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index bf0b4f6..9fa38a4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -1538,6 +1538,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return mbeanObjectName; } + public void resetStats(){ + enqueueCounter.set(0); + dequeueCounter.set(0); + } + /* * Used to allow for async tasks to await receipt of the BrokerInfo from the local and * remote sides of the network bridge. 
http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java index ed71b7b..d9cb0c6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java @@ -231,7 +231,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco class DiscoverNetworkBridgeListener extends MBeanNetworkListener { public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) { - super(brokerService, connectorName); + super(brokerService, DiscoveryNetworkConnector.this, connectorName); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java index c718063..583fab7 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.network; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -27,6 +28,7 @@ import org.apache.activemq.broker.jmx.NetworkBridgeView; import org.apache.activemq.broker.jmx.NetworkDestinationView; import org.apache.activemq.command.ActiveMQDestination; import 
org.apache.activemq.command.Message; +import org.apache.activemq.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,14 +37,24 @@ public class MBeanBridgeDestination { private final BrokerService brokerService; private final NetworkBridge bridge; private final NetworkBridgeView networkBridgeView; + private final NetworkBridgeConfiguration networkBridgeConfiguration; + private final Scheduler scheduler; + private final Runnable purgeInactiveDestinationViewTask; private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = new ConcurrentHashMap<ActiveMQDestination, ObjectName>(); private Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>(); private Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>(); - public MBeanBridgeDestination(BrokerService brokerService, NetworkBridge bridge, NetworkBridgeView networkBridgeView) { + public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) { this.brokerService = brokerService; + this.networkBridgeConfiguration = networkBridgeConfiguration; this.bridge = bridge; this.networkBridgeView = networkBridgeView; + this.scheduler = brokerService.getScheduler(); + purgeInactiveDestinationViewTask = new Runnable() { + public void run() { + purgeInactiveDestinationViews(); + } + }; } @@ -55,7 +67,7 @@ public class MBeanBridgeDestination { ObjectName bridgeObjectName = bridge.getMbeanObjectName(); try { ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination); - networkDestinationView = new NetworkDestinationView(networkBridgeView,destination.getPhysicalName()); + networkDestinationView = new NetworkDestinationView(networkBridgeView, 
destination.getPhysicalName()); AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); destinationObjectNameMap.put(destination, objectName); outboundDestinationViewMap.put(destination, networkDestinationView); @@ -79,7 +91,7 @@ public class MBeanBridgeDestination { ObjectName bridgeObjectName = bridge.getMbeanObjectName(); try { ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination); - networkDestinationView= new NetworkDestinationView(networkBridgeView,destination.getPhysicalName()); + networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName()); networkBridgeView.addNetworkDestinationView(networkDestinationView); AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); destinationObjectNameMap.put(destination, objectName); @@ -93,11 +105,21 @@ public class MBeanBridgeDestination { networkDestinationView.messageSent(); } - public void close() { + public void start() { + if (networkBridgeConfiguration.isGcDestinationViews()) { + long period = networkBridgeConfiguration.getGcSweepTime(); + if (period > 0) { + scheduler.executePeriodically(purgeInactiveDestinationViewTask, period); + } + } + } + + public void stop() { if (!brokerService.isUseJmx()) { return; } + scheduler.cancel(purgeInactiveDestinationViewTask); for (ObjectName objectName : destinationObjectNameMap.values()) { try { if (objectName != null) { @@ -112,4 +134,42 @@ public class MBeanBridgeDestination { inboundDestinationViewMap.clear(); } + private void purgeInactiveDestinationViews() { + if (!brokerService.isUseJmx()) { + return; + } + purgeInactiveDestinationView(inboundDestinationViewMap); + purgeInactiveDestinationView(outboundDestinationViewMap); + } + + private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationView> map) { + long time = System.currentTimeMillis() - 
networkBridgeConfiguration.getGcSweepTime(); + Map<ActiveMQDestination, NetworkDestinationView> gc = null; + for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : map.entrySet()) { + if (entry.getValue().getLastAccessTime() <= time) { + if (gc == null) { + gc = new HashMap<ActiveMQDestination, NetworkDestinationView>(); + } + gc.put(entry.getKey(), entry.getValue()); + } + } + + if (gc != null) { + for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : gc.entrySet()) { + map.remove(entry.getKey()); + ObjectName objectName = destinationObjectNameMap.get(entry.getKey()); + if (objectName != null) { + try { + if (objectName != null) { + brokerService.getManagementContext().unregisterMBean(objectName); + } + } catch (Throwable e) { + LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); + } + } + entry.getValue().close(); + } + } + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java index c877ecd..bf8facf 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java @@ -33,12 +33,15 @@ public class MBeanNetworkListener implements NetworkBridgeListener { private static final Logger LOG = LoggerFactory.getLogger(MBeanNetworkListener.class); - BrokerService brokerService; - ObjectName connectorName; - boolean createdByDuplex = false; + private final BrokerService brokerService; + private final ObjectName connectorName; + private final NetworkBridgeConfiguration networkBridgeConfiguration; + private boolean createdByDuplex = false; private 
Map<NetworkBridge,MBeanBridgeDestination> destinationObjectNameMap = new ConcurrentHashMap<NetworkBridge,MBeanBridgeDestination>(); - public MBeanNetworkListener(BrokerService brokerService, ObjectName connectorName) { + + public MBeanNetworkListener(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, ObjectName connectorName) { this.brokerService = brokerService; + this.networkBridgeConfiguration = networkBridgeConfiguration; this.connectorName = connectorName; } @@ -57,8 +60,9 @@ public class MBeanNetworkListener implements NetworkBridgeListener { ObjectName objectName = createNetworkBridgeObjectName(bridge); AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, objectName); bridge.setMbeanObjectName(objectName); - MBeanBridgeDestination mBeanBridgeDestination = new MBeanBridgeDestination(brokerService,bridge,view); + MBeanBridgeDestination mBeanBridgeDestination = new MBeanBridgeDestination(brokerService,networkBridgeConfiguration,bridge,view); destinationObjectNameMap.put(bridge,mBeanBridgeDestination); + mBeanBridgeDestination.start(); LOG.debug("registered: {} as: {}", bridge, objectName); } catch (Throwable e) { LOG.debug("Network bridge could not be registered in JMX: {}", e.getMessage(), e); @@ -77,7 +81,7 @@ public class MBeanNetworkListener implements NetworkBridgeListener { } MBeanBridgeDestination mBeanBridgeDestination = destinationObjectNameMap.remove(bridge); if (mBeanBridgeDestination != null){ - mBeanBridgeDestination.close(); + mBeanBridgeDestination.stop(); } } catch (Throwable e) { LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java 
b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java index 9ba3c9f..95d0477 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java @@ -83,4 +83,6 @@ public interface NetworkBridge extends Service { * @return the MBean name used to identify this bridge in the MBean server. */ ObjectName getMbeanObjectName(); + + void resetStats(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index 597625c..bfff94e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -16,13 +16,13 @@ */ package org.apache.activemq.network; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerInfo; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - /** * Configuration for a NetworkBridge */ @@ -59,6 +59,8 @@ public class NetworkBridgeConfiguration { private boolean useCompression = false; private boolean advisoryForFailedForward = false; private boolean useBrokerNamesAsIdSeed = true; + private boolean gcDestinationViews = true; + private long gcSweepTime = 60 * 1000; /** * @return the conduitSubscriptions @@ -421,4 +423,21 @@ public class NetworkBridgeConfiguration { public void setUseBrokerNameAsIdSees(boolean val) { useBrokerNamesAsIdSeed = val; 
} + + public boolean isGcDestinationViews() { + return gcDestinationViews; + } + + public void setGcDestinationViews(boolean gcDestinationViews) { + this.gcDestinationViews = gcDestinationViews; + } + + public long getGcSweepTime() { + return gcSweepTime; + } + + public void setGcSweepTime(long gcSweepTime) { + this.gcSweepTime = gcSweepTime; + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java index c1dadd9..34bff2d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java @@ -331,6 +331,10 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport { public ObjectName getMbeanObjectName() { return next.getMbeanObjectName(); } + + public void resetStats(){ + next.resetStats(); + } }; } };
