Fix for https://issues.apache.org/jira/browse/AMQ-4918
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cb3e854d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cb3e854d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cb3e854d Branch: refs/heads/activemq-5.9 Commit: cb3e854d26b2b0654e93b37cf9c4c2e30e0b64fa Parents: 37e1fd7 Author: rajdavies <[email protected]> Authored: Wed Dec 4 19:00:54 2013 +0000 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 13:14:00 2014 -0400 ---------------------------------------------------------------------- .../activemq/broker/jmx/BrokerMBeanSupport.java | 15 +++ .../broker/jmx/NetworkDestinationView.java | 73 ++++++++++++ .../broker/jmx/NetworkDestinationViewMBean.java | 48 ++++++++ .../network/DemandForwardingBridgeSupport.java | 16 +++ .../network/MBeanBridgeDestination.java | 111 +++++++++++++++++++ .../activemq/network/MBeanNetworkListener.java | 34 +++++- .../activemq/network/NetworkBridgeListener.java | 16 +++ .../activemq/management/SizeStatisticImpl.java | 2 +- 8 files changed, 312 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/cb3e854d/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java index 3ad4495..87605bc 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java @@ -173,6 +173,21 @@ public class BrokerMBeanSupport { return new ObjectName(connectorName.getDomain(), map); } + public static ObjectName 
createNetworkOutBoundDestinationObjectName(ObjectName networkName, ActiveMQDestination destination) throws MalformedObjectNameException { + String str = networkName.toString(); + str += ",direction=outbound" + createDestinationProperties(destination); + return new ObjectName(str); + + } + + public static ObjectName createNetworkInBoundDestinationObjectName(ObjectName networkName, ActiveMQDestination destination) throws MalformedObjectNameException { + String str = networkName.toString(); + str += ",direction=inbound" + createDestinationProperties(destination); + return new ObjectName(str); + + } + + public static ObjectName createProxyConnectorName(ObjectName brokerObjectName, String type, String name) throws MalformedObjectNameException { return createProxyConnectorName(brokerObjectName.toString(), type, name); } http://git-wip-us.apache.org/repos/asf/activemq/blob/cb3e854d/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java new file mode 100644 index 0000000..4edfd37 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.management.TimeStatisticImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MBean implementation that counts the messages reported for one named
+ * destination and exposes that count, plus a rate derived from the
+ * intervals between consecutive messages.
+ */
+public class NetworkDestinationView implements NetworkDestinationViewMBean {
+    // Bound to the implementing class so log output is attributed to this view
+    // rather than to the MBean interface.
+    private static final Logger LOG = LoggerFactory.getLogger(NetworkDestinationView.class);
+    private final TimeStatisticImpl timeStatistic = new TimeStatisticImpl("networkEnqueue","network messages enqueued");
+
+    private final String name;
+    // Timestamp (System.currentTimeMillis) of the previous messageSent() call;
+    // negative while no message has been seen since construction or the last reset.
+    private long lastTime = -1;
+
+    /**
+     * @param name the value returned by {@link #getName()}
+     */
+    public NetworkDestinationView(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Returns the name of this destination
+     */
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Resets the management counters and forgets the time of the last message,
+     * so the next message is again treated as the first one.
+     */
+    @Override
+    public void resetStatistics() {
+        timeStatistic.reset();
+        lastTime = -1;
+    }
+
+    /**
+     * @return the sample count held by the underlying time statistic, i.e. one
+     *         per {@link #messageSent()} call since the last reset
+     */
+    @Override
+    public long getCount() {
+        return timeStatistic.getCount();
+    }
+
+    /**
+     * @return the average-per-second figure reported by the underlying time statistic
+     */
+    @Override
+    public double getRate() {
+        return timeStatistic.getAveragePerSecond();
+    }
+
+    /**
+     * Records one message. The sample added to the statistic is the number of
+     * milliseconds since the previous message, or 0 for the first message
+     * after construction or a reset.
+     */
+    public void messageSent() {
+        long currentTime = System.currentTimeMillis();
+        long elapsed = lastTime < 0 ? 0 : currentTime - lastTime;
+        timeStatistic.addTime(elapsed);
+        lastTime = currentTime;
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/cb3e854d/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationViewMBean.java
new file mode 100644
index 0000000..7164cdd
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationViewMBean.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.activemq.broker.jmx; + +public interface NetworkDestinationViewMBean { + + /** + * Returns the name of this destination + */ + @MBeanInfo("Name of this destination.") + String getName(); + + /** + * Resets the managment counters. + */ + @MBeanInfo("Resets statistics.") + void resetStatistics(); + + /** + * Returns the number of messages that have been sent to the destination. + * + * @return The number of messages that have been sent to the destination. + */ + @MBeanInfo("Number of messages that have been sent to the destination.") + long getCount(); + + /** + * Returns the rate of messages that have been sent to the destination. + * + * @return The rate of messages that have been sent to the destination. + */ + @MBeanInfo("rate of messages sent across the network destination.") + double getRate(); +} http://git-wip-us.apache.org/repos/asf/activemq/blob/cb3e854d/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index af10a94..bf0b4f6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -607,6 +607,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else { duplexInboundLocalBroker.oneway(message); } + serviceInboundMessage(message); } } else { switch (command.getDataStructureType()) { @@ -985,6 +986,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br sub.decrementOutstandingResponses(); } } + serviceOutbound(message); } else { LOG.debug("No subscription registered with this network bridge for consumerId: {} for 
message: {}", md.getConsumerId(), md.getMessage()); } @@ -1612,4 +1614,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } + protected void serviceOutbound(Message message){ + NetworkBridgeListener l = this.networkBridgeListener; + if (l != null){ + l.onOutboundMessage(this,message); + } + } + + protected void serviceInboundMessage(Message message){ + NetworkBridgeListener l = this.networkBridgeListener; + if (l != null){ + l.onInboundMessage(this,message); + } + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/cb3e854d/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java new file mode 100644 index 0000000..666f11e --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.activemq.network;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.management.ObjectName;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.AnnotatedMBean;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.NetworkDestinationView;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Keeps, for one network bridge, a {@link NetworkDestinationView} per
+ * destination and direction. A view is created and registered in JMX the
+ * first time a message for its destination is reported, and all registered
+ * views are unregistered again by {@link #close()}.
+ */
+public class MBeanBridgeDestination {
+    private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
+    private final BrokerService brokerService;
+    private final NetworkBridge bridge;
+    // Every MBean registered by this instance, keyed by its JMX name so that
+    // close() can unregister all of them. Keyed by ObjectName, not destination,
+    // because one destination can own two MBeans (inbound and outbound).
+    // Also serves as the monitor that serialises view creation.
+    private final Map<ObjectName, NetworkDestinationView> registeredViews = new ConcurrentHashMap<ObjectName, NetworkDestinationView>();
+    private final Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
+    private final Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
+
+    /**
+     * @param brokerService supplies the management context used for MBean (un)registration
+     * @param bridge        the bridge whose MBean object name prefixes the destination MBean names
+     */
+    public MBeanBridgeDestination(BrokerService brokerService, NetworkBridge bridge) {
+        this.brokerService = brokerService;
+        this.bridge = bridge;
+    }
+
+    /**
+     * Records a message for the outbound view of the message's destination,
+     * creating and registering that view on first use.
+     *
+     * @param message the message being reported
+     */
+    public void onOutboundMessage(Message message) {
+        recordMessage(message, outboundDestinationViewMap, true);
+    }
+
+    /**
+     * Records a message for the inbound view of the message's destination,
+     * creating and registering that view on first use.
+     *
+     * @param message the message being reported
+     */
+    public void onInboundMessage(Message message) {
+        recordMessage(message, inboundDestinationViewMap, false);
+    }
+
+    /**
+     * Unregisters every destination MBean registered by this instance and
+     * drops all views. Does nothing when the broker does not use JMX.
+     */
+    public void close() {
+        if (!brokerService.isUseJmx()) {
+            return;
+        }
+
+        for (ObjectName objectName : registeredViews.keySet()) {
+            try {
+                brokerService.getManagementContext().unregisterMBean(objectName);
+            } catch (Throwable e) {
+                // Best effort: one failed unregistration must not stop the others.
+                LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
+            }
+        }
+        registeredViews.clear();
+        outboundDestinationViewMap.clear();
+        inboundDestinationViewMap.clear();
+    }
+
+    /**
+     * Looks up (or lazily creates) the view for the message's destination in
+     * the given direction map and counts the message on it.
+     */
+    private void recordMessage(Message message, Map<ActiveMQDestination, NetworkDestinationView> viewMap, boolean outbound) {
+        ActiveMQDestination destination = message.getDestination();
+        if (destination == null) {
+            // ConcurrentHashMap rejects null keys; nothing to account the message to.
+            return;
+        }
+        NetworkDestinationView view = viewMap.get(destination);
+        if (view == null) {
+            view = createView(destination, viewMap, outbound);
+        }
+        view.messageSent();
+    }
+
+    /**
+     * Creates the view for a destination/direction and tries to register it in JMX.
+     * Never returns null: if registration fails the failure is logged once and the
+     * unregistered view is still cached, so counting continues and registration is
+     * not re-attempted (and re-logged) for every subsequent message.
+     */
+    private NetworkDestinationView createView(ActiveMQDestination destination, Map<ActiveMQDestination, NetworkDestinationView> viewMap, boolean outbound) {
+        synchronized (registeredViews) {
+            // Re-check under the lock: another thread may have created the view meanwhile.
+            NetworkDestinationView view = viewMap.get(destination);
+            if (view != null) {
+                return view;
+            }
+            view = new NetworkDestinationView(destination.getPhysicalName());
+            try {
+                ObjectName bridgeObjectName = bridge.getMbeanObjectName();
+                ObjectName objectName = outbound
+                        ? BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination)
+                        : BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
+                AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, objectName);
+                registeredViews.put(objectName, view);
+            } catch (Exception e) {
+                LOG.warn("Failed to register " + destination, e);
+            }
+            viewMap.put(destination, view);
+            return view;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/cb3e854d/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java index c9a9935..0481f3d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java @@ -16,14 +16,17 @@ */ package org.apache.activemq.network; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import javax.management.MalformedObjectNameException; import javax.management.ObjectName; - import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.AnnotatedMBean; import org.apache.activemq.broker.jmx.BrokerMBeanSupport; import org.apache.activemq.broker.jmx.NetworkBridgeView; import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean; +import org.apache.activemq.command.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +37,7 @@ public class MBeanNetworkListener implements NetworkBridgeListener { BrokerService brokerService; ObjectName connectorName; boolean createdByDuplex = false; - + private Map<NetworkBridge,MBeanBridgeDestination> destinationObjectNameMap = new ConcurrentHashMap<NetworkBridge,MBeanBridgeDestination>(); public MBeanNetworkListener(BrokerService brokerService, ObjectName connectorName) { this.brokerService = brokerService; this.connectorName = connectorName; @@ -55,6 +58,8 @@ public class MBeanNetworkListener implements NetworkBridgeListener { ObjectName objectName = createNetworkBridgeObjectName(bridge); AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, objectName); bridge.setMbeanObjectName(objectName); + MBeanBridgeDestination 
mBeanBridgeDestination = new MBeanBridgeDestination(brokerService,bridge); + destinationObjectNameMap.put(bridge,mBeanBridgeDestination); LOG.debug("registered: {} as: {}", bridge, objectName); } catch (Throwable e) { LOG.debug("Network bridge could not be registered in JMX: {}", e.getMessage(), e); @@ -71,11 +76,17 @@ public class MBeanNetworkListener implements NetworkBridgeListener { if (objectName != null) { brokerService.getManagementContext().unregisterMBean(objectName); } + MBeanBridgeDestination mBeanBridgeDestination = destinationObjectNameMap.remove(bridge); + if (mBeanBridgeDestination != null){ + mBeanBridgeDestination.close(); + } } catch (Throwable e) { LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); } } + + protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException { return BrokerMBeanSupport.createNetworkBridgeObjectName(connectorName, bridge.getRemoteAddress()); } @@ -83,4 +94,23 @@ public class MBeanNetworkListener implements NetworkBridgeListener { public void setCreatedByDuplex(boolean createdByDuplex) { this.createdByDuplex = createdByDuplex; } + + + + @Override + public void onOutboundMessage(NetworkBridge bridge,Message message) { + MBeanBridgeDestination mBeanBridgeDestination = destinationObjectNameMap.get(bridge); + if (mBeanBridgeDestination != null){ + mBeanBridgeDestination.onOutboundMessage(message); + } + } + + @Override + public void onInboundMessage(NetworkBridge bridge,Message message) { + MBeanBridgeDestination mBeanBridgeDestination = destinationObjectNameMap.get(bridge); + if (mBeanBridgeDestination != null){ + mBeanBridgeDestination.onInboundMessage(message); + } + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/cb3e854d/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java ---------------------------------------------------------------------- diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java index 7d49177..c30a999 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.network; +import org.apache.activemq.command.Message; + /** * called when a bridge fails * @@ -38,4 +40,18 @@ public interface NetworkBridgeListener { */ void onStop(NetworkBridge bridge); + /** + * Called when message forwarded over the network + * @param bridge + * @param message + */ + void onOutboundMessage (NetworkBridge bridge,Message message); + + /** + * Called for when a message arrives over the network + * @param bridge + * @param message + */ + void onInboundMessage (NetworkBridge bridge,Message message); + } http://git-wip-us.apache.org/repos/asf/activemq/blob/cb3e854d/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java index 14664d2..d3dbb50 100644 --- a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java +++ b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java @@ -154,7 +154,7 @@ public class SizeStatisticImpl extends StatisticImpl{ buffer.append(Long.toString(minSize)); buffer.append(" totalSize: "); buffer.append(Long.toString(totalSize)); - buffer.append(" averageTime: "); + buffer.append(" averageSize: "); buffer.append(Double.toString(getAverageSize())); buffer.append(" averageTimeExMinMax: "); buffer.append(Double.toString(getAveragePerSecondExcludingMinMax()));
