Fix for https://issues.apache.org/jira/browse/AMQ-4918


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/687aa926
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/687aa926
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/687aa926

Branch: refs/heads/activemq-5.9
Commit: 687aa9265ed7ca04b7e364fe1e74733cee3a2d5e
Parents: 3474f37
Author: rajdavies <[email protected]>
Authored: Thu Dec 5 08:55:56 2013 +0000
Committer: Hadrian Zbarcea <[email protected]>
Committed: Wed Mar 12 13:14:28 2014 -0400

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |  2 +-
 .../activemq/broker/jmx/NetworkBridgeView.java  |  1 +
 .../broker/jmx/NetworkDestinationView.java      |  6 ++
 .../network/DemandForwardingBridgeSupport.java  |  5 ++
 .../network/DiscoveryNetworkConnector.java      |  2 +-
 .../network/MBeanBridgeDestination.java         | 68 ++++++++++++++++++--
 .../activemq/network/MBeanNetworkListener.java  | 16 +++--
 .../apache/activemq/network/NetworkBridge.java  |  2 +
 .../network/NetworkBridgeConfiguration.java     | 25 ++++++-
 .../org/apache/activemq/bugs/AMQ4160Test.java   |  4 ++
 10 files changed, 116 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 1ce75a5..997b323 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -1297,7 +1297,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 if (duplexName.contains("#")) {
                     duplexName = 
duplexName.substring(duplexName.lastIndexOf("#"));
                 }
-                MBeanNetworkListener listener = new 
MBeanNetworkListener(broker.getBrokerService(), 
broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
+                MBeanNetworkListener listener = new 
MBeanNetworkListener(broker.getBrokerService(), config, 
broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
                 listener.setCreatedByDuplex(true);
                 duplexBridge = NetworkBridgeFactory.createBridge(config, 
localTransport, remoteBridgeTransport, listener);
                 duplexBridge.setBrokerService(broker.getBrokerService());

http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
index 795de3a..47f167a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
@@ -72,6 +72,7 @@ public class NetworkBridgeView implements NetworkBridgeViewMBean {
     }
 
     public void resetStats(){
+        bridge.resetStats();
         for (NetworkDestinationView 
networkDestinationView:networkDestinationViewList){
             networkDestinationView.resetStats();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
index 26177d5..1dceee4 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
@@ -73,7 +73,13 @@ public class NetworkDestinationView implements NetworkDestinationViewMBean {
         lastTime=currentTime;
     }
 
+    public long getLastAccessTime(){
+        return timeStatistic.getLastSampleTime();
+    }
+
     public void close(){
         networkBridgeView.removeNetworkDestinationView(this);
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index bf0b4f6..9fa38a4 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1538,6 +1538,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         return mbeanObjectName;
     }
 
+    public void resetStats(){
+        enqueueCounter.set(0);
+        dequeueCounter.set(0);
+    }
+
     /*
      * Used to allow for async tasks to await receipt of the BrokerInfo from 
the local and
      * remote sides of the network bridge.

http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
index ed71b7b..d9cb0c6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
@@ -231,7 +231,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
         class DiscoverNetworkBridgeListener extends MBeanNetworkListener {
 
             public DiscoverNetworkBridgeListener(BrokerService brokerService, 
ObjectName connectorName) {
-                super(brokerService, connectorName);
+                super(brokerService, DiscoveryNetworkConnector.this, 
connectorName);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
index c718063..583fab7 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.network;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -27,6 +28,7 @@ import org.apache.activemq.broker.jmx.NetworkBridgeView;
 import org.apache.activemq.broker.jmx.NetworkDestinationView;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.thread.Scheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,14 +37,24 @@ public class MBeanBridgeDestination {
     private final BrokerService brokerService;
     private final NetworkBridge bridge;
     private final NetworkBridgeView networkBridgeView;
+    private final NetworkBridgeConfiguration networkBridgeConfiguration;
+    private final Scheduler scheduler;
+    private final Runnable purgeInactiveDestinationViewTask;
     private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = 
new ConcurrentHashMap<ActiveMQDestination, ObjectName>();
     private Map<ActiveMQDestination, NetworkDestinationView> 
outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, 
NetworkDestinationView>();
     private Map<ActiveMQDestination, NetworkDestinationView> 
inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, 
NetworkDestinationView>();
 
-    public MBeanBridgeDestination(BrokerService brokerService, NetworkBridge 
bridge, NetworkBridgeView networkBridgeView) {
+    public MBeanBridgeDestination(BrokerService brokerService, 
NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, 
NetworkBridgeView networkBridgeView) {
         this.brokerService = brokerService;
+        this.networkBridgeConfiguration = networkBridgeConfiguration;
         this.bridge = bridge;
         this.networkBridgeView = networkBridgeView;
+        this.scheduler = brokerService.getScheduler();
+        purgeInactiveDestinationViewTask = new Runnable() {
+            public void run() {
+                purgeInactiveDestinationViews();
+            }
+        };
     }
 
 
@@ -55,7 +67,7 @@ public class MBeanBridgeDestination {
                     ObjectName bridgeObjectName = bridge.getMbeanObjectName();
                     try {
                         ObjectName objectName = 
BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, 
destination);
-                        networkDestinationView = new 
NetworkDestinationView(networkBridgeView,destination.getPhysicalName());
+                        networkDestinationView = new 
NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
                         
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), 
networkDestinationView, objectName);
                         destinationObjectNameMap.put(destination, objectName);
                         outboundDestinationViewMap.put(destination, 
networkDestinationView);
@@ -79,7 +91,7 @@ public class MBeanBridgeDestination {
                     ObjectName bridgeObjectName = bridge.getMbeanObjectName();
                     try {
                         ObjectName objectName = 
BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, 
destination);
-                        networkDestinationView= new 
NetworkDestinationView(networkBridgeView,destination.getPhysicalName());
+                        networkDestinationView = new 
NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
                         
networkBridgeView.addNetworkDestinationView(networkDestinationView);
                         
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), 
networkDestinationView, objectName);
                         destinationObjectNameMap.put(destination, objectName);
@@ -93,11 +105,21 @@ public class MBeanBridgeDestination {
         networkDestinationView.messageSent();
     }
 
-    public void close() {
+    public void start() {
+        if (networkBridgeConfiguration.isGcDestinationViews()) {
+            long period = networkBridgeConfiguration.getGcSweepTime();
+            if (period > 0) {
+                
scheduler.executePeriodically(purgeInactiveDestinationViewTask, period);
+            }
+        }
+    }
+
+    public void stop() {
         if (!brokerService.isUseJmx()) {
             return;
         }
 
+        scheduler.cancel(purgeInactiveDestinationViewTask);
         for (ObjectName objectName : destinationObjectNameMap.values()) {
             try {
                 if (objectName != null) {
@@ -112,4 +134,42 @@ public class MBeanBridgeDestination {
         inboundDestinationViewMap.clear();
     }
 
+    private void purgeInactiveDestinationViews() {
+        if (!brokerService.isUseJmx()) {
+            return;
+        }
+        purgeInactiveDestinationView(inboundDestinationViewMap);
+        purgeInactiveDestinationView(outboundDestinationViewMap);
+    }
+
+    private void purgeInactiveDestinationView(Map<ActiveMQDestination, 
NetworkDestinationView> map) {
+        long time = System.currentTimeMillis() - 
networkBridgeConfiguration.getGcSweepTime();
+        Map<ActiveMQDestination, NetworkDestinationView> gc = null;
+        for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : 
map.entrySet()) {
+            if (entry.getValue().getLastAccessTime() <= time) {
+                if (gc == null) {
+                    gc = new HashMap<ActiveMQDestination, 
NetworkDestinationView>();
+                }
+                gc.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        if (gc != null) {
+            for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry 
: gc.entrySet()) {
+                map.remove(entry.getKey());
+                ObjectName objectName = 
destinationObjectNameMap.get(entry.getKey());
+                if (objectName != null) {
+                    try {
+                        if (objectName != null) {
+                            
brokerService.getManagementContext().unregisterMBean(objectName);
+                        }
+                    } catch (Throwable e) {
+                        LOG.debug("Network bridge could not be unregistered in 
JMX: {}", e.getMessage(), e);
+                    }
+                }
+                entry.getValue().close();
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
index c877ecd..bf8facf 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
@@ -33,12 +33,15 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MBeanNetworkListener.class);
 
-    BrokerService brokerService;
-    ObjectName connectorName;
-    boolean createdByDuplex = false;
+    private final BrokerService brokerService;
+    private final ObjectName connectorName;
+    private final NetworkBridgeConfiguration networkBridgeConfiguration;
+    private boolean createdByDuplex = false;
     private Map<NetworkBridge,MBeanBridgeDestination> destinationObjectNameMap 
= new ConcurrentHashMap<NetworkBridge,MBeanBridgeDestination>();
-    public MBeanNetworkListener(BrokerService brokerService, ObjectName 
connectorName) {
+
+    public MBeanNetworkListener(BrokerService brokerService, 
NetworkBridgeConfiguration networkBridgeConfiguration, ObjectName 
connectorName) {
         this.brokerService = brokerService;
+        this.networkBridgeConfiguration = networkBridgeConfiguration;
         this.connectorName = connectorName;
     }
 
@@ -57,8 +60,9 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
             ObjectName objectName = createNetworkBridgeObjectName(bridge);
             AnnotatedMBean.registerMBean(brokerService.getManagementContext(), 
view, objectName);
             bridge.setMbeanObjectName(objectName);
-            MBeanBridgeDestination mBeanBridgeDestination = new 
MBeanBridgeDestination(brokerService,bridge,view);
+            MBeanBridgeDestination mBeanBridgeDestination = new 
MBeanBridgeDestination(brokerService,networkBridgeConfiguration,bridge,view);
             destinationObjectNameMap.put(bridge,mBeanBridgeDestination);
+            mBeanBridgeDestination.start();
             LOG.debug("registered: {} as: {}", bridge, objectName);
         } catch (Throwable e) {
             LOG.debug("Network bridge could not be registered in JMX: {}", 
e.getMessage(), e);
@@ -77,7 +81,7 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
             }
             MBeanBridgeDestination mBeanBridgeDestination = 
destinationObjectNameMap.remove(bridge);
             if (mBeanBridgeDestination != null){
-                mBeanBridgeDestination.close();
+                mBeanBridgeDestination.stop();
             }
         } catch (Throwable e) {
             LOG.debug("Network bridge could not be unregistered in JMX: {}", 
e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java
index 9ba3c9f..95d0477 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java
@@ -83,4 +83,6 @@ public interface NetworkBridge extends Service {
      * @return the MBean name used to identify this bridge in the MBean server.
      */
     ObjectName getMbeanObjectName();
+
+    void resetStats();
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index 597625c..bfff94e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.network;
 
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
 
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
 /**
  * Configuration for a NetworkBridge
  */
@@ -59,6 +59,8 @@ public class NetworkBridgeConfiguration {
     private boolean useCompression = false;
     private boolean advisoryForFailedForward = false;
     private boolean useBrokerNamesAsIdSeed = true;
+    private boolean gcDestinationViews = true;
+    private long gcSweepTime = 60 * 1000;
 
     /**
      * @return the conduitSubscriptions
@@ -421,4 +423,21 @@ public class NetworkBridgeConfiguration {
     public void setUseBrokerNameAsIdSees(boolean val) {
         useBrokerNamesAsIdSeed = val;
     }
+
+    public boolean isGcDestinationViews() {
+        return gcDestinationViews;
+    }
+
+    public void setGcDestinationViews(boolean gcDestinationViews) {
+        this.gcDestinationViews = gcDestinationViews;
+    }
+
+    public long getGcSweepTime() {
+        return gcSweepTime;
+    }
+
+    public void setGcSweepTime(long gcSweepTime) {
+        this.gcSweepTime = gcSweepTime;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/687aa926/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
index c1dadd9..34bff2d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
@@ -331,6 +331,10 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
                     public ObjectName getMbeanObjectName() {
                         return next.getMbeanObjectName();
                     }
+
+                    public void resetStats(){
+                        next.resetStats();
+                    }
                 };
             }
         };

Reply via email to