Author: chirino
Date: Tue Feb 7 10:34:21 2006
New Revision: 375654
URL: http://svn.apache.org/viewcvs?rev=375654&view=rev
Log:
- Implemented http://jira.activemq.org/jira/browse/AMQ-511
- Queues can now be browsed, individual messages deleted, and whole queues purged.
- Added initial hooks to expose subscriptions for JMX management.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java
- copied, changed from r375288,
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java
- copied, changed from r375288,
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
Removed:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
Tue Feb 7 10:34:21 2006
@@ -16,12 +16,16 @@
*/
package org.apache.activemq.broker.jmx;
+import javax.jms.InvalidSelectorException;
+
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.QueueRegion;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -33,6 +37,17 @@
public ManagedQueueRegion(ManagedRegionBroker broker,
DestinationStatistics destinationStatistics, UsageManager memoryManager,
TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter,
PolicyMap policyMap) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory,
persistenceAdapter, policyMap);
regionBroker = broker;
+ }
+
+ protected Subscription createSubscription(ConnectionContext context,
ConsumerInfo info) throws InvalidSelectorException {
+ Subscription sub = super.createSubscription(context, info);
+ regionBroker.registerSubscription(sub);
+ return sub;
+ }
+
+ protected void destroySubscription(Subscription sub) {
+ regionBroker.unregisterSubscription(sub);
+ super.destroySubscription(sub);
}
protected Destination createDestination(ActiveMQDestination destination)
throws Throwable {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
Tue Feb 7 10:34:21 2006
@@ -18,8 +18,11 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.memory.UsageManager;
@@ -68,9 +71,15 @@
map.put("Destination",
JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
ObjectName destObjectName= new
ObjectName(brokerObjectName.getDomain(), map);
- DestinationViewMBean view = new DestinationView(destination);
+ Object view;
+ if( destination instanceof Queue ) {
+ view = new QueueView((Queue) destination);
+ } else {
+ view = new TopicView((Topic) destination);
+ }
mbeanServer.registerMBean(view, destObjectName);
+
}
public void unregister(ActiveMQDestination destName) throws Throwable {
@@ -81,5 +90,13 @@
ObjectName destObjectName= new
ObjectName(brokerObjectName.getDomain(), map);
mbeanServer.unregisterMBean(destObjectName);
+ }
+
+ public void registerSubscription(Subscription sub) {
+ // TODO: Use this to expose subscriptions to the JMX bus for management
+ }
+
+ public void unregisterSubscription(Subscription sub) {
+ // TODO: Use this to expose subscriptions to the JMX bus for management
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
Tue Feb 7 10:34:21 2006
@@ -16,23 +16,36 @@
*/
package org.apache.activemq.broker.jmx;
-import org.apache.activemq.broker.Broker;
+import javax.jms.InvalidSelectorException;
+
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TempQueueRegion;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
public class ManagedTempQueueRegion extends TempQueueRegion {
private final ManagedRegionBroker regionBroker;
-
public ManagedTempQueueRegion(ManagedRegionBroker regionBroker,
DestinationStatistics destinationStatistics, UsageManager memoryManager,
TaskRunnerFactory taskRunnerFactory) {
super(regionBroker,destinationStatistics, memoryManager,
taskRunnerFactory);
this.regionBroker = regionBroker;
+ }
+
+ protected Subscription createSubscription(ConnectionContext context,
ConsumerInfo info) throws InvalidSelectorException {
+ Subscription sub = super.createSubscription(context, info);
+ regionBroker.registerSubscription(sub);
+ return sub;
+ }
+
+ protected void destroySubscription(Subscription sub) {
+ regionBroker.unregisterSubscription(sub);
+ super.destroySubscription(sub);
}
protected Destination createDestination(ActiveMQDestination destination)
throws Throwable {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
Tue Feb 7 10:34:21 2006
@@ -16,12 +16,15 @@
*/
package org.apache.activemq.broker.jmx;
-import org.apache.activemq.broker.Broker;
+import javax.jms.JMSException;
+
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TempTopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -32,6 +35,17 @@
public ManagedTempTopicRegion(ManagedRegionBroker regionBroker,
DestinationStatistics destinationStatistics, UsageManager memoryManager,
TaskRunnerFactory taskRunnerFactory) {
super(regionBroker,destinationStatistics, memoryManager,
taskRunnerFactory);
this.regionBroker = regionBroker;
+ }
+
+ protected Subscription createSubscription(ConnectionContext context,
ConsumerInfo info) throws JMSException {
+ Subscription sub = super.createSubscription(context, info);
+ regionBroker.registerSubscription(sub);
+ return sub;
+ }
+
+ protected void destroySubscription(Subscription sub) {
+ regionBroker.unregisterSubscription(sub);
+ super.destroySubscription(sub);
}
protected Destination createDestination(ActiveMQDestination destination)
throws Throwable {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
Tue Feb 7 10:34:21 2006
@@ -16,12 +16,16 @@
*/
package org.apache.activemq.broker.jmx;
+import javax.jms.JMSException;
+
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -33,6 +37,17 @@
public ManagedTopicRegion(ManagedRegionBroker broker,
DestinationStatistics destinationStatistics, UsageManager memoryManager,
TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter,
PolicyMap policyMap) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory,
persistenceAdapter, policyMap);
regionBroker = broker;
+ }
+
+ protected Subscription createSubscription(ConnectionContext context,
ConsumerInfo info) throws JMSException {
+ Subscription sub = super.createSubscription(context, info);
+ regionBroker.registerSubscription(sub);
+ return sub;
+ }
+
+ protected void destroySubscription(Subscription sub) {
+ regionBroker.unregisterSubscription(sub);
+ super.destroySubscription(sub);
}
protected Destination createDestination(ActiveMQDestination destination)
throws Throwable {
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?rev=375654&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
Tue Feb 7 10:34:21 2006
@@ -0,0 +1,257 @@
+package org.apache.activemq.broker.jmx;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.Message;
+
+public class OpenTypeSupport {
+
+ interface OpenTypeFactory {
+ CompositeType getCompositeType() throws OpenDataException;
+ Map getFields( Object o ) throws OpenDataException;
+ }
+
+ private static final HashMap openTypeFactories = new HashMap();
+
+ abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
+
+ private CompositeType compositeType;
+ ArrayList itemNamesList = new ArrayList();
+ ArrayList itemDescriptionsList = new ArrayList();
+ ArrayList itemTypesList = new ArrayList();
+
+ public CompositeType getCompositeType() throws OpenDataException {
+ if( compositeType == null ) {
+ init();
+ compositeType = createCompositeType();
+ }
+ return compositeType;
+ }
+
+ protected void init() throws OpenDataException {
+ }
+
+ protected CompositeType createCompositeType() throws OpenDataException
{
+ String[] itemNames = (String[]) itemNamesList.toArray(new
String[itemNamesList.size()]);
+ String[] itemDescriptions = (String[])
itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
+ OpenType[] itemTypes = (OpenType[]) itemTypesList.toArray(new
OpenType[itemTypesList.size()]);
+ return new CompositeType(getTypeName(), getDescription(),
itemNames, itemDescriptions, itemTypes);
+ }
+
+ abstract protected String getTypeName();
+
+ protected void addItem(String name, String description, OpenType type)
{
+ itemNamesList.add(name);
+ itemDescriptionsList.add(description);
+ itemTypesList.add(type);
+ }
+
+
+ protected String getDescription() {
+ return getTypeName();
+ }
+
+ public Map getFields(Object o) throws OpenDataException {
+ HashMap rc = new HashMap();
+ return rc;
+ }
+ }
+
+ static class MessageOpenTypeFactory extends AbstractOpenTypeFactory {
+
+ protected String getTypeName() {
+ return ActiveMQMessage.class.getName();
+ }
+
+ protected void init() throws OpenDataException {
+ super.init();
+ addItem("JMSCorrelationID", "JMSCorrelationID", SimpleType.STRING);
+ addItem("JMSDestination", "JMSDestination", SimpleType.STRING);
+ addItem("JMSMessageID", "JMSMessageID", SimpleType.STRING);
+ addItem("JMSReplyTo", "JMSReplyTo", SimpleType.STRING);
+ addItem("JMSType", "JMSType", SimpleType.STRING);
+ addItem("JMSDeliveryMode", "JMSDeliveryMode", SimpleType.STRING);
+ addItem("JMSExpiration", "JMSExpiration", SimpleType.LONG);
+ addItem("JMSPriority", "JMSPriority", SimpleType.INTEGER);
+ addItem("JMSRedelivered", "JMSRedelivered", SimpleType.BOOLEAN);
+ addItem("JMSTimestamp", "JMSTimestamp", SimpleType.DATE);
+ addItem("Properties", "Properties", SimpleType.STRING);
+ }
+
+ public Map getFields(Object o) throws OpenDataException {
+ ActiveMQMessage m = (ActiveMQMessage) o;
+ Map rc = super.getFields(o);
+ rc.put("JMSCorrelationID", m.getJMSCorrelationID());
+ rc.put("JMSDestination", ""+m.getJMSDestination());
+ rc.put("JMSMessageID", m.getJMSMessageID());
+ rc.put("JMSReplyTo", ""+m.getJMSReplyTo());
+ rc.put("JMSType", m.getJMSType());
+ rc.put("JMSDeliveryMode",
m.getJMSDeliveryMode()==DeliveryMode.PERSISTENT ? "PERSISTENT" :
"NON-PERSISTENT");
+ rc.put("JMSExpiration", new Long(m.getJMSExpiration()));
+ rc.put("JMSPriority", new Integer(m.getJMSPriority()));
+ rc.put("JMSRedelivered", new Boolean(m.getJMSRedelivered()));
+ rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
+ try {
+ rc.put("Properties", ""+m.getProperties());
+ } catch (IOException e) {
+ rc.put("Properties", "");
+ }
+ return rc;
+ }
+ }
+
+ static class ByteMessageOpenTypeFactory extends MessageOpenTypeFactory {
+
+ protected String getTypeName() {
+ return ActiveMQBytesMessage.class.getName();
+ }
+
+ protected void init() throws OpenDataException {
+ super.init();
+ addItem("BodyLength", "Body length", SimpleType.LONG);
+ addItem("BodyPreview", "Body preview", new
ArrayType(1,SimpleType.BYTE));
+ }
+
+ public Map getFields(Object o) throws OpenDataException {
+ ActiveMQBytesMessage m = (ActiveMQBytesMessage) o;
+ Map rc = super.getFields(o);
+ long length=0;
+ try {
+ length = m.getBodyLength();
+ rc.put("BodyLength", new Long(length));
+ } catch (JMSException e) {
+ rc.put("BodyLength", new Long(0));
+ }
+ try {
+ byte preview[] = new byte[ (int)Math.min(length, 255) ];
+ m.readBytes(preview);
+ rc.put("BodyPreview", preview);
+ } catch (JMSException e) {
+ rc.put("BodyPreview", new byte[]{});
+ }
+ return rc;
+ }
+
+ }
+
+ static class MapMessageOpenTypeFactory extends MessageOpenTypeFactory {
+ protected String getTypeName() {
+ return ActiveMQMapMessage.class.getName();
+ }
+
+ protected void init() throws OpenDataException {
+ super.init();
+ addItem("ContentMap", "Content map", SimpleType.STRING);
+ }
+
+ public Map getFields(Object o) throws OpenDataException {
+ ActiveMQMapMessage m = (ActiveMQMapMessage) o;
+ Map rc = super.getFields(o);
+ long length=0;
+ try {
+ rc.put("ContentMap", ""+m.getContentMap());
+ } catch (JMSException e) {
+ rc.put("ContentMap", "");
+ }
+ return rc;
+ }
+ }
+
+ static class ObjectMessageOpenTypeFactory extends MessageOpenTypeFactory {
+ protected String getTypeName() {
+ return ActiveMQObjectMessage.class.getName();
+ }
+
+ protected void init() throws OpenDataException {
+ super.init();
+ }
+
+ public Map getFields(Object o) throws OpenDataException {
+ ActiveMQObjectMessage m = (ActiveMQObjectMessage) o;
+ Map rc = super.getFields(o);
+ return rc;
+ }
+ }
+
+ static class StreamMessageOpenTypeFactory extends MessageOpenTypeFactory {
+ protected String getTypeName() {
+ return ActiveMQStreamMessage.class.getName();
+ }
+
+ protected void init() throws OpenDataException {
+ super.init();
+ }
+
+ public Map getFields(Object o) throws OpenDataException {
+ ActiveMQStreamMessage m = (ActiveMQStreamMessage) o;
+ Map rc = super.getFields(o);
+ return rc;
+ }
+ }
+
+ static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory {
+ protected String getTypeName() {
+ return ActiveMQTextMessage.class.getName();
+ }
+
+ protected void init() throws OpenDataException {
+ super.init();
+ addItem("Text", "Text", SimpleType.STRING);
+ }
+
+ public Map getFields(Object o) throws OpenDataException {
+ ActiveMQTextMessage m = (ActiveMQTextMessage) o;
+ Map rc = super.getFields(o);
+ try {
+ rc.put("Text", ""+m.getText());
+ } catch (JMSException e) {
+ rc.put("Text", "");
+ }
+ return rc;
+ }
+ }
+
+ static {
+ openTypeFactories.put(ActiveMQMessage.class, new
MessageOpenTypeFactory());
+ openTypeFactories.put(ActiveMQBytesMessage.class, new
ByteMessageOpenTypeFactory());
+ openTypeFactories.put(ActiveMQMapMessage.class, new
MapMessageOpenTypeFactory());
+ openTypeFactories.put(ActiveMQObjectMessage.class, new
ObjectMessageOpenTypeFactory());
+ openTypeFactories.put(ActiveMQStreamMessage.class, new
StreamMessageOpenTypeFactory());
+ openTypeFactories.put(ActiveMQTextMessage.class, new
TextMessageOpenTypeFactory());
+ }
+
+ public static OpenTypeFactory getFactory(Class clazz) throws
OpenDataException {
+ return (OpenTypeFactory) openTypeFactories.get(clazz);
+ }
+
+ public static CompositeData convert(Message message) throws
OpenDataException {
+ OpenTypeFactory f = getFactory(message.getClass());
+ if( f == null )
+ throw new OpenDataException("Cannot create a CompositeData for
type: "+message.getClass().getName());
+ CompositeType ct = f.getCompositeType();
+ Map fields = f.getFields(message);
+ return new CompositeDataSupport(ct, fields);
+ }
+
+
+}
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?rev=375654&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
Tue Feb 7 10:34:21 2006
@@ -0,0 +1,113 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
+import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.Message;
+
+public class QueueView implements QueueViewMBean {
+
+ private final Queue destination;
+
+ public QueueView(Queue destination) {
+ this.destination = destination;
+ }
+
+ public void gc() {
+ destination.gc();
+ }
+ public void resetStatistics() {
+ destination.getDestinationStatistics().reset();
+ }
+
+ public long getEnqueueCount() {
+ return destination.getDestinationStatistics().getEnqueues().getCount();
+
+ }
+ public long getDequeueCount() {
+ return destination.getDestinationStatistics().getDequeues().getCount();
+ }
+
+ public long getConsumerCount() {
+ return
destination.getDestinationStatistics().getConsumers().getCount();
+ }
+
+ public long getMessages() {
+ return destination.getDestinationStatistics().getMessages().getCount();
+ }
+
+ public long getMessagesCached() {
+ return
destination.getDestinationStatistics().getMessagesCached().getCount();
+ }
+
+ public CompositeData[] browse() throws OpenDataException {
+ Message[] messages = destination.browse();
+ CompositeData c[] = new CompositeData[messages.length];
+ for (int i = 0; i < c.length; i++) {
+ try {
+ System.out.println(messages[i].getMessageId());
+ c[i] = OpenTypeSupport.convert(messages[i]);
+ } catch (Throwable e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ return c;
+ }
+
+ public TabularData browseAsTable() throws OpenDataException {
+ OpenTypeFactory factory =
OpenTypeSupport.getFactory(ActiveMQMessage.class);
+
+ Message[] messages = destination.browse();
+ CompositeType ct = factory.getCompositeType();
+ TabularType tt = new TabularType("MessageList", "MessageList", ct, new
String[]{"JMSMessageID"});
+ TabularDataSupport rc = new TabularDataSupport(tt);
+ for (int i = 0; i < messages.length; i++) {
+ System.out.println(messages[i].getMessageId());
+ rc.put(new CompositeDataSupport(ct,
factory.getFields(messages[i])));
+ }
+
+ return rc;
+ }
+
+
+ public CompositeData getMessage(String messageId) throws OpenDataException
{
+ Message rc = destination.getMessage(messageId);
+ if( rc ==null )
+ return null;
+ return OpenTypeSupport.convert(rc);
+ }
+
+ public void removeMessage(String messageId) {
+ destination.removeMessage(messageId);
+ }
+
+ public void purge() {
+ destination.purge();
+ }
+
+}
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java?rev=375654&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
Tue Feb 7 10:34:21 2006
@@ -0,0 +1,42 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+
+
+public interface QueueViewMBean {
+
+ public void gc();
+ public void resetStatistics();
+
+ public long getEnqueueCount();
+ public long getDequeueCount();
+ public long getConsumerCount();
+ public long getMessages();
+ public long getMessagesCached();
+
+
+ public CompositeData[] browse() throws OpenDataException;
+ public TabularData browseAsTable() throws OpenDataException;
+ public CompositeData getMessage(String messageId) throws OpenDataException;
+ public void removeMessage(String messageId);
+ public void purge();
+
+}
\ No newline at end of file
Copied:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java
(from r375288,
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java)
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java?p2=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java&r1=375288&r2=375654&rev=375654&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java
Tue Feb 7 10:34:21 2006
@@ -16,13 +16,13 @@
*/
package org.apache.activemq.broker.jmx;
-import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Topic;
-public class DestinationView implements DestinationViewMBean {
+public class TopicView implements TopicViewMBean {
- private final Destination destination;
+ private final Topic destination;
- public DestinationView(Destination destination) {
+ public TopicView(Topic destination) {
this.destination = destination;
}
Copied:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java
(from r375288,
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java)
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java?p2=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java&r1=375288&r2=375654&rev=375654&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java
Tue Feb 7 10:34:21 2006
@@ -17,7 +17,7 @@
package org.apache.activemq.broker.jmx;
-public interface DestinationViewMBean {
+public interface TopicViewMBean {
public void gc();
public void resetStatistics();
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Tue Feb 7 10:34:21 2006
@@ -170,6 +170,11 @@
Destination dest = (Destination) iter.next();
dest.removeSubscription(context, sub);
}
+
+ destroySubscription(sub);
+ }
+
+ protected void destroySubscription(Subscription sub) {
}
public void removeSubscription(ConnectionContext context,
RemoveSubscriptionInfo info) throws Throwable {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Feb 7 10:34:21 2006
@@ -16,7 +16,11 @@
*/
package org.apache.activemq.broker.region;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupHashBucket;
@@ -41,10 +45,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
@@ -139,7 +140,7 @@
for (Iterator iter = messages.iterator(); iter.hasNext();) {
IndirectMessageReference node = (IndirectMessageReference)
iter.next();
- if (node.isDropped() ) {
+ if (node.isDropped()) {
continue;
}
@@ -148,15 +149,13 @@
if (sub.matches(node, msgContext)) {
sub.add(node);
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
log.warn("Could not load message: " + e, e);
}
}
}
- }
- finally {
+ } finally {
msgContext.clear();
dispatchValve.turnOn();
}
@@ -193,17 +192,18 @@
MessageEvaluationContext msgContext =
context.getMessageEvaluationContext();
try {
msgContext.setDestination(destination);
-
+
for (Iterator iter = messages.iterator();
iter.hasNext();) {
- IndirectMessageReference node =
(IndirectMessageReference) iter.next();
- if (node.isDropped() ) {
+ IndirectMessageReference node =
(IndirectMessageReference) iter.next();
+ if (node.isDropped()) {
continue;
}
-
+
String groupID = node.getGroupID();
-
+
// Re-deliver all messages that the sub locked
- if (node.getLockOwner() == sub ||
wasExclusiveOwner || (groupID != null && ownedGroups.contains(groupID))) {
+ if (node.getLockOwner() == sub || wasExclusiveOwner
+ || (groupID != null &&
ownedGroups.contains(groupID))) {
node.incrementRedeliveryCounter();
node.unlock();
msgContext.setMessageReference(node);
@@ -216,8 +216,7 @@
}
}
- }
- finally {
+ } finally {
dispatchValve.turnOn();
}
@@ -225,9 +224,9 @@
public void send(final ConnectionContext context, final Message message)
throws Throwable {
- if( context.isProducerFlowControl() )
+ if (context.isProducerFlowControl())
usageManager.waitForSpace();
-
+
message.setRegionDestination(this);
if (store != null && message.isPersistent())
@@ -242,8 +241,7 @@
dispatch(context, node, message);
}
});
- }
- else {
+ } else {
dispatch(context, node, message);
}
} finally {
@@ -274,7 +272,7 @@
for (Iterator iter = messages.iterator(); iter.hasNext();) {
// Remove dropped messages from the queue.
IndirectMessageReference node = (IndirectMessageReference)
iter.next();
- if (node.isDropped()) {
+ if (node.isDropped()) {
garbageSize--;
iter.remove();
continue;
@@ -283,7 +281,8 @@
}
}
- public void acknowledge(ConnectionContext context, Subscription sub, final
MessageAck ack, final MessageReference node) throws IOException {
+ public void acknowledge(ConnectionContext context, Subscription sub, final
MessageAck ack,
+ final MessageReference node) throws IOException {
if (store != null && node.isPersistent()) {
store.removeMessage(context, ack);
}
@@ -291,15 +290,16 @@
public Message loadMessage(MessageId messageId) throws IOException {
Message msg = store.getMessage(messageId);
- if( msg!=null ) {
+ if (msg != null) {
msg.setRegionDestination(this);
}
return msg;
}
public String toString() {
- return "Queue: destination=" + destination.getPhysicalName() + ",
subscriptions=" + consumers.size() + ", memory=" +
usageManager.getPercentUsage()
- + "%, size=" + messages.size() + ", in flight groups=" +
messageGroupOwners;
+ return "Queue: destination=" + destination.getPhysicalName() + ",
subscriptions=" + consumers.size()
+ + ", memory=" + usageManager.getPercentUsage() + "%, size=" +
messages.size() + ", in flight groups="
+ + messageGroupOwners;
}
public void start() throws Exception {
@@ -324,7 +324,7 @@
public MessageGroupMap getMessageGroupOwners() {
if (messageGroupOwners == null) {
- messageGroupOwners = new
MessageGroupHashBucket(messageGroupHashBucketCount );
+ messageGroupOwners = new
MessageGroupHashBucket(messageGroupHashBucketCount);
}
return messageGroupOwners;
}
@@ -352,7 +352,6 @@
public void setMessageGroupHashBucketCount(int
messageGroupHashBucketCount) {
this.messageGroupHashBucketCount = messageGroupHashBucketCount;
}
-
// Implementation methods
//
-------------------------------------------------------------------------
@@ -370,7 +369,7 @@
messages.add(node);
}
- synchronized(consumers) {
+ synchronized (consumers) {
if (consumers.isEmpty()) {
log.debug("No subscriptions registered, will not dispatch
message at this time.");
return;
@@ -381,8 +380,7 @@
msgContext.setMessageReference(node);
dispatchPolicy.dispatch(context, node, msgContext, consumers);
- }
- finally {
+ } finally {
msgContext.clear();
dispatchValve.decrement();
}
@@ -404,5 +402,94 @@
public MessageStore getMessageStore() {
return store;
}
+
+ /**
+  * Takes a snapshot of the messages currently held in this queue's internal list.
+  * <p>
+  * Each reference is pinned with incrementReferenceCount() while its message is
+  * loaded and released again afterwards, the same way getMessage(String) does.
+  * References whose message cannot be loaded (null result or IOException) are
+  * left out of the result; the failure is logged and the scan continues.
+  *
+  * @return the loadable messages in list iteration order; never null, possibly empty
+  */
+ public Message[] browse() {
+     ArrayList l = new ArrayList();
+     synchronized (messages) {
+         for (Iterator iter = messages.iterator(); iter.hasNext();) {
+             try {
+                 MessageReference r = (MessageReference) iter.next();
+                 // Pin the reference before loading so that the decrement in the
+                 // finally block is balanced; without it every browse would lower
+                 // the reference count by one.
+                 r.incrementReferenceCount();
+                 try {
+                     Message m = r.getMessage();
+                     if (m != null) {
+                         l.add(m);
+                     }
+                 } finally {
+                     r.decrementReferenceCount();
+                 }
+             } catch (IOException e) {
+                 // Best effort: one unreadable message must not abort the whole browse.
+                 log.warn("Could not load message: " + e, e);
+             }
+         }
+     }
+
+     return (Message[]) l.toArray(new Message[l.size()]);
+ }
+
+ /**
+  * Removes every queued message whose id, rendered with toString(), equals the
+  * given string.
+  * <p>
+  * For each match a standard acknowledgement is built and passed to
+  * acknowledge(...) with a fresh ConnectionContext and no subscription, after
+  * which the reference is dropped and dropEvent() is signalled. If the
+  * acknowledgement fails with an IOException the reference is left untouched,
+  * the failure is logged and the scan continues.
+  *
+  * @param messageId string form of the message id to remove; must not be null
+  */
+ public void removeMessage(String messageId) {
+     synchronized (messages) {
+         ConnectionContext c = new ConnectionContext();
+         for (Iterator iter = messages.iterator(); iter.hasNext();) {
+             try {
+                 IndirectMessageReference r = (IndirectMessageReference) iter.next();
+                 // Dropped references can still be present in the list; skip them so
+                 // a message is never acknowledged and dropped a second time.
+                 if (r.isDropped()) {
+                     continue;
+                 }
+                 if (messageId.equals(r.getMessageId().toString())) {
+                     MessageAck ack = new MessageAck();
+                     ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+                     ack.setDestination(destination);
+                     ack.setMessageID(r.getMessageId());
+                     acknowledge(c, null, ack, r);
+                     r.drop();
+                     dropEvent();
+                 }
+             } catch (IOException e) {
+                 // Keep going, but do not hide the failure from the operator.
+                 log.warn("Could not remove message " + messageId + ": " + e, e);
+             }
+         }
+     }
+ }
+
+ /**
+  * Looks up a queued message by the string form of its id.
+  * <p>
+  * The scan stops at the first reference whose id matches. That reference is
+  * pinned with incrementReferenceCount() while its message is loaded and released
+  * again afterwards.
+  *
+  * @param messageId string form of the message id to find; must not be null
+  * @return the message, or null when no reference matches, when the first match
+  *         yields no message, or when every attempted load failed with an
+  *         IOException (which is logged)
+  */
+ public Message getMessage(String messageId) {
+     synchronized (messages) {
+         for (Iterator iter = messages.iterator(); iter.hasNext();) {
+             try {
+                 MessageReference r = (MessageReference) iter.next();
+                 if (messageId.equals(r.getMessageId().toString())) {
+                     r.incrementReferenceCount();
+                     try {
+                         Message m = r.getMessage();
+                         if (m != null) {
+                             return m;
+                         }
+                     } finally {
+                         r.decrementReferenceCount();
+                     }
+                     // Matched the id but got no message: stop searching.
+                     break;
+                 }
+             } catch (IOException e) {
+                 // A failed load skips the break above, so the scan moves on to the
+                 // next reference; log it so the failure is not mistaken for "not found".
+                 log.warn("Could not load message " + messageId + ": " + e, e);
+             }
+         }
+     }
+     return null;
+ }
+
+ /**
+  * Removes all messages currently held in this queue's internal list.
+  * <p>
+  * For each reference a standard acknowledgement is built and passed to
+  * acknowledge(...) with a fresh ConnectionContext and no subscription, after
+  * which the reference is dropped and dropEvent() is signalled. A reference whose
+  * acknowledgement fails with an IOException is left untouched; the failure is
+  * logged and the purge continues with the next reference.
+  */
+ public void purge() {
+     synchronized (messages) {
+         ConnectionContext c = new ConnectionContext();
+         for (Iterator iter = messages.iterator(); iter.hasNext();) {
+             try {
+                 IndirectMessageReference r = (IndirectMessageReference) iter.next();
+                 // Dropped references can still be present in the list; skip them so
+                 // a message is never acknowledged and dropped a second time.
+                 if (r.isDropped()) {
+                     continue;
+                 }
+                 MessageAck ack = new MessageAck();
+                 ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+                 ack.setDestination(destination);
+                 ack.setMessageID(r.getMessageId());
+                 acknowledge(c, null, ack, r);
+                 r.drop();
+                 dropEvent();
+             } catch (IOException e) {
+                 // Keep purging the rest, but do not hide the failure from the operator.
+                 log.warn("Could not purge message: " + e, e);
+             }
+         }
+     }
+ }
+
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java?rev=375654&r1=375653&r2=375654&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
Tue Feb 7 10:34:21 2006
@@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.Map;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
@@ -660,5 +661,10 @@
return super.toString() + " ActiveMQMapMessage{ " +
"theTable = " + map +
" }";
+ }
+
+ public Map getContentMap() throws JMSException { // Accessor for the map-message body as a java.util.Map.
+ initializeReading(); // NOTE(review): presumably prepares/unmarshals the body so `map` is readable - confirm against initializeReading().
+ return map; // Returns the `map` field itself, not a copy, so changes made by the caller are visible to this message.
}
}